一个简单的airflow dolphindb Operator的示例

dataz
2022-06-16

下面是一个简单的 Airflow + DolphinDB Operator 示例，使用 Airflow 调度 DolphinDB 脚本时可以参考。初始版本的约定如下：

1. 读取 dos 脚本文件内容，直接通过 DolphinDB Python API 的 session.run（即 s.run）执行；

2. dos 脚本中需要实现 runDolScr 函数，Operator 会在脚本末尾追加一次 runDolScr(...) 调用；

3. op_kwargs 的取值会按顺序原样拼接进 runDolScr(...) 调用，因此如果 dos 中对应参数是字符串，需要在取值中手动加上双引号（见下方 data_file 的写法）；


后续优化：仿照 Spark 的 Operator 实现 submit 等逻辑；实现负载均衡，将任务提交到不同节点。

- 定义

from airflow.models.baseoperator import BaseOperator
from typing import Any, Callable, Dict, Iterable, List, Optional
import os

import dolphindb as ddb


class DolphinDBOperator(BaseOperator):
    """Run a DolphinDB script file, then call its ``runDolScr`` entry point.

    The text of ``dos_file`` is sent to a DolphinDB server, followed by a
    ``runDolScr(<v1>,<v2>,...)`` call. The arguments are the values of
    ``op_kwargs`` in insertion order. Each value is spliced into the call
    verbatim (after ``str()``), so a DolphinDB string argument must carry
    its own double quotes.

    :param dos_file: path to the DolphinDB script; it must define ``runDolScr``.
    :param op_kwargs: templated values passed positionally to ``runDolScr``.
    :param host: DolphinDB server host.
    :param port: DolphinDB server port.
    :param username: DolphinDB user name.
    :param password: DolphinDB password. NOTE(review): consider sourcing
        credentials from an Airflow Connection instead of a literal default.
    """

    template_fields = ["op_kwargs"]
    BLUE = '#ffefeb'
    ui_color = BLUE
    # Must be a tuple. BaseOperator.__deepcopy__ concatenates it with its own
    # tuple of shallow-copy attrs, and a bare string ('op_kwargs') raises
    # TypeError there.
    shallow_copy_attrs = ('op_kwargs',)

    def __init__(
        self,
        dos_file: str,
        op_kwargs: Optional[Dict] = None,
        host: str = "127.0.0.1",
        port: int = 8711,
        username: str = "admin",
        password: str = "xxxxxxxx",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.op_kwargs = op_kwargs or {}
        self.dos_filepath = dos_file
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    def _build_script(self) -> str:
        """Return the script file's text followed by the ``runDolScr(...)`` call."""
        # str() lets non-string values (e.g. ints) be used without a TypeError
        # from str.join; string values are inserted unchanged.
        args = ",".join(str(value) for value in self.op_kwargs.values())
        with open(self.dos_filepath, "r", encoding="utf-8") as infile:
            body = infile.read()
        # Layout: leading newline, file body, newline, call, trailing newline.
        return "\n" + body + "\n" + "runDolScr({})".format(args) + "\n"

    def execute(self, context: Any) -> Any:
        """Send the assembled script to DolphinDB and return ``session.run``'s result."""
        self.log.info("op_kwargs: %s", self.op_kwargs)
        script = self._build_script()
        self.log.info("DolphinDB script:\n%s", script)
        session = ddb.session()
        try:
            session.connect(self.host, self.port, self.username, self.password)
            msg = session.run(script)
        finally:
            # Release the connection even when connecting or the script fails.
            session.close()
        self.log.info("result: %s", msg)
        return msg

- 使用

from operators.dolphindb_operator import DolphinDBOperator

# xxxxxx
# NOTE(review): the DAG definition and `ddb_data_date` were elided in the
# original snippet; this task presumably lives inside that DAG context.

# The values are passed positionally to runDolScr in this order. The file
# path is a DolphinDB string literal, so it carries its own double quotes.
load_stg_data = DolphinDBOperator(
    task_id="load_stg_data",
    dos_file="/home/airflow/load_stg_comm_ctp.dos",
    op_kwargs={
        "cDate": ddb_data_date,
        "data_file": '"/xxx/{{next_ds_nodash}}/xele_{{next_ds_nodash}}.csv"',
    },
)