开发者社区> 问答> 正文

Apache Flink:如何使用SourceFunction以指定的时间间隔执行任务?

"
我需要我的flink作业以指定的时间间隔从数据库中提取记录并在处理后将其归档。我已经实现了SourceFunction来从数据库中获取所需的记录,并添加了SourceFunction作为StreamExecutionEnvironment的源。如何指定StreamExecutionEnvironment需要每隔10分钟使用SourceFunction从数据库中获取记录?

SourceFunction:

public class MongoDBSourceFunction implements SourceFunction>{

public void cancel() {
    // TODO Auto-generated method stub
}

public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

> context) throws Exception {

    List<Book> books = getBooks();

    context.collect(books);

}

public List<Book> getBooks() {
    List<Book> books = new ArrayList<Book>();

    //fetch all books from database     
    return books;
}

}
处理器:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

public static void main(String[] args) {

    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new MongoDBSourceFunction()).print();
}

}"

展开
收起
flink小助手 2018-11-28 16:33:00 8970 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "你需要将此功能添加到MongoDBSourceFunction自身。例如,您可以ScheduledExecutorService在open方法中实例化a 并使用此执行程序安排读取任务。

    请注意,在发出记录时保持检查点锁定很重要。"

    2019-07-17 23:16:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像