ODPS功能介绍之数据导入

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

在使用ODPS强大的数据处理能力之前,大家最关心的是自己的数据如何导入到ODPS中。 下面介绍一款向ODPS导入数据的工具-Fluentd。

Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log),允许用户选择插件对日志数据进行过滤、并存储到不同的数据处理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧灵活而著称,允许用户自定义数据源、过滤处理及目标端等插件,目前在这款软件中已经有300+个插件运行Fluentd的架构上,而且这些插件全部是开源的。 ODPS也在这款软件上开源了数据导入插件。

环境准备

使用这款软件,向ODPS导入数据,需要具备如下环境:

Ruby 2.1.0 或更新

Gem 2.4.5 或更新

Fluentd-0.10.49 或从Fluentd 官网http://www.fluentd.org/ 查找最新,Fluentd为不同的OS提供了不同的版本,详见http://docs.fluentd.org/articles/quickstart

Protobuf-3.5.1 或更新(Ruby protobuf)

安装导入插件

接下来可以通过以下两种方式中的任意一种来安装ODPS Fluentd 导入插件。

方式一:通过ruby gem安装:

$ gem install fluent-plugin-aliyun-odps

ODPS已经将这个插件发布到GEM库中, 名称为 fluent-plugin-aliyun-odps,只需要通过gem install 命令来安装即可(大家在使用gem 时在国内可能会遇到gem库无法访问,可以在网上搜一下更改gem 库源来解决)。

方式二:通过插件源码安装:

$ gem install protobuf

$ gem install fluentd –no-ri –no-rdoc

$ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git

$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

其中第二条命令是安装fluentd,如果已经安装可以省略。 ODPS Fluentd插件源码在github上,clone下来之后直接放到Fluentd的plugin目录中即可。

插件的使用

使用Fluentd导入数据时,最主要的是配置Fluentd的conf文件,更多conf文件 的介绍请参见: http://docs.fluentd.org/articles/config-file

示例一:导入Nginx日志 。Conf中source的配置如下:

<source>

type tail

path /opt/log/in/in.log

pos_file /opt/log/in/in.log.pos

refresh_interval 5s

tag in.log

