袋鼠云研发手记 | 数栈DTinsight:详解FlinkX中的断点续传和实时采集

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 数栈-离线开发平台(BatchWorks) 中的数据离线同步任务、数栈-实时开发平台(StreamWorks)中的数据实时采集任务已经统一基于FlinkX来实现。

袋鼠云云原生一站式数据中台PaaS——数栈,覆盖了建设数据中心过程中所需要的各种工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等),完整覆盖离线计算、实时计算应用,帮助企业极大地缩短数据价值的萃取过程,提高提炼数据价值的能力。

_

数栈架构图

目前,数栈-离线开发平台(BatchWorks) 中的数据离线同步任务、数栈-实时开发平台(StreamWorks)中的数据实时采集任务已经统一基于FlinkX来实现。数据的离线采集和实时采集基本的原理的是一样的,主要的不同之处是源头的流是否有界,所以统一用Flink的Stream API 来实现这两种数据同步场景,实现数据同步的批流统一。

1、功能介绍

断点续传

断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。断点续传是数栈-离线开发平台(BatchWorks)里数据同步任务的一个功能,需要结合任务的出错重试机制才能完成。当任务运行失败,会在Engine里进行重试,重试的时候会接着上次失败时读取的位置继续读取数据,直到任务运行成功为止。

实时采集

实时采集是数栈-实时开发平台(StreamWorks)里数据采集任务的一个功能,当数据源里的数据发生了增删改操作,同步任务监听到这些变化,将变化的数据实时同步到目标数据源。除了数据实时变化外,实时采集和离线数据同步的另一个区别是:实时采集任务是不会停止的,任务会一直监听数据源是否有变化。这一点和Flink任务是一致的,所以实时采集任务是数栈流计算应用里的一个任务类型,配置过程和离线计算里的同步任务基本一样。

2、Flink中的Checkpoint机制

断点续传和实时采集都依赖于Flink的Checkpoint机制,所以咱们先来简单了解一下。Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

flink4

Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。

当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。

3、断点续传

前提条件

同步任务要支持断点续传,对数据源有一些强制性的要求:

1、 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段,同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据,如果这个字段的值不是升序的,那么任务恢复时过滤的数据就是错误的,最终导致数据的缺失或重复;

2、数据源必须支持数据过滤,如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复;

3、目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持;

任务运行的详细过程

我们用一个具体的任务详细介绍一下整个过程,任务详情如下:

数据源:mysql表,假设表名data_test,表中包含主键字段id

目标数据源 :hdfs文件系统,假设写入路径为 /data_test

并发数:   2

checkpoint配置: 时间间隔为60s,checkpoint的StateBackend为FsStateBackend,路径为 /flinkx/checkpoint

jobId:用来构造数据文件的名称,假设为 abc123

1) 读取数据

读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板如下:

select*fromdata_test

whereidmod${channel_num}=${channel_index}

andid> ${offset}

如果是第一次运行,或者上一次任务失败时还没有触发checkpoint,那么offset就不存在,根据offset和通道可以确定具体的查询sql:offset存在时第一个通道:

select*fromdata_test

whereidmod2=0

andid> ${offset_0};

第二个通道

select*fromdata_testwhereidmod2=1andid> ${offset_1};

offset不存在时第一个通道:

select*fromdata_test

whereidmod2=0;

第二个通道

select*fromdata_test

whereidmod2=1;

数据分片构造好之后,每个通道就根据自己的数据分片去读数据了。

2)写数据

写数据前会先做几个操作:

检测 /data_test 目录是否存在,如果目录不存在,则创建这个目录,如果目录存在,进行2操作;

判断是不是以覆盖模式写数据,如果是,则删除 /data_test目录,然后再创建目录,如果不是,则进行3操作;

检测 /data_test/.data 目录是否存在,如果存在就先删除,再创建,确保没有其它任务因异常失败遗留的脏数据文件;

数据写入hdfs是单条写入的,不支持批量写入。数据会先写入/data_test/.data/目录下,数据文件的命名格式为:channelIndex.jobId.fileIndex

包含通道索引,jobId,文件索引三个部分。

3)checkpoint触发时

在FlinkX中“状态”表示的是标识字段id的值,我们假设checkpoint触发时两个通道的读取和写入情况如图中所示:

flink5

checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据到HDFS,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:

Reader_0:id=12Reader_1:id=11Writer_0:id=无法确定Writer_1:id=无法确定

任务状态会记录到配置的HDFS目录/flinkx/checkpoint/abc123下。因为每个Writer会接收两个Reader的数据,以及各个通道的数据读写速率可能不一样,所以导致writer接收到的数据顺序是不确定的,但是这不影响数据的准确性,因为读取数据时只需要Reader记录的状态就可以构造查询sql,我们只要确保这些数据真的写到HDFS就行了。在Writer生成Snapshot之前,会做一系列操作保证接收到的数据全部写入HDFS:

a.close写入HDFS文件的数据流,这时候会在/data_test/.data目录下生成两个两个文件:/data_test/.data/0.abc123.0/data_test/.data/1.abc123.0

b.将生成的两个数据文件移动到/data_test目录下;

c.更新文件名称模板更新为:channelIndex.abc123.1;

快照生成后任务继续读写数据,如果生成快照的过程中有任何异常,任务会直接失败,这样这次快照就不会生成,任务恢复时会从上一个成功的快照恢复。

4)任务正常结束

任务正常结束时也会做和生成快照时同样的操作,close文件流,移动临时数据文件等。

5)任务异常终止

任务如果异常结束,假设任务结束时最后一个checkpoint记录的状态为:Reader_0:id=12Reader_1:id=11

