Flink Window Sorting

Overview

- Sort the output of an incremental window before it is emitted
- Incremental WordCount (sorted by word)
- Incremental WordCount (sorted by word count, then by word)

Source Code

Source Code Analysis
WordCount Program (incremental, sorted by word in ascending order)

- DataStream.windowAll gathers the elements of every key into a single all-window and returns an AllWindowedStream
- AllWindowedStream.process(ProcessAllWindowFunction): the ProcessAllWindowFunction is handed all of the window's data at once, so it can merge elements by key, sort them by word, or sort them by word count
- BucketingSink sets the directory the output files are written to
```scala
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {

  /** When debugging, relax the Akka and heartbeat timeouts so breakpoints
    * do not cause the local job to be killed. */
  def getConfiguration(isDebug: Boolean = false): Configuration = {
    val configuration: Configuration = new Configuration()
    if (isDebug) {
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }
    configuration
  }

  def main(args: Array[String]): Unit = {
    val port = 1234
    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // One line of text per socket message.
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._

    val dataStreamDeal = dataStream.flatMap(w => w.split("\\s")).map(w => WordWithCount(w, 1))
      .keyBy("word")
      // windowAll collects the elements of all keys into a single (non-parallel) window,
      // so the ProcessAllWindowFunction below sees every word of the window at once.
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {

        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          // Merge the counts per word for this window.
          val set = new mutable.HashSet[WordWithCount]()
          for (wordCount <- elements) {
            set.find(_.word == wordCount.word) match {
              case Some(existing) =>
                set.remove(existing)
                set.add(WordWithCount(existing.word, existing.count + wordCount.count))
              case None =>
                set.add(wordCount)
            }
          }

          // Sort by word (ascending) before emitting the window result.
          val sortSet = set.toList.sortWith((a, b) => a.word.compareTo(b.word) < 0)
          for (wordCount <- sortSet) out.collect(wordCount)
        }
      })

    // BucketingSink writes the results to the given directory, bucketed by time.
    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")
    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    bucketingSink.setBatchSize(1024 * 1024 * 400)          // roll a part file at 400 MB
    bucketingSink.setBatchRolloverInterval(2 * 1000)       // or after 2 seconds
    bucketingSink.setInactiveBucketCheckInterval(2 * 1000)
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)

    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)

    if (args == null || args.size == 0) {
      env.execute("default job")
    } else {
      env.execute(args(0))
    }

    println("done")
  }

  case class WordWithCount(word: String, count: Long)
}
```
Input data (fed into the socket on port 1234, e.g. from a terminal running nc -lk 1234)
1 2 1 5 3
Output data
WordWithCount(1,2)
WordWithCount(2,1)
WordWithCount(3,1)
WordWithCount(5,1)
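The merge-and-sort step inside the ProcessAllWindowFunction does not depend on any Flink API, so it can be pulled out and checked on its own. Below is a minimal sketch of that logic; the object and helper names (WindowSortSketch, mergeAndSortByWord) are ours for illustration and are not part of the original program:

```scala
object WindowSortSketch {

  case class WordWithCount(word: String, count: Long)

  // Merge the per-word counts of one window and sort by word,
  // mirroring what the process() body above does.
  def mergeAndSortByWord(elements: Iterable[WordWithCount]): List[WordWithCount] =
    elements
      .groupBy(_.word)
      .map { case (word, group) => WordWithCount(word, group.map(_.count).sum) }
      .toList
      .sortBy(_.word)

  def main(args: Array[String]): Unit = {
    // Simulate the window contents produced from the input "1 2 1 5 3".
    val window = "1 2 1 5 3".split("\\s").toList.map(WordWithCount(_, 1))
    mergeAndSortByWord(window).foreach(println)
    // WordWithCount(1,2) WordWithCount(2,1) WordWithCount(3,1) WordWithCount(5,1)
  }
}
```

Keeping this logic in a plain function makes it easy to unit test the per-window aggregation and ordering without starting a Flink job.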
WordCount Program (incremental, sorted by word count, then by word when counts are equal)

- DataStream.windowAll gathers the elements of every key into a single all-window and returns an AllWindowedStream
- AllWindowedStream.process(ProcessAllWindowFunction): the ProcessAllWindowFunction is handed all of the window's data at once, so it can merge elements by key, sort them by word, or sort them by word count
- BucketingSink sets the directory the output files are written to
```scala
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSortedByCount {

  /** When debugging, relax the Akka and heartbeat timeouts so breakpoints
    * do not cause the local job to be killed. */
  def getConfiguration(isDebug: Boolean = false): Configuration = {
    val configuration: Configuration = new Configuration()
    if (isDebug) {
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }
    configuration
  }

  def main(args: Array[String]): Unit = {
    val port = 1234
    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // One line of text per socket message.
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._

    val dataStreamDeal = dataStream.flatMap(w => w.split("\\s")).map(w => WordWithCount(w, 1))
      .keyBy("word")
      // windowAll collects the elements of all keys into a single (non-parallel) window,
      // so the ProcessAllWindowFunction below sees every word of the window at once.
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {

        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          // Merge the counts per word for this window.
          val set = new mutable.HashSet[WordWithCount]()
          for (wordCount <- elements) {
            set.find(_.word == wordCount.word) match {
              case Some(existing) =>
                set.remove(existing)
                set.add(WordWithCount(existing.word, existing.count + wordCount.count))
              case None =>
                set.add(wordCount)
            }
          }

          // Sort by count (ascending); when counts are equal, fall back to the word.
          val sortSet = set.toList.sortWith((a, b) => {
            if (a.count == b.count) {
              a.word.compareTo(b.word) < 0
            } else {
              a.count < b.count
            }
          })
          for (wordCount <- sortSet) out.collect(wordCount)
        }
      })

    // BucketingSink writes the results to the given directory, bucketed by time.
    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")
    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    bucketingSink.setBatchSize(1024 * 1024 * 400)     // roll a part file at 400 MB
    bucketingSink.setBatchRolloverInterval(2 * 1000)  // or after 2 seconds
    bucketingSink.setInactiveBucketThreshold(2 * 1000)

    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)

    if (args == null || args.size == 0) {
      env.execute("default job")
    } else {
      env.execute(args(0))
    }

    println("done")
  }

  case class WordWithCount(word: String, count: Long)
}
```
Input data
1 1 2 4 4 3 2 1
Output data
WordWithCount(3,1)
WordWithCount(2,2)
WordWithCount(4,2)
WordWithCount(1,3)
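The two-level ordering (count ascending, then word) can also be expressed with sortBy on a tuple key instead of a hand-written comparator. A small self-contained sketch, assuming the counts have already been merged for the window "1 1 2 4 4 3 2 1"; the object name SortByCountThenWordSketch is ours for illustration:

```scala
object SortByCountThenWordSketch {

  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // Per-word counts merged from the window "1 1 2 4 4 3 2 1".
    val merged = List(WordWithCount("1", 3), WordWithCount("2", 2),
                      WordWithCount("4", 2), WordWithCount("3", 1))

    // Primary key: count (ascending); secondary key: word.
    val sorted = merged.sortBy(w => (w.count, w.word))
    sorted.foreach(println)
    // WordWithCount(3,1) WordWithCount(2,2) WordWithCount(4,2) WordWithCount(1,3)
  }
}
```

sortBy with a tuple key relies on Scala's built-in tuple Ordering, which compares the count first and only falls back to the word on ties, i.e. the same ordering that the sortWith comparator in the program above implements.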