基于大数据开发套件的增量同步策略

简介: 因为近期遇到用户在做ETL操作导入数据到MaxCompute的时候,对如何设置数据同步策略有疑惑,所以今天第一波我们来聊一下数据的同步策略,根据数据的特性,看看哪些数据适合增量同步,哪些适合全量同步,又是如何实现的?请认真看完下面的介绍,这些问题都不是事儿。

今天我们来讨论如何使用大数据开发套件进行增量同步。

我们把需要同步的数据,根据数据写入后是否会发生变化,分为会变化的数据(人员表比如说,人员的状态会发生变化)和不会发生变化的数据(一般是日志数据)。针对这两种场景,我们需要设计不同的同步策略。这里以把业务RDS数据库的数据同步到MaxCompute为例做一些说明,其他的数据源的道理是一样的。根据等幂性原则(也就是说一个任务,多次运行的结果是一样的,这样才能支持重跑调度。如果任务出现错误,也比较容易清理脏数据),我每次导入数据都是导入到一张单独的表/分区里,或者覆盖里面的历史记录。

本文的测试时间是2016-11-14,全量同步是在14号做的,同步历史数据到ds=20161113这个分区里。至于本文涉及的增量同步的场景,配置了自动调度,把增量数据在15号凌晨同步到ds=20161114的分区里。数据里有一个时间字段optime,用来表示这条数据的修改时间,从而判断这条数据是否是增量数据。

不变的数据

对应这种场景,因为数据生成后就不会发生变化,我们可以很方便地根据数据的生成规律进行分区,比较常见的是根据日期进行分区,比如每天一个分区。以下是测试数据:

drop table if exists oplog;
create table if not exists oplog(
 optime DATETIME,
 uname varchar(50),
 action varchar(50),
 status varchar(10)
 );

Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%m'),'LiLei','SELECT','SUCCESS');
Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%m'),'HanMM','DESC','SUCCESS');

这里有2条数据,当成历史数据。我先做一次全量数据同步,到昨天的分区里。配置方法如下:
先在MaxCompute创建好表:

--创建好MaxCompute表,按天进行分区
create table if not exists ods_oplog(
 optime datetime,
 uname string,
 action string,
 status string
) partitioned by (ds string);

然后配置了历史数据数据同步:
screenshot

因为只需要跑一次,做以下测试就可以了。测试后到数据开发里把任务的状态改成暂停(最右边的调度配置了)并重新发布,免得明天他继续跑了。之后到MaxCompute里看一下结果:
screenshot
测试通过后。往Mysql里多写一些数据作为增量数据:

 insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
 insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
 insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed'); 

然后配置同步任务如下。需要特别注意的是数据过滤这的配置,通过这个配置,可以在15号的凌晨的同步的时候,把14号全天新增的数据查询出来,然后同步到增量分区里。
screenshot
这个任务需要发布,设置调度周期为每天调度,第二天过来一看,MaxCompute里的数据变成了:
screenshot

会变的数据

如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也就是说每天保存的都是数据的全量数据,这样历史的数据和当前的数据都可以很方便地获得。不过如果真实的场景下因为某些特殊情况,需要每天也只做增量同步,因为MaxCompute不支持Update语句来修改数据,只能用别的一些方法来实现。两种同步策略的具体方法如下:

首先我们需要造一些数据:

drop table if exists user ;
create table if not exists user(
    uid int,
    uname varchar(50),
    deptno int,
    gender VARCHAR(1),
    optime DATETIME
    );
--历史数据
insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
--增量数据
update user set deptno=101,optime=CURRENT_TIME  where uid = 2; --null改成非null
update user set deptno=104,optime=CURRENT_TIME  where uid = 3; --非null改成非null
update user set deptno=null,optime=CURRENT_TIME  where uid = 4; --非null改成null
delete from user where uid = 5;
insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);

每天全量同步

每天全量同步同步比较简单:

--全量同步
create table ods_user_full(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME 
) partitioned by (ds string);

然后配置同步为:
screenshot
测试后结果为
screenshot
因为每天都是全量同步,没有全量和增量的区别,所以第二天就能看到数据结果为
screenshot
如果需要查询的话,就用where ds =‘20161114’ 来取全量数据即可了。

每天增量

非常不推荐用这种方法,只有在极特殊的场景下才考虑。首先这种场景不支持delete语句,因为被删除的数据无法通过SQL语句的过滤条件查到。当然实际上公司里的代码很少直接有直接删除数据的,都是使用逻辑删除,那delete就转化成update来处理了。但是这里毕竟限制了一些特殊的业务场景不能做了,当出现特殊情况可能导致数据不一致。另外还有一个缺点就是同步后要对新增的数据和历史数据做合并。具体的做法如下:
首先需要创建2张表,一张写当前的最新数据,一张写增量数据:

