使用EMR-Kafka Connect进行数据迁移

简介: 本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移,使用distributed模式。

作者:
云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor


1.背景

流式处理中经常会遇到Kafka与其他系统进行数据同步或者Kafka集群间数据迁移的情景。使用EMR Kafka Connect可以方便快速的实现数据同步或者数据迁移。

Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速地进行流式数据传输的工具。例如可以使用Kafka Connect获取数据库的binglog数据,将数据库的数据迁入Kafka集群,以同步数据库的数据,或者对接下游的流式处理系统。同时,Kafka Connect提供的REST API接口可以方便的进行Kafka Connect的创建和管理。
Kafka Connect分为standalone和distributed两种运行模式。standalone模式下,所有的worker都在一个进程中运行;相比之下,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。

本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移,使用distributed模式。

2.环境准备

创建两个EMR集群,集群类型为Kafka。EMR Kafka Connect安装在task节点上,进行数据迁移的目的Kafka集群需要创建task节点。集群创建好后,task节点上EMR Kafka Connect服务会默认启动,端口号为8083。

注意要保证两个集群的网路互通,详细的创建流程见创建集群

3.数据迁移


3.1准备工作


EMR Kafka Connect的配置文件路径为/etc/ecm/kafka-conf/connect-distributed.properties。

在源Kafka集群创建需要同步的topic,例如

image

另外,Kafka Connect会将offsets, configs和任务状态保存在topic中,topic名对应配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三个配置项。默认的,Kafka Connect会自动的使用默认的partition和replication factor创建这三个topic。

3.2创建Kafka Connect


在目的Kafka集群的task节点(例如emr-worker-3节点),使用curl命令通过json数据创建一个Kafka Connect。

json数据中,name字段代表创建的connect的名称,此处为connect-test;config字段需要根据实际情况进行配置,其中的变量说明如下表

image

3.3查看Kafka Connect


查看所有的Kafka Connect

image

查看创建的connect-test的状态

image

查看task的信息

image

3.4数据同步


在源Kafka集群创建需要同步的数据。

image

3.5查看同步结果


在目的Kafka集群消费同步的数据。

image

可以看到,在源Kafka集群发送的100000条数据已经迁移到了目的Kafka集群。

4.小结

本文介绍并演示了使用EMR kafka Connect在Kafka集群间进行数据迁移的方法,关于Kafka Connect更详细的使用请参考Kafka官网资料REST API使用

_

相关文章
|
消息中间件 存储 NoSQL
一文读懂Kafka Connect核心概念
Kafka Connect 是一种用于在 Apache Kafka 和其他系统之间可扩展且可靠地流式传输数据的工具。 它使快速定义将大量数据移入和移出 Kafka 的连接器变得简单。 Kafka Connect 可以摄取整个数据库或从所有应用程序服务器收集指标到 Kafka 主题中,使数据可用于低延迟的流处理。 导出作业可以将数据从 Kafka 主题传送到二级存储和查询系统或批处理系统进行离线分析。
|
4月前
|
消息中间件 关系型数据库 MySQL
在kafka connect 同步 mysql 主从数据库
在kafka connect 同步 mysql 主从数据库
43 0
|
4月前
|
消息中间件 Kafka Windows
使用Kafka Connect 导入导出数据
使用Kafka Connect 导入导出数据
67 0
|
4月前
|
消息中间件 关系型数据库 MySQL
Kafka Connect :构建强大分布式数据集成方案
Kafka Connect 是 Apache Kafka 生态系统中的关键组件,专为构建可靠、高效的分布式数据集成解决方案而设计。本文将深入探讨 Kafka Connect 的核心架构、使用方法以及如何通过丰富的示例代码解决实际的数据集成挑战。
|
6月前
|
消息中间件 存储 Kafka
Apache Kafka - 构建数据管道 Kafka Connect
Apache Kafka - 构建数据管道 Kafka Connect
97 0
|
6月前
|
消息中间件 JSON 关系型数据库
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
92 1
|
10月前
|
消息中间件 JSON 监控
实时数据同步与共享:使用Apache Kafka Connect
在现代应用程序开发中,实时数据同步和共享变得越来越重要。而Apache Kafka Connect作为一个可靠的、分布式的数据集成工具,为我们提供了一种简单而强大的方式来实现实时数据的传输和共享。
554 0
|
消息中间件 JSON Kubernetes
云原生系列五:Kafka 集群数据迁移基于Kubernetes的内部
​ 1.概述 Kafka的使用场景非常广泛,一些实时流数据业务场景,均依赖Kafka来做数据分流。而在分布式应用场景中,数据迁移是一个比较常见的问题。关于Kafka集群数据如何迁移,今天叶秋学长将为大家详细介绍。 2.内容 本篇博客为大家介绍两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。如下图所示:  2.1 同集群迁移 同集群之间数据迁移,比如在已有的集群中新增了一个Broker节点,此时需要将原来集群中已有的Topic的数据迁移部分到新的集群中,缓解集群压力。 将新的节点添加到Kafka集群很简单,只需为它们分配一个唯一的Broker ID,并在新服务器上启动Kafka。
192 0
云原生系列五:Kafka 集群数据迁移基于Kubernetes的内部
|
消息中间件 JSON Java
替代Flume——Kafka Connect简介
替代Flume——Kafka Connect简介
362 0
替代Flume——Kafka Connect简介
|
消息中间件 存储 弹性计算
基于Kafka connect+函数计算的轻量计算解决方案
Kafka ETL基于kafka connect加函数计算,为云上用户提供了一套数据流转加数据计算的一站式解决方案。
663 0