那么任务恢复的时候就会把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:

第一个通道:

select*fromdata_test

whereidmod2=0

andid>12;

第二个通道

select*fromdata_test

whereidmod2=1

andid>11;

这样就可以从上一次失败的位置继续读取数据了。

支持断点续传的插件

理论上只要支持过滤数据的数据源,和支持事务的数据源都可以支持断点续传的功能,目前FlinkX支持的插件如下:

ReaderWriter

mysql等关系数据读取插件HDFS、FTP、mysql等关系数据库写入插件

4、实时采集

目前FlinkX支持实时采集的插件有KafKa、binlog插件,binlog插件是专门针对mysql数据库做实时采集的,如果要支持其它的数据源,只需要把数据打到Kafka,然后再用FlinkX的Kafka插件消费数据即可,比如oracle,只需要使用oracle的ogg将数据打到Kafka。这里我们专门讲解一下mysql的实时采集插件binlog。

binlog

binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。binlog的作用主要有:

复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的;

数据恢复:通过mysqlbinlog工具恢复数据;

增量备份;

MySQL 主备复制

有了记录数据变化的binlog日志还不够,我们还需要借助MySQL的主备复制功能:主备复制是指 一台服务器充当主数据库服务器,另一台或多台服务器充当从数据库服务器,主服务器中的数据自动复制到从服务器之中。

flink6

主备复制的过程:

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看);

 MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);

MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;

写入Hive

binlog插件可以监听多张表的数据变更情况,解析出的数据中包含表名称信息,读取到的数据可以全部写入目标数据库的一张表,也可以根据数据中包含的表名信息写入不同的表,目前只有Hive插件支持这个功能。Hive插件目前只有写入插件,功能基于HDFS的写入插件实现,也就是说从binlog读取,写入hive也支持失败恢复的功能。

flink7

写入Hive的过程:

从数据中解析出MySQL的表名,然后根据表名映射规则转换成对应的Hive表名

检查Hive表是否存在,如果不存在就创建Hive表;

查询Hive表的相关信息,构造HdfsOutputFormat;

调用HdfsOutputFormat将数据写入HDFS;

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
29天前
|
运维 监控 前端开发
应用研发平台EMAS产品常见问题之厂商通道离线如何解决
应用研发平台EMAS(Enterprise Mobile Application Service)是阿里云提供的一个全栈移动应用开发平台,集成了应用开发、测试、部署、监控和运营服务;本合集旨在总结EMAS产品在应用开发和运维过程中的常见问题及解决方案,助力开发者和企业高效解决技术难题,加速移动应用的上线和稳定运行。
|
SQL 分布式计算 数据可视化
加速查询MaxCompute再对接FBI(帆软)的完整链路来啦!
本文旨在分享使用交互式分析查询MaxCompute数据,再对接FBI的完整链路,快速搭建大数据生态完整链路。
4013 0
|
5月前
|
数据采集 数据可视化 JavaScript
如何接入神策平台
如何接入神策平台
|
4月前
|
SQL 运维 监控
Dataphin V3.9 版本升级|支持一站式数据汇聚处理、优化研发体验、提升数据治理能力
Dataphin V3.9 版本升级|支持一站式数据汇聚处理、优化研发体验、提升数据治理能力
|
5月前
|
存储 消息中间件 数据可视化
Dataphin实时研发实践—电商场景下的实时数据大屏构建
实时数据大屏是实时计算的重要应用场景之一,广泛应用在电商业务中,用于实时监控和分析电商平台的运营情况。通过大屏展示实时的销售额、订单量、用户活跃度、商品热度等数据指标,帮助业务人员随时了解业务的实时状态,快速发现问题和机会。同时,通过数据可视化和趋势分析,大屏也提供了决策支持和优化运营的功能,帮助业务人员做出及时的决策和调整策略,优化电商业务的运营效果。 下面以电商业务为背景,介绍如何构建经典实时数仓,实现实时数据从业务库到ODS层、DWD层、DWS层全链路流转,基于Dataphin和Quick BI实现实时数据大屏。
266 0
|
6月前
|
大数据 数据挖掘 数据处理
直播预约丨《实时湖仓实践五讲》第二讲:实时湖仓功能架构设计与落地实战
《实时湖仓实践五讲》是袋鼠云打造的系列直播活动,将围绕实时湖仓的建设趋势和通用问题,邀请奋战于企业数字化一线的核心产品&技术专家,结合实践案例分析,和听众共同探讨实时湖仓领域的前沿技术。 《实时湖仓实践五讲》第二讲——《实时湖仓功能架构设计与落地实战》将于10月11日 15:00-16:00开播。 快快预约直播吧~
33 0
|
6月前
|
数据可视化 数据挖掘 数据库
TDengine OSS 与 qStudio 实现无缝协同,革新数据分析和管理方式
为了帮助社区用户更好地进行数据分析和管理,丰富可视化解决方案的多样性,我们将开源的时序数据库 TDengine OSS 与开源的数据库分析工具进行了集成,相信这对终极开源工具一定能帮助你释放数据潜力。
98 0
|
7月前
|
canal SQL 弹性计算
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
144 0
|
9月前
|
Prometheus 监控 Cloud Native
SigNoz,一款集日志、指标及追踪一体的开源平台
Hello folks,我是 Luga,今天我们来分享一下与云原生体系有关的话题- 云原生可观测性平台-SigNoz。 作为一个“核心”体系,可观测性在监控分布式微服务应用程序和云基础设施的可见性和控制自动化层面具有举足轻重的意义。
357 0
|
11月前
|
存储 JavaScript 前端开发
TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转
TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转
1023 1