--结果表
create table dw_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME 
);
--增量记录表
create table ods_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME 
)

然后全量数据可以直接写入结果表:
screenshot
结果如下:
screenshot
这个只要跑一次的,记得跑好后要暂停掉。
然后把增量数据写入到增量表里:
screenshot
结果如下
screenshot
然后做一次合并

insert overwrite table dw_user_inc 
select 
--所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准
case when b.uid is not null then b.uid else a.uid end as uid,
case when b.uid is not null then b.uname else a.uname end as uname,
case when b.uid is not null then b.deptno else a.deptno end as deptno,
case when b.uid is not null then b.gender else a.gender end as gender,
case when b.uid is not null then b.optime else a.optime end as optime
from 
dw_user_inc a 
full outer join ods_user_inc b
on a.uid  = b.uid ;

最终结果是:
screenshot
可以看到,delete的那条记录没有同步成功。

对比以上两种同步方式,可以很清楚看到两种同步方法的区别和优劣。第二种方法的优点是同步的增量数据量比较小,但是带来的缺点有可能有数据不一致的风险,而且还需要用额外的计算进行数据合并。如无必要,会变化的数据就使用方法一即可。如果对历史数据希望只保留一定的时间,超出时间的做自动删除,可以设置Lifecycle。

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4月前
|
资源调度 分布式计算 大数据
【云计算与大数据技术】资源管理、调度模型策略的讲解
【云计算与大数据技术】资源管理、调度模型策略的讲解
102 0
|
5月前
|
存储 分布式计算 监控
在云原生环境中构建可扩展的大数据平台:方法和策略
在云原生环境中构建可扩展的大数据平台:方法和策略
136 0
|
9月前
|
存储 大数据
大数据数据存储的分布式文件系统的HDFS的核心机制理解的副本策略
在 Hdfs 中,数据的复制和原理是基于块的分布式存储。
66 1
|
11月前
|
设计模式 算法 大数据
大数据开发基础的设计模式的策略
策略模式是大数据开发基础的设计模式之一。它是一种行为型模式,用于定义一系列算法,并将每个算法封装起来,使其可以相互替换。这样客户端代码就可以在不改变原始类代码的情况下,选择不同的算法。
76 0
|
SQL 分布式计算 运维
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
上一篇文章介绍了sqoop全量同步数据到hive, 本片文章将通过实验详细介绍如何增量同步数据到hive,以及sqoop job与crontab定时结合无密码登录的增量同步实现方法。
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
|
SQL 运维 Oracle
【大数据开发运维解决方案】Sqoop增量同步Oracle数据到hive:merge-key再次详解
这篇文章是基于上面连接的文章继续做的拓展,上篇文章结尾说了如果一个表很大。我第一次初始化一部分最新的数据到hive表,如果没初始化进来的历史数据今天发生了变更,那merge-key的增量方式会不会报错呢?之所以会提出这个问题,是因为笔者真的有这个测试需求,接下来先对oracle端的库表数据做下修改,来模拟这种场景。
【大数据开发运维解决方案】Sqoop增量同步Oracle数据到hive:merge-key再次详解
|
存储 分布式计算 算法
基于阿里云Maxcompute搭建广告策略的多维实时洞察方案
本次分享的主题为广告策略工程架构体系演进,将介绍广告在从0到1,从1到 N 的过程中,广告架构是如何支持策略、算法、模型迭代的,包括以下几部分:概述、广告策略工程架构体系演进、精益驱动思想工具:“两翼计划”。
179 0
基于阿里云Maxcompute搭建广告策略的多维实时洞察方案
|
机器学习/深度学习 存储 数据采集
大数据智能平台的构建策略与步骤
大数据智能平台的构建策略与步骤
294 0
大数据智能平台的构建策略与步骤
|
机器学习/深度学习 人工智能 安全
如何为企业业务制定大数据和人工智能策略
大数据和人工智能正在接管很多行业岗位,无论是银行业务、零售行业,还是旅游业和服务行业,这都不是什么秘密。而这个话题或者让那些对这项创新毫无准备的企业感到恐慌,或者为其令人难以置信的可能性而感到兴奋。
154 0
如何为企业业务制定大数据和人工智能策略
|
存储 分布式计算 资源调度
带你入坑大数据(二) --- HDFS的读写流程和一些重要策略
HDFS的读写流程分析,高可用和联邦的一些简短说明