我经常更新MySql表。我想为过去20秒内更新的每个id拍摄快照,并将值写入redis。我使用binlog作为流输入,并将数据流转换为Flink表。我运行以下sql。
SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
SELECT id, MAX(ts)
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)
我知道表连接会使状态大小过大,我将StreamQueryConfig设置如下
qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
我运行任务一天,并得到内存不足错误。我怎么解决这个问题?
您也可以使用时间窗口连接而不是具有已配置的空闲状态保留时间的常规连接来解决此问题。
以下查询应该可以解决问题。
SELECT id, ts, val
FROM my_tbl m1,
(SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime
窗口化连接谓词(BETWEEN)确保自动清除状态。由于您使用的处理时间不准确,我增加了5秒的闲置时间。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。