Dag设定方法

自定义参数方法 #

-- 当前时间
report_time = datetime.datetime.now()
-- 当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
-- 获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
-- 格式化字符串时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
ZYMD= report_time.strftime('%Y%m%d')
-- 也可以采用格式化参数f"xxxxxx",变量用{}包裹
PARAM=f"where create_time >= '{ZYMD}' "

设定方法 #

-- 每一个JOB为一个节点,填写驱动名称和要执行的任务名
-- 任务名不能使用数字开头如00xxx, 不可以使用非法字符, 不可使用符号如(.)
#starrocks_sql  sql_filename
-- 参数和备注
#impala_sql  sql_filename ZYMD,MSG    -- 我是备注

-- DAG的设定方法
JOB写完在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支

完整样列可参考 完整样列

目前支持的驱动 #

驱动名 说明 使用说明
datax 执行datax的抽取 模板会自动带出
kafkastarrocks 使用Streamload方法消费kafka到starrocks 使用方法参考下文
apistarrocks api接入到starrocks 使用方法
routinestarrocks 监控routineload方法消费kafka到starrocks #routinestarrocks 任务名 标签名 不预警[可选], 标签名带库:db.label
starrocks_sql 使用starrocks执行sql
flinkcdc 实时导入数据,支持自动建表
hive_sql 使用hive执行sql refresh table语句时会自动调用impala来执行
impala_sql 使用impala执行sql
oracle_sql 执行oracle sql
gp_sql 执行greenplum或postgreSql sql
mssql_sql 执行sqlserver sql
myql_sql 执行mysql sql
sp 使用默认的数据库执行存储过程
py 执行python脚本
diy 自定义任务 使用方法参考下文
dataset 实现对任意数据库的查询与校验 需要smartchart的支持,使用方法参考下文
refresh_quality 刷新数据质量 #refresh_quality jobname 项目名
refresh_smc 刷新smartchart仪表盘 #refresh_smc jobname 报表ID或报表名
link DAG依赖 #link DAG的名字 sleeptime[可选] maxtime[可选],使用方法参考下文
branch 实现基于自定义的条件function执行不同的分支 #branch jobname 函数名(具体方法参考下文) 注意: jobname不可和函数名同名
trigger 触放目标dag执行 #trigger 任务名称 目标DAG名称
validate 使用默认的数据库执行select sql来做数据校验 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename
refresh_tableau 刷新tableau数据源 #refresh_tableau jobname sourceID, sourceID是tableau对的需要刷新的数据源ID
ktr 执行kettle的transform
kjb 执行kettle的job
ktr_r 执行远程kettle的transform
kjb_r 执行远程kettle的job
dataxx 执行datax的抽取 采用原生配置方式
sqoop 使用sqoop抽取到大数据平台 使用方法参考不常用方法
hdfsstarrocks 使用brokerload方法到starrocks 使用方法参考不常用方法
sap 调用sap的rfc功能抽取数据到表 使用方法参考不常用方法
hdfs 转移到大数据平台执行ktr 将自动把生成的文本文件导入到hive的stage表, 表名需要与ktr名称一样
send_mail 发送邮件 #send_mail reportmail msg maillist 使用方法

dataset实现对任意数据库的查询与校验 #

使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数说明: 
      不填写 - 默认无查询结果抛出错误
      info - 只打印查询结果,
	  e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
	  e3 - 无查询结果时,每15分钟会重试,如2小时还无结果抛出错误,
	       可用于轮询业务系统, 等待业务系统数据ready后再抽取
			  
id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx

你也可以采用高级自定义使用
get_dataset(id, param=None):传入数据集ID, 可传参如{"p1":"xxx"}
返回值: {"result":"success","data":[[]]}
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
	   如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错

kafkastarrocks 使用Stream load方法消费kafka到starrocks #

##kafkaConn=xxx  --连接器名
##topic=Test1       --kafka topic名称
##table=test.loadcsv    --目标表名

--------- 通用参数[可选] ----------
##skipError=1   -- 是否跳过错误
##max_filter_ratio=0.2 --容错率
##columns=msg_seq,kafka_time,create_time=unix_timestamp()  --指定字段

--------- 如果kafka是Json格式[可选] ----------
##format=json
##jsonpaths= a,b
##json_root: list   --json 根节点名

-------  如果是kafka是类csv[可选]  ------
##column_separator=,   -- 指定分隔为逗号, starrocks默认使用\t

------- 只在运维时使用 ------
##offsets=-996 -- 指定从最后一个offset开始消费
也可指定初始offset, 也可使用{"0":"123", "1":"99"}来指定不同partition的offset

branch[分支判断]/trigger[DAG触发] #

'''
编写自定义函数名为:任务名 ,基于条件自定义返回的分支进行后续执行,未返回的分支将不会执行
'''
def inv_validate():
    dataset = get_dataset(407)
    if len(dataset['data']) > 1:
        return 'task1'
    return 'task2'

#impala_sql task1
#trigger  task2  targetdagname
#branch  branch_job  inv_validate

/////////////////////////

branch_job  >> [task1, task2]

diy 自定义任务 #

-- 自定义你的执行函数
def refresh_XX():
    import requests
    response=requests.get('',verify=False).json()
    if response['status']!=200:
        raise Exception(str(response))

#diy refresh_job  refresh_XX  -- 注意: jobname不可与函数名同名

更多自定义任务请参考自定义任务章节

其它高级方法 #

任务触发方式 #

  • all_success: 所有上游成功
  • all_failed: 所有上游任务失败
  • all_done: 所有上游任务完成
  • one_failed: 只要一个任务失败
  • one_success:只有一个任成功
  • none_failed: 上游任务没有失败
  • dummy: 无依赖

样列

#datax task1
#datax task2
/////
dag.trigger_rule='all_success'   #可全局指定
task2.trigger_rule = 'all_done'  #指定单个任务

高级用法, 分配到不同worker执行 #

指定dag的queue即可, worker的启动方式

airflow celery worker -q bg
#ktr abc
////////
abc.queue = 'bg'

更多高级自定义 #

'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'queue': 'bash_queue',
'pool': 'backfill',
'priority_weight': 10,
'end_date': datetime(2016, 1, 1),
'wait_for_downstream': False,
'sla': timedelta(hours=2),
'execution_timeout': timedelta(seconds=300),
'on_failure_callback': some_function,
'on_success_callback': some_other_function,
'on_retry_callback': another_function,
'sla_miss_callback': yet_another_function,
'trigger_rule': 'all_success'