用Fluent实现MySQL到ODPS数据集成

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: Fluentd是一个日志收集系统,可以定制输入或输出到常用系统(如MongoDB, RDS, Mysql, HDFS)或直接用于Log收集。 DHS(Datahub)向开发人员提供Fluentd插件,可以通过Fluentd将其它系统数据利用DHS导入到ODPS中。 以下操作均在root 用户下进

安装ruby

首先通过 /etc/issue 命令查看当前使用centos是哪个版本:

[hadoop@hadoop03 ~]$   cat /etc/issue

025f6311f124dd610e452335c4feecf6f30cda17

由于centos版本是6.6,安装ruby时就要选择在centos 6.X环境,具体安装步骤参考如下所示即可!

yum install gcc-c++ patch readline readline-devel zlib zlib-devel libyaml-devel libffi-devel openssl-devel make bzip2 autoconf automake libtool bison iconv-devel wget tar

cd ~/
wget https://ruby.taobao.org/mirrors/ruby/ruby-2.2.3.tar.gz

tar xvf ruby-2.2.3.tar.gz
cd ruby-2.2.3
./configure

makemake install

查看验证

[root@hadoop02  ~]#    ruby -v
  ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-linux]

012a6969261bcea10489f2c9212f84f21e3fc68f

安装fluent-plugin-sql插件(输入源)

[root@hadoop03 ~]#   gem install fluent-plugin-sql
64a08fad70766b661e4505903ec8e023b1b45128

准备MySQL表及数据

在test数据库创建一张表,建表语句如下:
use test; 进入test数据库里操作
create table test_fluent
(
  id int unsigned not null  auto_increment,
  sex varchar(1),
  name varchar(225),
  primary key(id)
)engine=innodb default charset=utf8 auto_increment=1;
其中id是主键,自增

插入数据,插入语句如下命令:
insert into test_fluent(sex,name) values('f','dongdong');
insert into test_fluent(sex,name) values('m','heihei');
insert into test_fluent(sex,name) values('m','qingsong');
insert into test_fluent(sex,name) values('f','jiafu');
insert into test_fluent(sex,name) values('m','angrybaby');
insert into test_fluent(sex,name) values('f','jack');
insert into test_fluent(sex,name) values('f','helloword');
insert into test_fluent(sex,name) values('m','sunlongfei');
insert into test_fluent(sex,name) values('m','donglang');
insert into test_fluent(sex,name) values('f','deguang');
insert into test_fluent(sex,name) values('m','yuanijng');
insert into test_fluent(sex,name) values('f','yangqun');

备注:
由于我操作的MySQL数据库位于172.16.1.156机器上,用户名是dong,密码是123456
而我安装的fluent位于172.16.1.158机器上,不在一台机器上,如果要从158机器远程访问156机器上MySQL会受限,禁止访问。
为此,需要在156机器上执行以下命令给指定IP授权。
授权158机器上的dong用户可以远程访问156上MySQL,dong用户登录密码是123456
grant ALL PRIVILEGES ON *.* to dong@"172.16.1.158" identified by "123456" WITH GRANT OPTION; 
使刚授予权限立即生效
flush privileges;

准备ODPS测试表

创建ODPS 表为 demo_access_log,其建表语句为:
drop table if exists demo_access_log;
create table demo_access_log(
sex string, 
name string) 
into 5 shards hublifecycle 7;
编辑fluent.conf配置文件

编辑fluent.conf配置文件

配置mysql输入源、ODPS输出源:

state_file  /var/run/fluentd/sql_state  配置项 (path to a file to store last rows该文件默认不存在,需要提前创建好!)

state_file stores last selected rows to a file (named state_file) to not forget last row when Fluentd restarts.

[root@hadoop03 ~]#  vi /etc/fluent/fluent.conf    --编辑fluent.conf配置文件

 
        
<source>
  @type sql
  host 172.16.1.156
  port 3306
  database test
  adapter mysql
  username dong
  password 123456
  select_interval 10s
  select_limit 10
  state_file /var/run/fluentd/sql_state
  <table>
    table test_fluent
     tag in.sql
    update_column id
  </table>
</source>

<match  in.**>
  type aliyun_odps
  aliyun_access_id UQV2yoSSWNgquhhe
  aliyun_access_key bG8xSLwhmKYRmtBoE3HbhOBYXvknG6
  aliyun_odps_endpoint  http://service.odps.aliyun.com/api
  aliyun_odps_hub_endpoint  http://dh.odps.aliyun.com
  buffer_chunk_limit 2m
  buffer_queue_limit 128
  flush_interval 5s
  project dtstack_dev
  <table  in.sql>
    table demo_access_log
    fields sex,name
    shard_number 5
  </table>
</match>

启动fluent

fluentd启动时会自动加载/etc/fluent/fluent.conf中读取fluent.conf配置文件
fluentd  --启动命令
大概 5 分钟后,实时导入数据会被同步到离线表,可以使用 select count(*) from demo_access_log这样sql 语句进行验证。

如果安装Fluentd 用的是Ruby Gem,可以创建一个配置文件运行下面命令。发出一个终止信号将会重新安装配置文件。(如果修改了配置文件—fluent.conf 文件,ctrl c 终止进程,然后在配置文件下重新启动)

ctrl c 

fluentd -c fluent.conf

