开发者社区> 问答> 正文

Flink SQL:用于连接表的内存不足

我经常更新MySql表。我想为过去20秒内更新的每个id拍摄快照,并将值写入redis。我使用binlog作为流输入,并将数据流转换为Flink表。我运行以下sql。

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
SELECT id, MAX(ts)
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)
我知道表连接会使状态大小过大,我将StreamQueryConfig设置如下

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
我运行任务一天,并得到内存不足错误。我怎么解决这个问题?

展开
收起
flink小助手 2018-12-06 18:03:05 3926 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    您也可以使用时间窗口连接而不是具有已配置的空闲状态保留时间的常规连接来解决此问题。

    以下查询应该可以解决问题。

    SELECT id, ts, val
    FROM my_tbl m1,

     (SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2

    WHERE m1.id = m2.id AND m1.ts = m2.ts ANS

      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

    窗口化连接谓词(BETWEEN)确保自动清除状态。由于您使用的处理时间不准确,我增加了5秒的闲置时间。

    2019-07-17 23:18:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
SQL Server 2017 立即下载
云服务器ECS内存增强型实例re6全新发布 立即下载