在大数据ETL场景,将Kafka中的消息流转到其他下游服务是很常见的场景,除了常规的消息流转外,很多场景还需要基于消息体内容做判断,然后决定下游服务做何种操作。 该方案实现了通过Kafka中消息Key的内容来判断应该对MongoDB做增、删、改的哪种DML操作。 当Kafka收到消息后,会自动触发函数计算中的函数,接收到消息,对消息内容做判断,然后再操作MongoDB。用户可以对提供的默认函数代码做修改,来满足更复杂的逻辑。 整体方案通过CADT可以一键拉起依赖的产品,并完成了大多数的配置,用户只需要到函数计算和MongoDB控制台做少量配置即可。
mongodb_collection_name=os.environ['mongodb_collection_name']#构建完整的 MongoDB 连接地址 mongodb_endpoint_full='mongodb:/'+mongodb_user+':'+mongodb_password+'@'+mongodb_endpoint#初始化 MongoDB 连接客户端 mongodb_client=MongoClient(mongodb_endpoint_full)mongodb_db=mongodb_client[mongodb_db_name]...