Flink落HDFS数据按事件时间分区解决方案

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 0x1 摘要Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,BucketingSink<Object> sink = new BucketingSink<>(path);//通过这样的方式来实现数据跨天分区sink.

0x1 摘要

Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");

0x2 问题点

如果要做到数据完全正确的落到相应分区,那必须用eventTime来划分,我们先来看看DateTimeBucketer桶实现代码,

public class DateTimeBucketer<T> implements Bucketer<T> {
 private static final long serialVersionUID = 1L;
 private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 private final String formatString;
 private final ZoneId zoneId;
 private transient DateTimeFormatter dateTimeFormatter;

 /**
  * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
  */
 public DateTimeBucketer() {
  this(DEFAULT_FORMAT_STRING);
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  */
 public DateTimeBucketer(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
  */
 public DateTimeBucketer(String formatString, ZoneId zoneId) {
  this.formatString = Preconditions.checkNotNull(formatString);
  this.zoneId = Preconditions.checkNotNull(zoneId);

  this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
 }

 @Override
 public Path getBucketPath(Clock clock, Path basePath, T element) {
  //分桶关键代码在这里,通过clock获取当前时间戳后格式
  String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
  return new Path(basePath + "/" + newDateTimeString);
 }
}

以上代码clock实例是在BucketingSink#open方法中实例化,代码如下:

this.clock = new Clock() {
 @Override
 public long currentTimeMillis() {
  //直接返回当前处理时间
  return processingTimeService.getCurrentProcessingTime();
 }
};

结合以上源码分析发现,使用DateTimeBucketer分桶是采用当前处理时间,采用当前处理时间必然会跟事件事件存在差异,因此会导致数据跨分区落入HDFS文件,举个例子,假设有一条数据事件时间是2019-09-29 23:59:58,那这条数据应该落在2019/09/29分区,但由于这条数据延迟了3秒过来,当处理过来时当前处理时间已经是2019-09-30 00:00:01,所以这条数据会被落到2019/09/30分区,针对一些重要场景数据这样的结果是不可接受的。

0x3 解决方案

从以上第二节源码分析可以看出,解决问题的核心在getBucketPath方法中时间的获取,只要把这里的时间改为事件即可,而正好这个方法的第三参数就是element,代表每一条记录,只要记录中有事件时间就可以获取。既然现有的实现源码不好改,那我们可以自己基于Bucketer接口实现一个EventTimeBucketer分桶器,实现源码如下:

public class EventTimeBucketer implements Bucketer<BaseCountVO> {
    private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd";

    private final String formatString;

    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public EventTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public EventTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public EventTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    //记住,这个方法一定要加,否则dateTimeFormatter对象会是空,此方法会在反序列的时候调用,这样才能正确初始化dateTimeFormatter对象
    //那有的人问了,上面构造函数不是初始化了吗?反序列化的时候是不走构造函数的
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) {
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}

大家实际项目中可以把BaseCountVO改成自己的实体类即可,使用的时候只要换一下setBucketer值,代码如下:

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new EventTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
29天前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
46 3
|
30天前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
30天前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
30天前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
4月前
|
存储 分布式计算 Hadoop
Hadoop系列HDFS详解
Hadoop系列HDFS详解
39 0
|
4月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
146 0
|
5月前
|
存储 分布式计算 Hadoop
aws s3常用命令、hdfs dfs/hadoop fs常用命令
aws s3常用命令、hdfs dfs/hadoop fs常用命令
388 0
|
4月前
|
存储 分布式计算 Hadoop
【大数据技术Hadoop+Spark】HDFS概念、架构、原理、优缺点讲解(超详细必看)
【大数据技术Hadoop+Spark】HDFS概念、架构、原理、优缺点讲解(超详细必看)
98 0
|
2天前
|
分布式计算 Hadoop 测试技术
Hadoop【基础知识 05】【HDFS的JavaAPI】(集成及测试)
【4月更文挑战第5天】Hadoop【基础知识 05】【HDFS的JavaAPI】(集成及测试)
26 8