自定义参数方法 #
-- 当前时间
report_time = datetime.datetime.now()
-- 当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
-- 获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
-- 格式化字符串时间:(%Y-%m-%d %H:%M:%S): 2015-03-08 23:30:42
ZYMD= report_time.strftime('%Y%m%d')
-- 也可以采用格式化参数f"xxxxxx",变量用{}包裹
PARAM=f"where create_time >= '{ZYMD}' "
设定方法 #
-- 每一个JOB为一个节点,填写驱动名称和要执行的任务名
-- 任务名不能使用数字开头如00xxx, 不可以使用非法字符, 不可使用符号如(.)
#starrocks_sql sql_filename
-- 参数和备注
#impala_sql sql_filename ZYMD,MSG -- 我是备注
-- DAG的设定方法
JOB写完在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3 为一个分支
完整样列可参考 完整样列
目前支持的驱动 #
驱动名 | 说明 | 使用说明 |
---|---|---|
datax | 执行datax的抽取 | 模板会自动带出 |
kafkastarrocks | 使用Streamload方法消费kafka到starrocks | 使用方法参考下文 |
apistarrocks | api接入到starrocks | 使用方法 |
routinestarrocks | 监控routineload方法消费kafka到starrocks | #routinestarrocks 任务名 标签名 不预警[可选], 标签名带库:db.label |
starrocks_sql | 使用starrocks执行sql | |
flinkcdc | 实时导入数据,支持自动建表 | |
hive_sql | 使用hive执行sql | refresh table语句时会自动调用impala来执行 |
impala_sql | 使用impala执行sql | |
oracle_sql | 执行oracle sql | |
gp_sql | 执行greenplum或postgreSql sql | |
mssql_sql | 执行sqlserver sql | |
myql_sql | 执行mysql sql | |
sp | 使用默认的数据库执行存储过程 | |
py | 执行python脚本 | |
diy | 自定义任务 | 使用方法参考下文 |
dataset | 实现对任意数据库的查询与校验 | 需要smartchart的支持,使用方法参考下文 |
refresh_quality | 刷新数据质量 | #refresh_quality jobname 项目名 |
refresh_smc | 刷新smartchart仪表盘 | #refresh_smc jobname 报表ID或报表名 |
link | DAG依赖 | #link DAG的名字 sleeptime[可选] maxtime[可选],使用方法参考下文 |
branch | 实现基于自定义的条件function执行不同的分支 | #branch jobname 函数名(具体方法参考下文) 注意: jobname不可和函数名同名 |
trigger | 触放目标dag执行 | #trigger 任务名称 目标DAG名称 |
validate | 使用默认的数据库执行select sql来做数据校验 | 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename |
refresh_tableau | 刷新tableau数据源 | #refresh_tableau jobname sourceID, sourceID是tableau对的需要刷新的数据源ID |
ktr | 执行kettle的transform | |
kjb | 执行kettle的job | |
ktr_r | 执行远程kettle的transform | |
kjb_r | 执行远程kettle的job | |
dataxx | 执行datax的抽取 | 采用原生配置方式 |
sqoop | 使用sqoop抽取到大数据平台 | 使用方法参考不常用方法 |
hdfsstarrocks | 使用brokerload方法到starrocks | 使用方法参考不常用方法 |
sap | 调用sap的rfc功能抽取数据到表 | 使用方法参考不常用方法 |
hdfs | 转移到大数据平台执行ktr | 将自动把生成的文本文件导入到hive的stage表, 表名需要与ktr名称一样 |
send_mail | 发送邮件 | #send_mail reportmail msg maillist 使用方法 |
dataset实现对任意数据库的查询与校验 #
使用方法: #dataset jobname id remark[可选] maillist[可选]
remark参数说明:
不填写 - 默认无查询结果抛出错误
info - 只打印查询结果,
e1 - 如果有返回结果抛出错误,
e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
e3 - 无查询结果时,每15分钟会重试,如2小时还无结果抛出错误,
可用于轮询业务系统, 等待业务系统数据ready后再抽取
id参数: 可以是数据集ID也可以填数据集的名称,
maillist: 格式 xxx@xxxx,xxx@xxx
你也可以采用高级自定义,使用
get_dataset(id, param=None):传入数据集ID, 可传参如{"p1":"xxx"}
返回值: {"result":"success","data":[[]]}
link DAG依赖 #
使用方法 #link DAG的名字 sleeptime[可选] maxtime[可选]
#link DAG_A --如DAG_A Fail或进行中,此节点后的流程不会再执行
#link DAG_A 60 3600 --如DAG_A还在进行中, 每60秒检查一次
如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
kafkastarrocks 使用Stream load方法消费kafka到starrocks #
##kafkaConn=xxx --连接器名
##topic=Test1 --kafka topic名称
##table=test.loadcsv --目标表名
--------- 通用参数[可选] ----------
##skipError=1 -- 是否跳过错误
##max_filter_ratio=0.2 --容错率
##columns=msg_seq,kafka_time,create_time=unix_timestamp() --指定字段
--------- 如果kafka是Json格式[可选] ----------
##format=json
##jsonpaths= a,b
##json_root: list --json 根节点名
------- 如果是kafka是类csv[可选] ------
##column_separator=, -- 指定分隔为逗号, starrocks默认使用\t
------- 只在运维时使用 ------
##offsets=-996 -- 指定从最后一个offset开始消费
也可指定初始offset, 也可使用{"0":"123", "1":"99"}来指定不同partition的offset
branch[分支判断]/trigger[DAG触发] #
'''
编写自定义函数名为:任务名 ,基于条件自定义返回的分支进行后续执行,未返回的分支将不会执行
'''
def inv_validate():
dataset = get_dataset(407)
if len(dataset['data']) > 1:
return 'task1'
return 'task2'
#impala_sql task1
#trigger task2 targetdagname
#branch branch_job inv_validate
/////////////////////////
branch_job >> [task1, task2]
diy 自定义任务 #
-- 自定义你的执行函数
def refresh_XX():
import requests
response=requests.get('',verify=False).json()
if response['status']!=200:
raise Exception(str(response))
#diy refresh_job refresh_XX -- 注意: jobname不可与函数名同名
更多自定义任务请参考自定义任务章节
其它高级方法 #
任务触发方式 #
- all_success: 所有上游成功
- all_failed: 所有上游任务失败
- all_done: 所有上游任务完成
- one_failed: 只要一个任务失败
- one_success:只有一个任成功
- none_failed: 上游任务没有失败
- dummy: 无依赖
样列
#datax task1
#datax task2
/////
dag.trigger_rule='all_success' #可全局指定
task2.trigger_rule = 'all_done' #指定单个任务
高级用法, 分配到不同worker执行 #
指定dag的queue即可, worker的启动方式
airflow celery worker -q bg
#ktr abc
////////
abc.queue = 'bg'
更多高级自定义 #
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'queue': 'bash_queue',
'pool': 'backfill',
'priority_weight': 10,
'end_date': datetime(2016, 1, 1),
'wait_for_downstream': False,
'sla': timedelta(hours=2),
'execution_timeout': timedelta(seconds=300),
'on_failure_callback': some_function,
'on_success_callback': some_other_function,
'on_retry_callback': another_function,
'sla_miss_callback': yet_another_function,
'trigger_rule': 'all_success'