
Flink sink using AvroParquetWriter creates the Parquet file but writes no data

I am trying to write a Parquet file as a sink using AvroParquetWriter. The file is created but has length 0 (no data is written). I can't figure out where the problem is:

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"/tmp/test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()

  val genericRecord: GenericRecord = new GenericData.Record(schema)
  genericRecord.put("name", "test_b")
  genericRecord.put("code", "NoError")
  genericRecord.put("ts", 100L)
  val stream = env.fromElements(genericRecord)

  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericRecord)
  stream.addSink { r =>
    writer.write(r)
  }
  env.execute()
}

flink小助手 2018-12-11 16:48:07
1 Answer
flink小助手:

The problem is that you never close the ParquetWriter; closing it is what flushes the pending elements to disk. You can solve this by defining your own RichSinkFunction and closing the ParquetWriter in its close method:

// In addition to the imports from the question:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
  var parquetWriter: ParquetWriter[GenericRecord] = null

  // Called once per task when the sink starts: build the writer here,
  // on the machine that actually runs the sink.
  override def open(parameters: Configuration): Unit = {
    parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
  }

  // Closing the writer flushes the buffered row groups and writes the
  // Parquet footer; without this the file stays empty.
  override def close(): Unit = {
    parquetWriter.close()
  }

  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    parquetWriter.write(value)
  }
}
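
For completeness, here is a minimal sketch of how the question's job could be rewired to use this sink. The wiring (object name, sink arguments) is my own illustration rather than part of the original answer; it reuses the /request_schema.avsc resource and the eels ParquetWriterConfig from the question.

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source

// Hypothetical wiring; names mirror the question's code.
object TesterFixed extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema = new Schema.Parser().parse(schemaString)

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("name", "test_b")
  record.put("code", "NoError")
  record.put("ts", 100L)

  env.fromElements(record)
    // The schema is passed as a String: the sink re-parses it in open(),
    // so the job graph never has to serialize an org.apache.avro.Schema.
    .addSink(new ParquetWriterSink(
      s"/tmp/test-${System.currentTimeMillis()}.parquet",
      schemaString,
      CompressionCodecName.SNAPPY,
      ParquetWriterConfig()))

  env.execute()
}

The close() call is also why the file ends up non-empty: ParquetWriter buffers rows in memory per row group and only writes the data and the file footer when it is closed, which is exactly why the original job produced a zero-length file.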

    2019-07-17 23:19:54