开发者社区> 问答> 正文

Apache Flink:如何处理Kafka接收器中的异常?

我有一个Flink工作,将数据写入Kafka。Kafka主题的最大邮件大小设置为5 MB,因此如果我尝试写入任何大于5 MB的记录,它会抛出以下异常并将作业关闭。

java.lang.Exception: Failed to send data to Kafka: The request included a message larger than the max message size the server will accept.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:350)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
现在我已在我的工作中配置了检查点,因此如果作业失败,它将再次重新启动。问题是,每次重新启动时,它都会因同一记录而失败,并进入无限循环的故障并重新启动。有没有办法在我的代码中处理这个Kafka异常,这样它就不会导致整个工作失败?

展开
收起
flink小助手 2018-12-10 13:09:18 6386 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    也许您可以在Kafka接收器前面引入一个过滤器来检测并过滤掉过大的记录。有点hacky,但它可能很容易。否则,我会考虑扩展FlinkKafkaProducer010以便能够处理异常。

    2019-07-17 23:19:11
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像