format /^(?<remote>[^ ]*) – - \[(?<datetime>[^\]]*)\] “(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?” (?<code>[^ ]*) (?<size>[^ ]*) “-” “(?<agent>[^\"]*)”$/

time_format %Y%b%d %H:%M:%S %z

</source>

fluentd 以tail方式监控指定的文件内容是否有变化,更多的tail配置参见:http://docs.fluentd.org/articles/in_tail

match 配置如下:

<match in.**>

type aliyun_odps

aliyun_access_id ************

aliyun_access_key *********

aliyun_odps_endpoint http://service.odps.aliyun.com/api

aliyun_odps_hub_endpoint http://dh.odps.aliyun.com

buffer_chunk_limit 2m

buffer_queue_limit 128

flush_interval 5s

project projectforlog

<table in.log>

table nginx_log

fields remote,method,path,code,size,agent

partition ctime=${datetime.strftime(‘%Y%m%d’)}

time_format %d/%b/%Y:%H:%M:%S %z

</table>

</match>

数据会导入到projectforlog project的nginx_log表中,其中会以源中的datetime字段作为分区,插件遇到不同的值时会自动创建分区;

示例二:导入MySqL中的数据。导入MySQL中数据时,需要安装fluent-plugin-sql插件作为source:

$ gem install fluent-plugin-sql

配置conf中的source:

<source>

type sql

host 127.0.0.1

database test

adapter mysql

username xxxx

password xxxx

select_interval 10s

select_limit 100

state_file /path/sql_state

<table>

table test_table

tag in.sql

update_column id

</table>

</source>

这个例子是从test_table中SELECT数据,每间隔10s去读取100条数据出来,SELECT 时将ID列作为主键(id字段是自增型)。关于fluent-plugin-sql的更多说明参见:https://github.com/fluent/fluent-plugin-sql

match 配置如下:

<match in.**>

type aliyun_odps

aliyun_access_id ************

aliyun_access_key *********

aliyun_odps_endpoint http://service.odps.aliyun.com/api

aliyun_odps_hub_endpoint http://dh.odps.aliyun.com

buffer_chunk_limit 2m

buffer_queue_limit 128

flush_interval 5s

project your_projectforlog

<table in.log>

table mysql_data

fields id,field1,field2,fields3

</table>

</match>

数据会导出到ODPS projectforlog project的mysql_data表中,导入的字段包括id,field1,field2,field3。

关于导入表的说明

通过Fluentd导入数据是走的ODPS实时数据流入通道-Datahub,这个通道需要一个特殊的ODPS表,这个表在创建时需要指定为Hub Table。创建表时可以使用如下语名:

CREATE TABLE <table_name) (field_name type,…) PARTITIONED BY (pt_name type) INTO <n1> SHARDS HUBLIFECYCLE <n2>;

其中:n1 是指shards数量,有效值为1-20。在导入数据时,每个shard的流入量是10M/秒。N2是指数据在Datahub上的保留期,有效值1-7,主要用于流计算场景中使用历史数据。 例如:

create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7 string) partitioned by(ctime string) into 5 shards hublifecycle 7;

如果向已经存在的表导入数据,也需要将表修改为HUB表, 其命令为:

ALTER TABLE table_name ENABLE HUTTABLE with <n1> SHARDS HUBLIFECYCLE <n2>;

插件参数说明

向ODPS导入数据,需要将ODPS插件配置在conf文件中match项中。插件支持的参数说明如下:

type(Fixed): 固定值 aliyun_odps.

aliyun_access_id(Required):云账号access_id.

aliyun_access_key(Required):云账号access key.

aliyun_odps_hub_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 http://dh-ext.odps.aliyun-inc.com, 否则设置为http://dh.odps.aliyun.com.

aliyunodps_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 http://odps-ext.aiyun-inc.com/api, 否则设置为http://service.odps.aliyun.com/api .

buffer_chunk_limit(Optional): 块大小,支持“k”(KB),“m”(MB),“g”(GB)单位,默认 8MB,建议值2MB.

buffer_queue_limit(Optional): 块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小。

flush_interval(Optional): 强制发送间隔,达到时间后块数据未满则强制发送, 默认 60s.

project(Required): project名称.

table(Required): table名称.

fields(Required): 与source对应,字段名必须存在于source之中.

partition(Optional):若为分区表,则设置此项.

分区名支持的设置模式:

固定值: partition ctime=20150804

关键字: partition ctime=${remote} (其中remote为source中某字段)

时间格式关键字: partition ctime=${datetime.strftime(‘%Y%m%d’)} (其中datetime为source中某时间格式字段,输出为%Y%m%d格式作为分区名称)

time_format(Optional):如果使用时间格式关键字为<partition>, 请设置本参数. 例如: source[datetime]=”29/Aug/2015:11:10:16 +0800″,则设置<time_format>为”%d/%b/%Y:%H:%M:%S %z”

 

更多

除了使用Fluentd可以导入数据外,ODPS还支持通过Flume导入数据。Flume是Apache的一款开源软件,ODPS团队基于Flume开源了导入插件源代码,感兴趣的朋友可以参见https://github.com/aliyun/aliyun-odps-flume-plugin 了解更多细节。

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks报错问题之dataX数据导入报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
DataWorks报错问题之dataX数据导入报错如何解决
|
1月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
分布式计算 关系型数据库 大数据
【大数据技巧】数据导入到MaxCompute的技巧汇总
用Fluent实现MySQL到ODPS数据集成 ---可以通过Fluentd将其它系统数据利用DHS导入到ODPS中 海量数据计算应该如何选择数据库 ---MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问
4517 0
|
分布式计算 大数据 Java
Flume数据导入ODPS方法
Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。 ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原
4615 0
|
2月前
|
分布式计算 DataWorks IDE
MaxCompute数据问题之忽略脏数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
46 0
|
2月前
|
SQL 存储 分布式计算
MaxCompute问题之下载数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
37 0
|
2月前
|
分布式计算 关系型数据库 MySQL
MaxCompute问题之数据归属分区如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
33 0
|
2月前
|
分布式计算 DataWorks BI
MaxCompute数据问题之运行报错如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
38 1
|
2月前
|
分布式计算 关系型数据库 数据库连接
MaxCompute数据问题之数据迁移如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
31 0