自定义参数方法 #
-- 当前时间
report_time = datetime.datetime.now()
-- 当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
-- 获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
-- 格式化字符串时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
ZYMD= report_time.strftime('%Y%m%d')
-- 也可以采用格式化参数f"xxxxxx",变量用{}包裹
PARAM=f"where create_time >= '{ZYMD}' "
设定方法 #
-- 每一个JOB为一个节点,填写驱动名称和要执行的任务名
-- 任务名不能使用数字开头如00xxx, 不可以使用非法字符, 不可使用符号如(.)
#starrocks_sql  sql_filename
-- 参数和备注
#impala_sql  sql_filename ZYMD,MSG    -- 我是备注
-- DAG的设定方法
JOB写完在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支
完整样列可参考 完整样列
目前支持的驱动 #
| 驱动名 | 说明 | 使用说明 | 
|---|---|---|
| datax | 执行datax的抽取 | 模板会自动带出 | 
| kafkastarrocks | 使用Streamload方法消费kafka到starrocks | 使用方法参考下文 | 
| apistarrocks | api接入到starrocks | 使用方法 | 
| routinestarrocks | 监控routineload方法消费kafka到starrocks | #routinestarrocks 任务名 标签名 不预警[可选], 标签名带库:db.label | 
| starrocks_sql | 使用starrocks执行sql | |
| flinkcdc | 实时导入数据,支持自动建表 | |
| hive_sql | 使用hive执行sql | refresh table语句时会自动调用impala来执行 | 
| impala_sql | 使用impala执行sql | |
| oracle_sql | 执行oracle sql | |
| gp_sql | 执行greenplum或postgreSql sql | |
| mssql_sql | 执行sqlserver sql | |
| myql_sql | 执行mysql sql | |
| sp | 使用默认的数据库执行存储过程 | |
| py | 执行python脚本 | |
| diy | 自定义任务 | 使用方法参考下文 | 
| dataset | 实现对任意数据库的查询与校验 | 需要smartchart的支持,使用方法参考下文 | 
| refresh_quality | 刷新数据质量 | #refresh_quality jobname 项目名 | 
| refresh_smc | 刷新smartchart仪表盘 | #refresh_smc jobname 报表ID或报表名 | 
| link | DAG依赖 | #link DAG的名字 sleeptime[可选] maxtime[可选],使用方法参考下文 | 
| branch | 实现基于自定义的条件function执行不同的分支 | #branch jobname 函数名(具体方法参考下文) 注意: jobname不可和函数名同名 | 
| trigger | 触放目标dag执行 | #trigger 任务名称 目标DAG名称 | 
| validate | 使用默认的数据库执行select sql来做数据校验 | 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename | 
| refresh_tableau | 刷新tableau数据源 | #refresh_tableau jobname sourceID, sourceID是tableau对的需要刷新的数据源ID | 
| ktr | 执行kettle的transform | |
| kjb | 执行kettle的job | |
| ktr_r | 执行远程kettle的transform | |
| kjb_r | 执行远程kettle的job | |
| dataxx | 执行datax的抽取 | 采用原生配置方式 | 
| sqoop | 使用sqoop抽取到大数据平台 | 使用方法参考不常用方法 | 
| hdfsstarrocks | 使用brokerload方法到starrocks | 使用方法参考不常用方法 | 
| sap | 调用sap的rfc功能抽取数据到表 | 使用方法参考不常用方法 | 
| hdfs | 转移到大数据平台执行ktr | 将自动把生成的文本文件导入到hive的stage表, 表名需要与ktr名称一样 | 
| send_mail | 发送邮件 | #send_mail reportmail msg maillist 使用方法 | 
dataset实现对任意数据库的查询与校验 #
使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数说明: 
      不填写 - 默认无查询结果抛出错误
      info - 只打印查询结果,
	  e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
	  e3 - 无查询结果时,每15分钟会重试,如2小时还无结果抛出错误,
	       可用于轮询业务系统, 等待业务系统数据ready后再抽取
			  
id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx
你也可以采用高级自定义,使用
get_dataset(id, param=None):传入数据集ID, 可传参如{"p1":"xxx"}
返回值: {"result":"success","data":[[]]}
link DAG依赖 #
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
	   如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
kafkastarrocks 使用Stream load方法消费kafka到starrocks #
##kafkaConn=xxx  --连接器名
##topic=Test1       --kafka topic名称
##table=test.loadcsv    --目标表名
--------- 通用参数[可选] ----------
##skipError=1   -- 是否跳过错误
##max_filter_ratio=0.2 --容错率
##columns=msg_seq,kafka_time,create_time=unix_timestamp()  --指定字段
--------- 如果kafka是Json格式[可选] ----------
##format=json
##jsonpaths= a,b
##json_root: list   --json 根节点名
-------  如果是kafka是类csv[可选]  ------
##column_separator=,   -- 指定分隔为逗号, starrocks默认使用\t
------- 只在运维时使用 ------
##offsets=-996 -- 指定从最后一个offset开始消费
也可指定初始offset, 也可使用{"0":"123", "1":"99"}来指定不同partition的offset
branch[分支判断]/trigger[DAG触发] #
'''
编写自定义函数名为:任务名 ,基于条件自定义返回的分支进行后续执行,未返回的分支将不会执行
'''
def inv_validate():
    dataset = get_dataset(407)
    if len(dataset['data']) > 1:
        return 'task1'
    return 'task2'
#impala_sql task1
#trigger  task2  targetdagname
#branch  branch_job  inv_validate
/////////////////////////
branch_job  >> [task1, task2]
diy 自定义任务 #
-- 自定义你的执行函数
def refresh_XX():
    import requests
    response=requests.get('',verify=False).json()
    if response['status']!=200:
        raise Exception(str(response))
#diy refresh_job  refresh_XX  -- 注意: jobname不可与函数名同名
更多自定义任务请参考自定义任务章节
其它高级方法 #
任务触发方式 #
- all_success: 所有上游成功
- all_failed: 所有上游任务失败
- all_done: 所有上游任务完成
- one_failed: 只要一个任务失败
- one_success:只有一个任成功
- none_failed: 上游任务没有失败
- dummy: 无依赖
样列
#datax task1
#datax task2
/////
dag.trigger_rule='all_success'   #可全局指定
task2.trigger_rule = 'all_done'  #指定单个任务
高级用法, 分配到不同worker执行 #
指定dag的queue即可, worker的启动方式
airflow celery worker -q bg
#ktr abc
////////
abc.queue = 'bg'
更多高级自定义 #
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'queue': 'bash_queue',
'pool': 'backfill',
'priority_weight': 10,
'end_date': datetime(2016, 1, 1),
'wait_for_downstream': False,
'sla': timedelta(hours=2),
'execution_timeout': timedelta(seconds=300),
'on_failure_callback': some_function,
'on_success_callback': some_other_function,
'on_retry_callback': another_function,
'sla_miss_callback': yet_another_function,
'trigger_rule': 'all_success'