Kafka数据源

用于获取kafka指定分区的最后一条记录, 用于实时场景 使用方法参考"自定义数据源" 以下为参考代码:

def dataset(*args, **kwargs):
    """
    返回查询数据集
    :return: 二维数组或JSON字典
    """
    from kafka import  KafkaConsumer, TopicPartition
    import json

    sqlList = args[0]   # 数据集编辑界面的输入已按分号拆分成数组 [sql1, sql2...]
    config = args[1]    # 相关的配置字典{'host','port','user','password','db'}
    # 插入你的数据查询及处理代码, 生成result即可
    result = {}
    consumer = KafkaConsumer(sasl_mechanism='PLAIN',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_plain_username=config['user'],
                             sasl_plain_password=config['password'],
                             bootstrap_servers=config['host'],
                             auto_offset_reset='earliest',
                             api_version=(1, 0, 0),
                             consumer_timeout_ms=50,
                             value_deserializer=lambda v: json.loads(v.decode('utf-8')),
                             )
    topic = sqlList[0]
    partition = int(config['db'])
    tp = TopicPartition(topic=topic, partition=partition)
    consumer.assign([tp])
    end_offsets = consumer.end_offsets([tp]).get(tp)  # 获取当前消费者最大偏移量
    consumer.seek(tp, offset=end_offsets-1)
    for message in consumer:
        result = message.value
        break
    return result

def insert_dataset(*args, **kwargs):
    """
    数据填报实现
    """
    from kafka import KafkaProducer
    import json

    contents = args[0]  # 传入的数据集二维数组格式
    table = args[1]   # 配置中的表名
    config = args[3]  # 相关的配置字典{'host','port','user','password','db'}
    # 插入你的写入数据逻辑代码
    producer = KafkaProducer(sasl_mechanism='PLAIN',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_plain_username=config['user'],
                             sasl_plain_password=config['password'],
                             bootstrap_servers=config['host'],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8')
                             )
    producer.send(table, value=contents, partition=int(config['db']))