开发者社区> 问答> 正文

在Apach Flink中为每个已处理的输入文件生成单个输出文件

我正在使用Scala和Apache Flink构建一个ETL,它定期读取本地文件系统中目录下的所有文件,并将每个文件的处理结果写入另一个目录下的单个输出文件中。

所以这方面的一个例子是:

/dir/to/input/files/file1
/dir/to/intput/files/fil2
/dir/to/input/files/file3
并且ETL的输出将完全符合:

/dir/to/output/files/file1
/dir/to/output/files/file2
/dir/to/output/files/file3
我尝试了各种方法,包括在写入dataSink时将并行处理减少到一个,但我仍然无法达到所需的结果。

这是我目前的代码:

val path = "/path/to/input/files/"
val format = new TextInputFormat(new Path(path))
val socketStream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)

val wordsStream = socketStream.flatMap(value => value.split(",")).map(value => WordWithCount(value,1))

val keyValuePair = wordsStream.keyBy(_.word)

val countPair = keyValuePair.sum("count")

countPair.print()

countPair.writeAsText("/path/to/output/directory/"+

 DateTime.now().getHourOfDay.toString
 +
 DateTime.now().getMinuteOfHour.toString
 +
 DateTime.now().getSecondOfMinute.toString
 , FileSystem.WriteMode.NO_OVERWRITE)

// The first write method I trid:

val sink = new BucketingSinkWordWithCount
sink.setBucketer(new DateTimeBucketerWordWithCount)

// The second write method I trid:

val sink3 = new BucketingSinkWordWithCount
sink3.setUseTruncate(false)
sink3.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink3.setWriter(new StringWriter[WordWithCount])
sink3.setBatchSize(3)
sink3.setPendingPrefix("file-")
sink3.setPendingSuffix(".txt")
两种写入方法都无法产生想要的结果。

有一些有Apache Flink经验的人可以指导我写入方法吗。

展开
收起
flink小助手 2018-12-10 11:32:09 5416 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    我解决了导入下一个依赖项以在本地计算机上运行的问题:

    Hadoop的AWS-2.7.3.jar
    AWS-Java的SDK-s3-1.11.183.jar
    AWS-Java的SDK-核心1.11.183.jar
    AWS-Java的SDK-公里,1.11.183.jar
    杰克逊的注解 - 2.6.7.jar
    杰克逊核心2.6.7.jar
    杰克逊 - 数据绑定 - 2.6.7.jar
    乔达时间 - 2.8.1.jar
    的HttpCore-4.4.4.jar
    HttpClient的-4.5.3.jar
    你可以查看:

    https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html

    “提供S3文件系统依赖性”部分

    2019-07-17 23:19:08
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载