2.2 数据缓冲区
数据缓冲区是处于生产环境和分析环境之间的中间区域,它是数据闭环中各个系统间的数据中转站,从各个系统接收原始数据,并将其暂存在对应的目录中。其他系统可以从数据缓冲区中获取需要的数据文件。
为了便于管理和迁移数据,我们规定存入数据缓冲区中的数据使用文本文件的格式,这样一来,数据缓冲区就可以使用一台或几台文件服务器实现。几乎所有的应用系统都支持文本文件的数据交互,新的系统可以轻松加入数据闭环之中。
数据缓冲区的一端连接生产环境中的大量应用系统,另一端连接分析环境中的数据平台,避免了生产环境和分析环境的相互影响,同时也为系统之间数据文件的交互制定了统一标准(见图2-2)。
图2-2 数据缓冲区连接生产环境和分析环境
数据缓冲区的另一个优势在于方便自动化和数据管理,多个应用系统的文件存档在同一个文件服务器中,便于数据的统一管理和分发。比如,在一个多部门、跨地域的企业中,不同地区、不同部门的数据文件之间的交互,如果没有数据缓冲区的统一收集与分发,那将会形成一个复杂的交叉网络。
表2-2列举了数据缓冲区的一些主要优点,本书主要专注于系统解耦,并基于数据缓冲区完成原始数据的自动加载过程。
表2-2 数据缓冲区的优点
作 用 备 注
系统解耦 系统间互不影响
便于数据平台切换 基于文件,文件两端的数据平台互不影响
便于数据系统扩展 基于文件系统,便于不同数据平台之间对接
便于数据管理 文件备份,存储空间管理
便于数据分发 企业不同机构间的数据分发
2.2.1 系统解耦
1. 数据直连交互
数据直连的方式是将原始数据从生产系统直接导出至分析系统,是数据交互的紧耦合方式。这种方式在数据规模较小时并不会出现问题,因此很多企业搭建数据体系时采用了数据直连的方式,但随着数据规模的增大,数据直连的弊端会逐步展现,让我们看一下如下的场景:
某个星期一的早晨,数据工程师小王走入办公室,发现开发工程师小李、系统工程师小周、开发经理和产品经理正聚在一起商讨问题。
开发工程师小李:我昨晚上线完成后,业务人员验证通过,当时系统没有任何问题的,程序肯定没问题!
产品经理:可是现在系统反应奇慢无比,基本处于瘫痪状态,很多业务人员都等着开工呢,怎么办?
系统工程师小周:你自己看,数据库服务器磁盘I/O好大……这种情况之前可是没有的……这个数据库进程是怎么回事?磁盘I/O就是被它拖垮的。
开发经理(一脸黑线):赶紧查一查,看看谁干的!
系统工程师小周:这好像是分析数据库在抽数据。……小王你刚好来了,你看这个作业是你的吧?
数据工程师小王(紧张中):这个我看看……平时都是20分钟就抽取结束了啊!今天怎么还没有完成?怎么回事?
开发经理:那先赶紧停下来,解决了生产问题再说。小王,最近数据这块怎么老出问题啊!
数据工程师小王(委屈):ETL作业跑了半年多了,都没问题。开发昨晚上线,今天就出问题了……
开发工程师小李(打断小王):上线前都经过测试了,上线后也验证了,没有问题的。现在是数据库的问题,和系统没有关系好吧?
数据工程师小王:那为什么上线后,ETL作业就这么慢呢?!也不能怪我啊……
上面的场景是数据直连方式经常会遇到的问题,这种问题可能在生产系统上线后突然出现,也可能在平常的日子里莫名奇妙地发生。由于生产系统和分析系统之间的紧耦合,一旦出现问题,生产系统和分析系统都可能受到影响,而问题产生的原因却很难查清。表2-3总结了数据直连的弊端。
表2-3 数据直连的弊端
弊 端 说 明
系统紧耦合 双方互相影响,生产系统切换影响分析系统,分析系统也会影响生产系统效率
不利于数据库权限管理 生产数据库需要为ETL作业开放权限,不利于生产数据的安全
不利于数据平台扩展 由于采用数据直连,其他数据平台(如Hadoop平台)加入时比较困难
2. 数据缓冲区交互
前面已经论述使用数据缓冲区进行交互的优点,本节将进一步研究数据缓冲区进行数据交互的详细流程,图2-3是整个过程的示意图。
图2-3 数据缓冲区进行数据交互的流程
在这个过程中,数据由生产环境流入分析环境,共经过以下四个步骤。
1)批量导出。数据从生产数据库批量导出为文本文件,该过程使用DBMS系统自带的批量导出命令,对于大数据平台数据库,使用对应的命令或者使用第三方插件。2.2.2节将对批量导出命令进行详细介绍。
2)FTP传输第一阶段。将步骤1)导出的数据文件通过FTP上传至数据缓冲区。步骤1)和步骤2)的自动化过程可以通过ETL定时作业完成,实现方法参阅ETL作业章节。
3)FTP传输第二阶段。将数据文件从数据缓冲区下载至分析环境中。
4)批量加载。使用批量导入命令将文件加载至数据平台。
这种方式解决了职责不清的问题。从图2-3中可以发现,步骤1)和步骤2)处于数据缓冲区之前,它们属于生产环境的范畴,由开发工程师负责;步骤3)和步骤4)属于分析系统的范畴,由数据工程师负责。
这个框架还解决了数据直连方式面临的以下问题。
1)生产系统与分析系统的耦合问题。通过数据缓冲区实现了生产系统和分析系统的解耦,无论生产系统如何变更,只要传输至数据缓冲区中的文本文件格式不变,分析系统就不受影响;而分析系统在将数据加载至数据平台的时候,也不会影响到生产系统的性能。
2)数据权限的问题,让数据更加安全。数据缓冲区的左边完全由开发工程师负责,因此生产数据库权限不会流转至后端的数据工程师;而通过在数据批量导出过程中对敏感数据的屏蔽处理(如对手机号码加密等)后,后端数据平台无法看到敏感数据,提高了数据安全性。
3)增强了数据平台的可扩展性。由于各种数据平台,如传统数据仓库、Hadoop平台、MPP数据库等均对文本文件有良好的支持,不同平台之间的数据交互,均可以通过数据缓冲区实现数据交互。例如,Hadoop平台的Hive数据仓库可以通过数据缓冲区与传统数据仓库中的关系数据库实现交互。
2.2.2 批量导出
批量导出是将数据库中的数据一次性导出至文本文件中,导出的文件有固定的列分隔符和行分隔符,或者有固定的字段长度。批量导出的方法大致可以分为以下两种(见表2-4)。
1)使用ODBC或JDBC接口,如ETL工具或定制的Java程序等。
2)使用批量导出命令,一般是数据库自带的命令。
表2-4 常用数据库批量导出方法
方法 举例 性能 适用场景
ODBC/JDBC接口 ETL工具、Java程序 速度慢,对源数据库影响大 数据量较小的场景
批量导出命令 批量导出命令、hadoop shell命令 速度快,对源数据库影响小 数据量大或跨系统的场景,如生产数据到分析数据,或数据仓库到大数据平台
表2-5中列出了常用数据库的批量导出命令,这些命令将在第3章详细介绍。
表2-5 常用数据库批量导出命令
数据库系统 批量导出命令
SQL Server bcp out
DB2 export
Oracle 第三方插件sqluldr
MySQL mysqldump
Hive hadoop shell
1. SQL Server : bcp out
bcp是SQL Server自带的批量导出/导入命令,它包括批量导出命令bcp out和批量导入命令bcp in。bcp out命令的语法如代码清单2-1所示(表2-6为参数说明)。
代码清单 2-1
bcp table_name out data_file
[-f format_file]
[-U login_id]
[-P password]
[-S [server_name[\instance_name]]
表2-6 参数说明
参 数 参数含义
table_name 表名称
data_file 输出的数据文件的完整路径
-f format_file 指定格式化文件的完整路径
-U login_id 指定用于连接到 SQL Server 的登录 ID
-P password 指定登录 ID 的密码
-S[server_name[\instance_name] 指定要连接的 SQL Server 实例
例如,从SQL Server数据库表“dbo.巡检商户明细”中批量导出数据到文件“巡检商户明细.dat”中的bcp out命令如代码清单2-2所示。
代码清单 2-2
bcp dbo.巡检商户明细 out d:\巡检商户明细.dat -fd:\巡检商户明细.fmt -U test -P 123 -S localhost
*注:实际输入命令时,应在一行,中间不能有换行。
代码清单2-1中各参数说明如下。
1)dbo.巡检商户明细是需要导出数据的表名称。
2)d:\巡检商户明细.dat为导出的文件路径及文件名称。
3)-f d:\巡检商户明细.fmt 指明格式文件的路径及名称。
4)-U test指明登录数据库所用的用户名称为“test”。
5)-P 123指明数据库用户“test”的密码为“123”。
6)-S localhost指明登录的数据库为“localhost”。
bcp out命令使用了格式文件选项“-f format_file”,用来指明数据文件的格式。格式文件是用来描述数据文件格式的文件,在格式文件中需指明要导出的字段名称、长度、列分割符、行分隔符、排序方式等,可以用在bcp out和bcp in命令中。用在bcp out命令中时,它用来定义导出文件的格式;用在bcp in命令中时,它用来描述待导入文本文件的格式。图2-4所示为格式文件的式样。
图2-4 SQL Server 控制文件格式
对图2-4中涉及参数的说明如下:
1)Version是微软公司对SQL Server系列产品的版本编号,如表2-7所示。
2)Number of columns是所要导出的数据库表所含的字段个数。
3)Host file field order是输出的数据文件字段列的序号,与Server column order对应,一般与Server column order保持一致。
4)Host file data type保持默认值“SQLCHAR”即可。
5)Prefix length(前缀长度),一般导出的数据文件字段都不填充前缀,因此此处为0。
6)Host file data length为输出文件字段的长度,此处如果Terminator指明的分隔符不为“”,则该参数并不起实际作用。
7)Terminator指明数据文件的列分隔符和行分隔符(格式文件最后一行该字段为行分隔符)。
8)Server column order为数据表中字段的数据,与Host file field order保持一致。
9)Server column name是数据库中的字段名称。
10)Column collation指明字段的collation,一般仅为字符格式的字段使用,保持默认即可。
表2-7 SQL Server版本号对照表
产品名称 版本号
SQL Server 2014 12
SQL Server 2012 11
SQL Server 2008 R2 10
SQL Server 2008 10
SQL Server 2005 9
SQL Server 2000 8
格式文件中应重点关注的部分为Number of columns、Host file field order、Terminator、Server column order、Server column name,其余保持默认即可。
如何制作格式文件呢?下面使用一个具体的例子说明如何创建格式文件。假设在数据库中创建表“dbo.巡检商户明细”,其创建的表脚本如代码清单2-3所示。
代码清单 2-3
CREATE TABLE [dbo].[巡检商户明细]
(
[商户代码] [varchar](30) NULL,
[商户名称] [nvarchar](100) NULL,
[机构号] [varchar](20) NULL,
[合作方名称] [nvarchar](100) NULL,
[分公司] [nvarchar](50) NULL
);
使用bcp format命令,可得到该表的控制文件模板“巡检商户明细.fmt”,命令如代码清单2-4所示。
代码清单 2-4
bcp dbo.巡检商户明细 format nul -c -f 巡检商户明细.fmt -Utest -P123 -Slocalhost
巡检商户明细.fmt文件中默认的列分隔符为“\t”(制表符)、行分隔符为“\r\n”(回车换行符),其内容如代码清单2-5所示。
代码清单 2-5
9.0
5
1 SQLCHAR 0 30 "\t" 1 [商户代码]
SQL_Latin1_General_CP1_CI_AS
2 SQLCHAR 0 100 "\t" 2 [商户名称]
SQL_Latin1_General_CP1_CI_AS
3 SQLCHAR 0 20 "\t" 3 [机构号]
SQL_Latin1_General_CP1_CI_AS
4 SQLCHAR 0 100 "\t" 4 [合作方名称]
SQL_Latin1_General_CP1_CI_AS
5 SQLCHAR 0 50 "\r\n" 5 [分公司]
SQL_Latin1_General_CP1_CI_AS
注:格式文件的最后一行必须为空白行,否则在使用bcp命令时会报格式错误。
得到格式文件模板之后,就可以在此基础上进行修改了,比如使用“#”作为列分隔符,使用“\n”作为行分隔符,修改后的格式文件内容如代码清单2-6所示。
代码清单 2-6
9.0
5
1 SQLCHAR 0 30 "#" 1 [商户代码] SQL_Latin1_General_CP1_CI_AS
2 SQLCHAR 0 100 "#" 2 [商户名称] SQL_Latin1_General_CP1_CI_AS
3 SQLCHAR 0 20 "#" 3 [机构号] SQL_Latin1_General_CP1_CI_AS
4 SQLCHAR 0 100 "#" 4 [合作方名称] SQL_Latin1_General_CP1_CI_AS
5 SQLCHAR 0 50 "\n" 5 [分公司] SQL_Latin1_General_CP1_CI_AS
这样,将修改后的格式文件用于bcp out命令,则输出的数据文件将使用“#”作为列分隔符,使用“\n”作为行分隔符。作为验证,可以执行如代码清单2-7所示的bcp out命令。
代码清单 2-7
bcp dbo.巡检商户明细 out D:\巡检商户明细.dat -f D:\巡检商户明细.fmt -Utest -P123
-Slocalhost
查看输出的数据文件“巡检商户明细数据.dat”,其内容如代码清单2-8所示。
代码清单 2-8
000391952#新白鹿餐厅(百联中环店)#0880#安智餐饮有限公司#安徽分公司
000472873#颐和四季体验馆#0880#万源城娱乐城#北京分公司
000901032#书院人家#0880#万家灯火餐饮文化传播公司#北京分公司
000900109#三阳湾食府#0880#三阳众城文化管理有限公司#北京分公司
......
从以上代码清单中可以看到,输出的文本文件使用“#”作为列分隔符,使用“\n”作为行分隔符。可以根据需要修改格式文件,从而得到满足要求的数据文件输出。更详细的bcp命令可参阅微软官方帮助文档。
2. DB2 : export
IBM DB2数据库中数据的批量导出可以使用export命令,其基础语法如代码清单2-9所示(表2-8为参数说明)。
代码清单 2-9
export to filename of {ixf |del | wsf }
[ modified by {filetype-mod …} ] { select-statement |[ where … ]}
例如,在DB2中自带sample数据库中的一张表,其创建的表脚本如代码清单2-10所示。
表2-8 DB2 export命令的参数说明
参 数 参数含义
filebname 导出的文件名称
IXF | DEL | WSF 输出格式,对于文本文件,选择DEL格式
filetype-mod 文件类型修饰符,包括设置字符串定界符、列分隔符等多种选项
chardel 指定字符串定界符
coldel 指定列分隔符
select-statement 用于提取数据的select语句
代码清单 2-10
CREATE TABLE HB_STATIC (
STA_MTH VARCHAR(8) DEFAULT NULL,
FACE_VALUE INTEGER DEFAULT 5,
TOTAL_AMT BIGINT DEFAULT 0
);
现在需要把这张表中的数据导出成文本文件,运行db2cmd,依次运行如代码清单2-11所示的命令。
代码清单 2-11
db2 connect to sample
db2 export to d:\\hb_static.del of del modified by chardel'' coldel; select *
from hb_static
export命令指明导出文件路径及名称“d:\\hb_static.del”,chardel“指明使用”作为字符串定界符(数据库表中的字符类型的数据使用该字符串包裹),coldel; 指明使用;作为列分隔符。最终得到的文本文件d:\\hb_static.del的内容(部分)如代码清单2-12所示。
代码清单 2-12
'201402';20;480
'201402';5;165
'201402';10;200
'201402';50;1100
'201403';5;915
由于DB2的export命令没有提供形如SQL Server格式文件之类的控制文件,而仅通过命令选项指定列分隔符、字符串界定符,使得DB2的export命令导出的文本文件格式较为单一(例如,只能使用一个字符作为列分隔符),但在大多数场景中,export可以满足要求。
3. Oracle : sqluldr2
Oracle数据库未提供批量导出命令,一般采用第三方工具进行批量导出。SQLPLUS提供的Spool工具虽然可以进行数据导出,但是它并不适合大量数据的快速导出,主要原因是其导出效率很低,大量数据导出会非常耗时。
另一款批量导出工具sqluldr2适用于大批量数据的导出,速度非常快,可以将数据以csv、txt等格式导出。
首先需要下载sqluldr2.exe(可上网搜索),如果安装的是64位的Oracle,则需要下载sqluldr264.exe,然后将sqluldr2.exe复制到$ORACLE_HOME的BIN目录(该目录中有Oracle自带的sqlldr.exe,这是Oracle的批量导入工具。没错,Oracle提供了批量导入工具。却没有提供批量导出工具)中。现在就可以开始使用sqluldr2.exe了,sqluldr2的命令格式如代码清单2-13所示(表2-9为其参数说明)。
代码清单 2-13
sqluldr2 logon_str {query="select_statement" | sql=sql_file }
[file=output_file]
[field=col_del]
[record=row_del]
[quote=quote]
表2-9 sqluldr2的参数说明
参 数 含 义
logon_str 数据库登录信息,必需参数
select_statement 查询语句,用于提取数据,与sql_file两者二选一
sql_file 指定的sql语句脚本文件
output_file 输出的数据文件路径及名称,如果不指定此选项,则默认输出uldrdata.txt
col_del 列分隔符,默认为逗号
row_del 行分隔符,默认为\r\n
quote 字符串界定符
例如,在Oracle数据库中有一张表,其创建表的脚本如代码清单2-14所示。
代码清单 2-14
CREATE TABLE “ODS"."DP_COMMENT"
(
"MEMBERID" VARCHAR2(50),
"TASTE" VARCHAR2(50),
"ENVIRONMENT" VARCHAR2(50),
"SERVICE" VARCHAR2(50),
"LEVEL_SCORE" VARCHAR2(50),
"CONTENT" VARCHAR2(2000),
"SHOPID" VARCHAR2(50)
);
使用sqluldr2将表中的数据导出至文本文件DP_COMMENT.txt中,如代码清单2-15所示。
代码清单 2-15
sqluldr2 ods/ods@yfb_orc query="select * from ODS.DP_COMMENT"
file=d:\\DP_COMMENT.txt field=#$
sqluldr2命令将表中的数据导出至DP_COMMENT.txt,field=#进行分割。查看文件DP_COMMENT.txt,其内容如代码清单2-16所示(部分)。
代码清单 2-16
195084790#3#40#17222108
630568#4#50#17222108
178216498#3#50#17222108
该命令的一个很有用的选项为table选项,该选项可以生成一个默认的控制文件,该控制文件可以用于Oracle的sqlldr命令进行数据批量导入。在上述导出的命令中加入table选项,如代码清单2-17所示。
代码清单 2-17
sqluldr2 ods/ods@yfb_orc query="select * from ods.dp_comment"
file=d:\\dianping_comment.txt field=#$ table=dp_comment
执行上述命令后,除了输出文件d:\\DP_COMMENT.txt外,还生成了控制文件dp_comment_sqlldr.ctl,代码清单2-18是dp_comment_sqlldr.ctl的内容(略做了修改,删除了注释部分)。
代码清单 2-18
load data
infile 'd:\\dp_comment.txt'
insert into table dp_comment
fields terminated by x'2324' trailing nullcols
(
"memberid" char(50) nullif "memberid"=blanks,
"taste" char(50) nullif "taste"=blanks,
"environment" char(50) nullif "environment"=blanks,
"service" char(50) nullif "service"=blanks,
"level_score" char(50) nullif "level_score"=blanks,
"content" char(2000) nullif "content"=blanks,
"shopid" char(50) nullif "shopid"=blanks
)
代码清单2-18中,x'2324'是十六进制的字符#$(由上文中sqluldr2的field选项指明)。上述控制文件可以用于数据批量导入Oracle数据库中,控制文件用于批量导入的方法请参阅批量导入章节的内容。
4. Hive : hadoop fs
Hive是Hadoop平台上一款非常流行的数据仓库分析工具,由于类似SQL的语言风格,使得其学习成本很低,所以是大数据分析的必学工具。
Hive数据导出,主要的应用场景是将Hive表中的数据导出到Linux操作系统中,然后供其他数据产品使用。该导出过程可以使用ETL工具(参阅第2.3节),也可以使用hadoop shell命令完成。
例如,现在有一张Hive表,其创建表的脚本如代码清单2-19所示(未列出全部字段)。
代码清单 2-19
create table adobe_nwd_prd
(
accept_language string comment '浏览器中可接受的语言标题' ,
browser bigint comment '实际用于单击的浏览器ID' ,
domain string comment '用户ISP域' ,
duplicate_events string comment '列出计为重复的每个事件' ,
......
)
comment 'adobe pc端原始数据'
partitioned by(load_day string)
row format delimited
fields terminated by '\t'
stored as textfile;
Hive表使用了load_day字段作为partition字段,即每天的数据存放在一个partition中,通过hive shell命令可以查看当前表中的partition情况,如代码清单2-20所示。
代码清单 2-20
hive> show partitions adobe_nwd_app;
OK
load_day=20150908
load_day=20150909
load_day=20150910
load_day=20150911
load_day=20150912
load_day=20150913
load_day=20150914
Time taken: 0.379 seconds, Fetched: 7 row(s)
每个partition对应一个hdfs目录,按照1.2.3节中“Hive分区表与增量更新”中的规则,Hive表的数据存放路径如代码清单2-21所示。
代码清单 2-21
[root@qzy ~]# hadoop fs -ls /data/adobe.ADOBE_NWD_APP
Found 7 items
drwxr-xr-x -root supergroup 0 2015-09-09 00:40
/data/adobe.ADOBE_NWD_APP/20150908
drwxr-xr-x - root supergroup 0 2015-09-10 00:39
/data/adobe.ADOBE_NWD_APP/20150909
drwxr-xr-x - root supergroup 0 2015-09-11 06:18
/data/adobe.ADOBE_NWD_APP/20150910
drwxr-xr-x - root supergroup 0 2015-09-12 00:33
/data/adobe.ADOBE_NWD_APP/20150911
drwxr-xr-x - root supergroup 0 2015-09-13 00:33
/data/adobe.ADOBE_NWD_APP/20150912
drwxr-xr-x - root supergroup 0 2015-09-14 00:38
/data/adobe.ADOBE_NWD_APP/20150913
drwxr-xr-x - root supergroup 0 2015-09-14 10:33
/data/adobe.ADOBE_NWD_APP/20150914
使用代码清单2-22所示命令将partition(load_day=20150908)的所有数据导出至Linux系统的本地目录/tmp/datafiles中。
代码清单 2-22
hadoop fs -copyToLocal /data/adobe.ADOBE_NWD_APP/20150908 /tmp/datafiles
通过程序可以循环导出指定时间范围内的数据,第3章将提供一种Java批量导出Hive数据的多线程实现。
上述方式实现的是不含条件参数的批量导出,如果希望导出where条件限定的数据,则需要将数据事先生成一张中间表,然后将此中间表的全部数据导出,此处不再赘述。
2.2.3 FTP传输
由于数据缓冲区实际上是文件服务器,在内网环境中使用FTP进行传输是一种很方便的方式。在数据闭环中,FTP传输连接了数据缓冲区的上游和下游,稳定高效的文件传输对整个数据闭环起重要作用。
对数据工程师来说,FTP自动传输可以通过ETL工具、命令行、定制程序等方式实现。
ETL工具一般都自带文件传输模块,可以直接使用FTP文件传输功能。例如,开源ETL工具Pentaho Kettle的作业功能中,即有文件传输模块,包含FTP上传、FTP下载等组件,图2-5所示的为Kettle的文件传输组件。
图2-5 Kettle中的文件传输模块
但ETL工具提供的FTP传输模块有一个局限,就是待传输的文件名如果是动态的,例如文件名称以日期作为后缀,或者根据条件选择不同的文件进行传输,则使用ETL工具实现起来会比较困难(虽然ETL工具本身也提供简单的参数输入及文件名称表达式匹配,但总体实现成本比其他两种方式要高很多)。
一种替代的方案是使用脚本语言,也就是通过脚本将FTP命令包装起来,以实现参数的传入,解决动态文件名问题。例如在Linux系统上,使用shell脚本或Python脚本,调用FTP命令来实现文件的FTP上传或FTP下载。
本书推荐的方式是使用高级程序语言进行FTP文件传输的包装,因为高级程序语言一般都提供FTP文件传输的程序包,可以根据需要记录传输日志,并且可以方便实现多线程FTP文件传输,提高传输效率。比如Java的commons-net-x.x.x.jar(x.x.x代表版本号)包就提供了FTP命令的API接口,代码清单2-23是该接口的调用示例(具体实现请参阅第3章)。
代码清单 2-23
import org.apache.commons.net.ftp.FTPClient;
FTPClient ftpClient = new FTPClient();
ftpClient.connect(serverIP);
ftpClient.login(user,passsWord));
ftpClient.retrieveFile(remoteFileName, new FileOutputStream(localFilePath));
2.2.4 批量导入
1. SQL Server:bcp in
SQL Server提供的批量导入命令bcp in是bcp out的反向操作。其命令的格式与bcp out的也基本一致,相比bcp out,bcpin增加了几个重要选项,即错误文件-e选项、最大允许错误行数-m选项,如代码清单2-24所示(各选项的详细说明见表2-10)。
代码清单 2-24
bcp table_name in data_file
[-f format_file]
[-e err_file]
[-m max_errors]
[-U login_id]
[-P password]
[-S server_name]
表2-10 bcp in的选项说明
选 项 说 明
-e err_file 用于指定错误文件的完整路径,此文件用于存储 bcp实用工具无法从文件传输到数据库的所有行,相当于错误日志
-m max_errors 用于指定bcp命令允许出现的最大错误行数,错误行数未达到max_errors时,bcp导入将继续进行,默认值为10
-f format_file 用于指定格式化文件的完整路径,格式化文件用于指定行列分隔符、是否跳过某些列等
-m选项在批量导入大文件时是非常有用的,由于一些数据会含有少量的错误数据,而这些错误数据并不影响整体的数据效果,这些场景本身对数据的完整性要求并不高(不同于交易明细数据),这时使用-m选项指定一个阈值,当错误行数小于阈值的时候,bcp继续执行,可以将正确的数据导入,确保业务的正常进行。
-m选项通常与-e选项配合使用,可以从-e选项指定的err_file中查看出现错误的数据行。
目前许多互联网公司网站中会嵌入网站日志分析工具,实时收集用户的单击行为,如Adobe omniture、WebTrends等,这些工具会产生大量日志数据,而这些日志数据中不可避免地存在部分无法正常导入数据库的记录,通过-m选项可以跳过这些错误数据,从而保证绝大部分数据可用。
2. DB2:import
DB2批量数据导入可使用db2 import 命令,该命令是db2 export命令的反向命令,其格式与db2 export命令的一致。例如将第2.2.2节中“DB2:export”导出的文本文件“d:\\hb_static.del”再导入至表HB_STATIC2(该表结构与表HB_STATIC的相同)中,方法如代码清单2-25所示。
代码清单 2-25
db2 import from d:\\hb_static.del of del modified by chardel'' coldel; insert
into HB_STATIC2
上述命令成功执行后,文件中的数据即被导入至数据库中。该命令执行后的反馈信息如代码清单2-26所示。
代码清单 2-26
SQL3109N 实用程序正在开始从文件 "d:\\hb_static.del" 装入数据中。
SQL3110N 实用程序已完成处理。从输入文件读了 "115" 行。
SQL3221W ...开始 COMMIT WORK。输入记录计数 = "115"。
SQL3222W ...对任何数据库更改的 COMMIT 都成功。
SQL3149N 处理了输入文件中的 "115" 行。已将 "115" 行成功插入表中。拒绝了 "0"行。
读取行数 = 115
跳过行数 = 0
插入行数 = 115
更新行数 = 0
拒绝行数 = 0
落实行数 = 115
3. Oracle:sqlldr
Oracle自带的批量导入工具sqlldr可以实现数据的快速批量导入,其命令格式如代码清单2-27所示(表2-11为其参数说明)。
代码清单 2-27
sqlldr logon_str control=ctr_file log=log_file bad=bad_file errors=max_errors
表2-11 sqlldr的参数说明
参 数 说 明
logon_str 数据库登录信息,必需参数
ctr_file 控制文件完整路径
log_file 日志文件完整路径
bad_file 错误文件完整路径
max_errors 最大允许错误行数
其中控制文件的作用类似于SQL Server中的格式文件,可以通过此控制文件指明输入文本的格式,以及导入数据库时的加载方式,图2-6所示的为Oracle控制文件的格式说明。
图2-6 Oracle控制文件的格式
Oracle sqlldr批量导入有多种方式,表2-12对此做了总结。
表2-12 sqlldr的四种导入方式
导入方式 说 明
append into 若原表有数据,则在后面追加数据
insert into 装载空表,如果原表有数据,则sqlldr会停止加载,此为默认值
replace into 原表数据全部删除后加载
truncate into 功能和replace的相同,会用truncate语句删除原有数据
下面尝试将Adobe omniture对某网站的监控日志导入Oracle的数据库中,命令如代码清单2-28所示。
代码清单 2-28
sqlldr ods/ods@yfb_orc control=e:\Adobe\adobe_src_data.ctl
log=e:\Adobe\log.txt bad=e:\Adobe\error_record.txt errors=1000000
由于该文件是对网站单击行为的日志记录,因此对错误行的容忍度是比较大的,为了保证能够将正确的数据导入,我们设置errors=1000000,即允许导入过程出现1000000条错误记录,这些错误记录会被记录到bad选项指定的“e:\Adobe\error_record.txt”中。
控制文件“e:\Adobe\adobe_src_data.ctl”指明了数据文件、文件分隔符信息、加载方式等内容,如代码清单2-29所示。
代码清单 2-29
load data infile 'E:\Adobe\01-niwodai-prd_2015-07-26.tsv'
append into table adobe_src_data
fields terminated by '\t'
trailing nullcols
(
accept_language
,browser
,browser_height
......
)
上述sqlldr命令在Windows的cmd窗口中执行后,查看log文件“e:\Adobe\log.txt ”的内容,如代码清单2-30(仅列出部分内容)所示。
代码清单 2-30
SQL*Loader: Release 11.2.0.1.0 - Production on 星期五 7月 31 09:22:22 2015
Copyright (c) 1982, 2009, Oracle and/or its affiliates. All rights reserved.
控制文件: e:\Adobe\adobe_src_data.ctl
数据文件: e:\Adobe\01-niwodai-prd_2015-07-26.tsv
错误文件: e:\Adobe\error_record.txt
废弃文件: 未作指定
(可废弃所有记录)
要加载的数: ALL
要跳过的数: 0
允许的错误: 1000000
绑定数组: 64 行, 最大 256000 字节
继续: 未作指定
所用路径: 常规
表 ADOBE_SRC_DATA,已加载从每个逻辑记录
插入选项对此表 APPEND 生效
TRAILING NULLCOLS 选项生效
列名 位置 长度 中止 包装数据类型
------------------------------ ---------- ----- ---- ---- ---------------------
ACCEPT_LANGUAGE FIRST * WHT CHARACTER
BROWSER NEXT * WHT CHARACTER
……
记录 4245: 被拒绝 - 表 ADOBE_SRC_DATA 的列 USER_AGENT 出现错误。
数据文件的字段超出最大长度
......
表 ADOBE_SRC_DATA:
417707 行 加载成功。
由于数据错误, 所以913 行 没有加载。
由于所有 WHEN 子句失败, 所以 0 行 没有加载。
由于所有字段都为空的, 所以 0 行 没有加载。
为绑定数组分配的空间: 168216 字节 (1 行)
读取缓冲区字节数: 1048576
跳过的逻辑记录总数: 0
读取的逻辑记录总数: 418620
拒绝的逻辑记录总数: 913
废弃的逻辑记录总数: 0
从 星期五 7月 31 09:22:22 2015 开始运行
在 星期五 7月 31 10:19:56 2015 处运行结束
经过时间为: 00: 57: 34.13
CPU 时间为: 00: 03: 21.71
从以上代码中可以看到,日志文件记录了很多重要信息,如sqlldr命令中指定的控制文件(control=e:\Adobe\adobe_src_data.ctl)、错误文件(bad= e:\Adobe\ error_record.txt)、允许的错误条数(errors=1000000)。
日志还记录了控制文件指定的数据文件(e:\Adobe\01-niwodai-prd_2015-07-26.tsv)、插入选项对表 APPEND 生效(即加载方式,append into)、TRAILING NULLCOLS 选项生效(trailing nullcols,该选项生效时,当数据文件中出现连续两个列分隔符时,对应字段值将被置为null)。
在罗列了表中所有字段信息之后,日志文件记录了被拒绝的记录在数据文件中所在的行,以及被拒绝的原因,之后的信息还展示了加载成功的行数,以及由于错误被拒绝的行数,最后记录了导入该文件的耗时为57分钟34秒,之所以消耗这么长时间,是由于数据文件每行包含了265个字段,且字段长度都比较大。
日志文件有很多潜在的用途,通过程序读取日志文件可以将其中的重要信息展示在页面上,或写入日志数据库,从而便于作业的管理和监控。对日志文件的处理是数据闭环监控中的重要手段之一。
4. Hive:add partition
Hive有多种批量加载方式,根据数据文件存放的位置不同,Hive加载数据面临两种情形:从本地文件系统加载数据以及从HDFS中加载数据。Hive shell提供load data命令可以完成上述两种情形下的数据批量导入,如代码清单2-31所示。
代码清单 2-31
load data [local] inpath 'data_file' into table table_name;
当导入本地文件至Hive表中时,需要指明local关键字,并且随后的data_file参数用于指明基于本地文件系统的完整文件路径;当导入hdfs文件至Hive表中时,不需要local关键字,且data_file为hdfs文件系统的文件路径或hdfs文件url。
为了便于程序化实现,这里采用Hadoop shell结合Hve shell 的方式实现Hive表的批量导入。
根据第1章的Hive表更新规则,我们通过两个步骤完成数据的批量导入。首先通过Hive shell为Hive表增加一个partition,并指定location;然后使用Hadoop shell将文件copy至该partition对应的location目录,图2-7展示了这个过程。
按照图2-7所示的方式,我们尝试将上述Adobe数据文件“e:\Adobe\01-niwodai-prd_2015-07-26.tsv”导入至Hive表中。
为了确保location指定的hdfs目录存在,先执行hadoop shell命令,创建一个目录,如代码清单2-32所示。
图2-7 Hive shell + Hadoop shell批量导入数据至Hive表
代码清单 2-32
hadoop fs -mkdir /data/adobe_log_app/20150726
下面就可以分两步完成数据的导入。首先为表adobe_log_app增加一个新的partition,在hive shell环境中,执行如代码清单2-33所示的命令。
代码清单 2-33
hive>alter table adobe_log_app add partition(load_day='20150726') location
'/data/adobe_log_app/20150726'
然后,使用hadoop shell将文件复制到hdfs目录“/data/adobe_log_app/20150726”中,如代码清单2-34所示。
代码清单 2-34
hadoop fs -copyFromLocal
/usr/queziyang/data/01-niwodai-prd_2015-07-26.tsv
/data/adobe_log_app/20150726
至此,即完成了数据文件从本地文件系统批量导入至Hive表中,整个过程相对比较耗时的操作仅出现在数据文件复制的过程中,且复制效率要比hive shell的load data的高。上述的各个命令,可以通过编程语言调用,从而实现自动化。
5. Hbase:bulk load
Hbase的数据加载方式也有很多种,但最高效的方式是使用Hbase的bulk load命令。Hbase的bulk load命令分为两步:
1)使用一个MapReduce作业将数据转换为Hbase的内部数据格式。
2)将生成的StoreFiles直接加载到Hbase集群中。
代码清单2-35展示了MapReduce生成HFile的过程,这个过程仅包含Map,不需要Reduce。
代码清单 2-35
public class HFileGenerator {
public static class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取RowKey
ImmutableBytesWritable rowkey = new
ImmutableBytesWritable(items[0].getBytes());
//此处添加处理输入数据文件的代码……
//按照org.apache.hadoop.hbase.KeyValue的格式输出
KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
Bytes.toBytes("item"), Bytes.toBytes(column),System.currentTimeMillis(),
Bytes.toBytes(prefValue));
context.write(rowkey, kv);
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "HFile bulk load Job");
job.setJarByClass(HFileGenerator.class);
job.setMapperClass(HFileMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
FileInputFormat.addInputPath(job, new Path(dfsArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
HFileOutputFormat.configureIncrementalLoad(job, hbaseTableName);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
代码主要包含HFileMapper和一个main()函数,在HFileMapper的map方法中处理输入数据文件,并按照org.apache.hadoop.hbase.KeyValue的格式输出。
在main()函数中, FileInputFormat.addInputPath用于指定输入文件的路径(HDFS路径),FileOutputFormat.setOutputPath则用于指定MapReduce作业的输出路径,即生成的HFile最终的存放路径。
最后通过HFileOutputFormat.configureIncrementalLoad(job, hbaseTableName)导入hbase表的相关信息,从而使得Map的输出最终与hbaseTableName相匹配。
上述过程在指定的输出路径中生成HFile文件,可以通过org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles的doBulkLoad方法将其挂载到对应的hbase表中,代码清单2-36展示了这个过程。
代码清单 2-36
public class HFileLoader {
public static void main(String[] args) {
String[] dfsArgs = null;
try {
dfsArgs = new
GenericOptionsParser(HbaseUtils.getConfiguration(),
args).getRemainingArgs();
LoadIncrementalHFiles loader = new
LoadIncrementalHFiles(HbaseUtils.getConfiguration());
loader.doBulkLoad(new Path(dfsArgs[0]),
HbaseUtils.getTable(hbaseTableName));
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个过程非常简单,只需要指明已经生成的HFile的路径,其他的工作交给doBulkLoad方法即可。加载非常快,因为它实际上是执行类似hadoop的mv操作。
bulk load主要的耗时阶段在于生成HFile,而在加载阶段则非常迅速。这种方式比较适用于往Hbase空表中加载数据的情况,因此当对Hbase进行全量更新时,这是首选方式。
但这种方式在增量加载时,就没有那么高效了,因为新的HFile的加入,会触发Hbase的split和rebalance操作,这会使doBulkLoad的过程非常缓慢。所以在对Hbase进行批量加载的时候,应该尽量使用全量更新的方式,如果增量更新不可避免,则使用原生的API接口逐条put入库将是最后的选择。