如果有类似如下输出,就可以说明数据实时写入Datahub服务已经成功。
30b7a6d16985950389ab9e0989c8815653bb3927

运行过程遇到异常及排查

(1) 异常描述:fluent.conf文件没有配置正确
0f23223c004226c6e901b2f4e83e9693c8405e0f
异常产生原因:输入端没有配置 tag,输出端table上也没有制定对应tag。输入tag,在输出match时要能
匹配上,在输出table 要能对应上才行。
解决方法:在mysql输入源上添加上tag标签,即 tag in.sql

(2)在fluent.conf配置正确基础上运行fluentd启动命令,又报以下异常:
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord- 4.2.6/lib/active_record/connection_adapters/connection_specification.rb:177:in `rescue in spec': Specified 'mysql' for database adapter, but the gem is not loaded. Add `gem 'mysql'` to your Gemfile (and ensure its version is at the minimum required by ActiveRecord). (Gem::LoadError)
这个问题是mysql插件需要用到mysql adapter适配器,需要安装mysql adapter适配器,执行以下命令:
[root@hadoop03 fluent]#   yum install mysql-devel
[root@hadoop03 fluent]#   gem install mysql
Building native extensions.  This could take a while...
Successfully installed mysql-2.9.1
Parsing documentation for mysql-2.9.1
Installing ri documentation for mysql-2.9.1
Done installing documentation for mysql after 2 seconds
1 gem installed

连接数据库适配器路径:
mysql2_adapter.rb、mysql_adapter.rb、 postgresql_adapter.rb
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord-4.2.6/lib/active_record/connection_adapters目录下

gem安装插件时遇到异常及排查

ERROR:  While executing gem ... (Gem::RemoteFetcher::FetchError)
     Errno::ECONNRESET: Connection reset by peer - SSL_connect (https://api.rubygems.org/quick/Marshal.4.8/cool.io-1.4.4.gemspec.rz)
87e96aeca2f6d3af42fc917d1aa087dbbb056add
异常产生原因:
由于gem源引起,需要加上淘宝源后要把原来那个rubygems那个删掉 
解决方法:
# 删除默认的官方源 
gem sources -r https://rubygems.org/
# 添加淘宝源 
gem sources -a https://ruby.taobao.org/
具体解决截图展示如下:
a8c4b9469fe22a92ee1de6a1fb3269c44b251ba3

                                         第二种异常
[hadoop@hadoop03 ~]$   gem install fluent-plugin-aliyun-odps
ERROR:  While executing gem ... (Gem::FilePermissionError)
    You don't have write permissions for the /usr/local/lib/ruby/gems/2.2.0 directory.
异常产生原因:
没有写入执行权限
解决方法:
切换到root用户下进行gem install操作
具体解决截图展示如下:
14a82b299bb6e2b6bb66aff2fccb5b47565b848a
再次查看已安装插件:
66f3e1ee0d15b7230288cd71ae86cca55f46099c

相关文章
|
2月前
|
分布式计算 DataWorks 关系型数据库
DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
【2月更文挑战第14天】DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
59 8
|
3月前
|
分布式计算 DataWorks 关系型数据库
DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
37 4
|
4月前
|
关系型数据库 MySQL Serverless
高顿教育:大数据抽数分析业务引入polardb mysql serverless
高顿教育通过使用polardb serverless形态进行数据汇总,然后统一进行数据同步到数仓,业务有明显高低峰期,灵活的弹性伸缩能力,大大降低了客户使用成本。
|
4月前
|
存储 关系型数据库 MySQL
Mysql 存储大数据量问题
Mysql 存储大数据量问题
96 1
|
1天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,使用JSON解析函数将MySQL表中的字段解析成多个字段将这些字段写入到ODPS(MaxCompute)中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
8 3
|
1天前
|
存储 分布式计算 DataWorks
MaxCompute产品使用合集之大数据计算MaxCompute dataworks可以批量修改数据集成任务的数据源配置信息吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2月前
|
存储 SQL 关系型数据库
【MySQL 数据库】6、一篇文章学习【索引知识】,提高大数据量的查询效率【文末送书】
【MySQL 数据库】6、一篇文章学习【索引知识】,提高大数据量的查询效率【文末送书】
59 0
|
3月前
|
SQL 分布式计算 关系型数据库
Dataphin实现MaxCompute外表数据快速批量同步至ADB MySQL
当前大数据时代背景下,企业对数据的处理、分析和实时应用的需求日益增强。阿里云MaxCompute广泛应用于海量数据的ETL、数据分析等场景,但在将处理后的数据进一步同步至在线数据库系统,如ADB MySQL 3.0(阿里云自研的新一代云原生关系型数据库MySQL版)以支持实时查询、业务决策等需求时,可能会遇到数据迁移速度缓慢的问题。 DataphinV3.14版本支持外表导入SQL的带参调度,实现通过MaxCompute外表的方式将数据批量同步至ADB MySQL 3.0中,显著提升数据迁移的速度和效率。
287 1
|
4月前
|
DataWorks 关系型数据库 MySQL
DataWorks的数据集成实时同步mysql数据吗?
DataWorks的数据集成实时同步mysql数据吗?
120 0
|
5月前
|
关系型数据库 MySQL 大数据
程序员小sister的烦恼_快速上手大数据ETL神器Kettle(xls导入mysql)
程序员小sister的烦恼_快速上手大数据ETL神器Kettle(xls导入mysql)
55 0