开发者社区> 问答> 正文

以超过1的并行度运行Flink

"

我从2个来源读取消息,根据公共密钥进行连接并将其全部下载到kafka。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
.keyBy(_.searchId)
.connect(source2.keyBy(_.searchId))
.process(new SearchResultsJoinFunction)
.addSink(KafkaSink.sink)
所以当我在本地启动它时它完全有效,它也可以在Parallelism设置为1的集群上工作,但是不再有3。

当我将它部署到1个作业管理器和3个任务管理器并使每个任务处于“RUNNING”状态时,在2分钟后(当没有任何东西正在下沉时),其中一个任务管理器获得以下日志:https:
//gist.github.com / zavalit / 1b1bf6621bed2a3848a05c1ef84c689c#文件gistfile1-TXT-L108

整个事件就这样关闭了。"

展开
收起
flink小助手 2018-11-28 15:54:09 3008 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "问题似乎是这个任务管理器--flink-taskmanager-12-2qvcd(10.81.53.209) - 无法与至少一个其他任务管理器交谈,即flink-taskmanager-12-57jzd(10.81。 40.124:46240)。这就是为什么这份工作从未真正开始运作的原因。

    我会检查其他任务管理器的日志以查看它的内容,我还会检查你的网络配置。也许防火墙正在阻碍?
    "

    2019-07-17 23:16:47
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载