开发者社区> 问答> 正文

Flink滑动计数窗口行为

假设我们有这样的数据结构:

Tuple2, Integer>
第一个字段是ArrayList长度为1的字段,其中包含一个时间戳,而Integer字段是一个1到40之间的数字channel。目标是使用相同的密钥(channel)聚合每400条消息并应用ReduceFunction它们(它只合并元组的第一个字段中的400条消息的时间戳)。我将channel字段设置为消息的键,并创建一个400的计数窗口。例如,如果我们有160000个消息作为输入,它应该输出160000/400 = 400行,并且计数窗口按照需要工作。问题是当我使用Sliding Count窗口时,我的预期行为是:

Flink为每个channel数字创建逻辑窗口,并ReduceFunction 在第一次应用时,如果逻辑窗口的长度达到400,那么每100个输入数据(与逻辑窗口的密钥相同的密钥)将调用ReduceFunction最后400个消息。窗口,所以我们应该:

160000 - 400 = 159600// 第一个400输入将首次调用reduce函数
159600 / 100 = 1596 // 在第一个400输入之后,每100个输入Flink调用最后400个输入的reduce函数
1 + 1596 = 1597 //输出的行数
但是运行Sliding Count窗口,它输出1600行,其长度可变。(我预计输出长度只有400)

要点:说长度我的意思是ArrayList的大小(Tuple2的第一个字段)

前40个频道 - >长度100
第二个40通道 - >长度为299
第三个40通道 - >长度为598
第四个40通道 - >长度为997
遗体40通道 - >长度400
我如何证明这种行为并实现我想要的滑动计数窗口?

这是源代码:

DataStream, Integer>> data ;
data.keyBy(1).countWindow(400, 100)

             .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
         @Override
         public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
             t0.f0.add(t1.f0.get(0));
             return t0;
         }
     }).writeAsText("results400").setParallelism(1);

更新:根据@DavidAnderson的建议,我也尝试在ReduceFunstion而不是修改中创建一个新的元组t0,但它产生了相同的输出。

public Tuple2, Integer> reduce(Tuple2, Integer> t0, Tuple2, Integer> t1) throws Exception {

                     ArrayList<Long> times = t0.f0;

                     times.addAll(t1.f0);

                     return new Tuple2<>(times, t0.f1) ;
                 }

展开
收起
社区小助手 2018-12-11 16:15:03 3577 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    这是countWindow的实现

    public WindowedStream countWindow(long size, long slide) {

    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));

    }
    它的表现与你期望的完全不同。每100个元素(幻灯片)触发窗口,无论它是否包含400个元素(大小)。大小控制最多保留多少元素。

    2019-07-17 23:19:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 覃立辉 立即下载
Flink CDC Meetup PPT - 孙家宝 立即下载
Flink CDC Meetup PPT - 徐榜江 立即下载