在大数据ETL场景,将Kafka中的消息流转到其他下游服务是很常见的场景,除了常规的消息流转外,很多场景还需要基于消息体内容做判断,然后决定下游服务做何种操作。 该方案实现了通过Kafka中消息Key的内容来判断应该对MongoDB做增、删、改的哪种DML操作。 当Kafka收到消息后,会自动触发函数计算中的函数,接收到消息,对消息内容做判断,然后再操作MongoDB。用户可以对提供的默认函数代码做修改,来满足更复杂的逻辑。 整体方案通过CADT可以一键拉起依赖的产品,并完成了大多数的配置,用户只需要到函数计算和MongoDB控制台做少量配置即可。
collection mongodb_collection=mongodb_db[mongodb_collection_name]def handler(event,context):evt=json.loads(event)logger=logging.getLogger()logger.info(evt)global mongodb_collection#如果是攒批消费,event 是会包含多条消息的,所以这里循环处理每条消息 for message in evt:#从 event 中获取消息的各个信息 ...