开发者社区> 问答> 正文

如何使用Flink CEP创建批处理或幻灯片窗口?

Flink CEP来自Esper CEP引擎。正如您可能(或不知道)所知,在Esper使用他们的语法(EPL)时,您可以轻松地创建一个batch或一个slide窗口,在这些窗口中对事件进行分组,并允许您将这些事件与函数(avg,max,min,...)一起使用。

例如,使用以下模式,您可以创建5秒的批处理窗口,并计算在指定窗口中收到price的所有Stock事件的属性的平均值。

select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何实现这一点Flink CEP。我知道,可能的目标或方法可能Flink CEP不同,因此实现这一目标的方式可能并不像in中那么简单Esper CEP。

我已经看过关于时间窗口的文档,但我无法实现这个窗口Flink CEP。因此,给出以下代码:

DataStream stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern pattern = Pattern.begin("start")

        .where(
                new SimpleCondition<Stock>() {
                    public boolean filter(Stock event) {
                        return event.getPrice() >= 0;
                    }
                }
        );

PatternStream patternStream = CEP.pattern(stream, pattern);

/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED

return avg(allEventsInWindow.getPrice()) > 1;
*/

DataStream result = patternStream.select(

        new PatternSelectFunction<Stock, Alert>() {
            @Override
            public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                return new Alert(pattern.toString());
            }
        }
);

如何创建该窗口,从第一个窗口开始,我开始计算5秒内后续事件的平均值。例如:

t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5秒后的平均值为1.5,并将触发警报。我该如何编码呢?

展开
收起
flink小助手 2018-12-06 18:11:30 3043 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    使用Flink的CEP库时,此行为无法表达。我建议使用Flink DataStream或Table API来计算平均值。基于此,您可以再次使用CEP生成其他事件。

    final DataStream input = env

    .fromElements(
            new Stock(1L, 1.0),
            new Stock(2L, 2.0),
            new Stock(3L, 1.0),
            new Stock(4L, 2.0))
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
        @Override
        public long extractTimestamp(Stock element) {
            return element.getTimestamp();
        }
    });
    

    final DataStream windowAggregation = input

    .timeWindowAll(Time.milliseconds(2))
    .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
        @Override
        public Tuple2<Integer, Double> createAccumulator() {
            return Tuple2.of(0, 0.0);
        }
    
        @Override
        public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
        }
    
        @Override
        public Double getResult(Tuple2<Integer, Double> accumulator) {
            return accumulator.f1 / accumulator.f0;
        }
    
        @Override
        public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    });
    2019-07-17 23:18:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载