自定义循环抽取

简介 #

当数据库中存在按时间后缀命名的分表(如 order_202001order_202002),需要汇总到同一目标表时,可使用自定义循环抽取功能。通过 #diy 驱动结合 run_datax() 函数,实现灵活的多表循环同步。

实现步骤速查 #

步骤 操作
1 创建 DataX 抽取任务,配置参数占位符
2 创建 DIY 任务,编写循环逻辑
3 在 DAG 中编排任务依赖

使用方法 #

smartpip中已有datax组件, 但只能进行单一表格抽取, 不能增加逻辑 所以我们需要使用到smartpip的diy组件功能 首先我们需要新一个datax抽取任务,在这个任务中我们传递参数ZYM, 比如:

#datax job1  ZYM

之后再新一建一个diy任务, 实现循环抽取:

ZYM = '202001'
def fun_job2():
    job = os.path.join(ETL_FILE_PATH , '项目名/job1.sql')
    report_time = datetime.date(2022,8,1)
    for i in range(600):
        zym = report_time.strftime('%Y%m')
        print(zym)
        if zym < '202001':
            break
        para_dict = {"ZYM": zym}
        run_datax(job, para_dict)
        report_time = (report_time - datetime.timedelta(days=1)).replace(day=1)
		
#diy  job2  fun_job2