开发者社区> 问答> 正文

Flink CEP事件加入了后台数据流

"我有2个数据流(例如)

ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10
我创建了一个CEP模式,我想在4秒内检查温度是否大于30。

val pattern = Pattern.beginDevice

  .where(_.sumtemp >= 30)
  .within(Time.seconds(4))

有没有办法将此模式流的输出连接到另一个传入数据流以获得低于?

ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no
如果可以分享一个例子,我将非常感激。"

展开
收起
flink小助手 2018-11-28 16:27:31 2692 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "有多个选项。你可以加入你的溪stream coGroup

    例:

    set1.coGroup(set2).where().equalTo().with(new MyCoGroupFunction());
    您可以将其视为SQL中的连接。

    实现的一个小例子:

    class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {

      override def coGroup(first: DataTypeOfStream1,
                         second: DataTypeOfStream2],
                         out: DataTypeOfOutput): Unit = {
    
           out.collect(...)
           //your output
    
      }

    }
    如果需要,您还可以使用状态。比如
    union(如果要连接的流具有相同的数据类型)

    coFlatMap这些方法之间的差异很小。
    有关 更多信息,请参阅https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/。"

    2019-07-17 23:16:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载