使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

简介: E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS

1.前言

E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式

2.准备工作

创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群

3.配置Flume

3.1 配置source

配置项 说明
type org.apache.flume.source.loghub.LogHubSource
endpoint Lohub的endpoint 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP
project Lohub的project
logstore Lohub的logstore
accessKeyId Aliyun的access key id
accessKey Aliyun的access key
useRecordTime true
consumerGroup consumer_1 消费组名称,默认值为consumer_1

配置项说明如下

  1. useRecordTime
    默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
    但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
  2. consumerPosition
    消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
    可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
    首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
  3. heartbeatInterval和fetchInOrder
    heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
  4. batchSize和batchDurationMillis
    通用的source batch配置,表示触发event写入channel的阈值。
  5. backoffSleepIncrement和maxBackoffSleep
    通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。

3.2配置channel和sink

此处使用memory channel和hdfs sink,hdfs sink配置如下

type hdfs
hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
hdfs.fileType DataStream
hdfs.rollInterval 3600
hdfs.round true
hdfs.roundValue 60
hdfs.roundUnit minute
hdfs.rollSize 0
hdfs.rollCount 0

memory channel配置如下

type memory
capacity 2000
transactionCapacity 2000

4.运行Flume agent

在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。

1

查看Log Service上的消费组状态

2

目录
相关文章
|
6月前
|
存储 监控
63 Flume采集目录到HDFS
63 Flume采集目录到HDFS
35 0
|
9月前
|
存储 SQL 分布式计算
阿里云全托管flink-vvp平台hudi connector实践(基于emr集群oss-hdfs存储)
阿里云全托管flink-vvp平台hudi sink connector实践,本文数据湖hudi基于阿里云E-MapReduce产品,以云对象存储oss-hdfs作为存储
|
2天前
|
存储 缓存 安全
阿里云EMR数据湖文件系统: 面向开源和云打造下一代 HDFS
本文作者详细地介绍了阿里云EMR数据湖文件系统JindoFS的起源、发展迭代以及性能。
|
26天前
|
分布式计算 资源调度 Hadoop
Hadoop【基础知识 03+04】【Hadoop集群资源管理器yarn】(图片来源于网络)(hadoop fs + hadoop dfs + hdfs dfs 使用举例)
【4月更文挑战第5天】Hadoop【基础知识 03】【Hadoop集群资源管理器yarn】(图片来源于网络)Hadoop【基础知识 04】【HDFS常用shell命令】(hadoop fs + hadoop dfs + hdfs dfs 使用举例)
54 9
|
5月前
|
分布式计算 Hadoop 数据安全/隐私保护
HDFS--HA部署安装:修改配置文件 测试集群工作状态的一些指令
HDFS--HA部署安装:修改配置文件 测试集群工作状态的一些指令
45 0
|
5月前
|
分布式计算 Hadoop 大数据
大数据成长之路-- hadoop集群的部署(3)HDFS新增节点
大数据成长之路-- hadoop集群的部署(3)HDFS新增节点
78 0
|
10月前
|
消息中间件 存储 分布式计算
Flume实现Kafka数据持久化存储到HDFS
Flume实现Kafka数据持久化存储到HDFS
435 0
|
6月前
|
消息中间件 存储 负载均衡
Kafka 集群如何实现数据同步?
Kafka 集群如何实现数据同步?
|
6月前
|
监控 Java
64 Flume采集文件到HDFS
64 Flume采集文件到HDFS
31 0
|
8月前
|
机器学习/深度学习 分布式计算 网络协议
HDFS集群滚动升级
HDFS集群滚动升级
95 1