Spark-Spark Streaming-广告点击的在线黑名单过滤

简介: 任务广告点击的在线黑名单过滤 使用 nc -lk 9999 在数据发送端口输入若干数据,比如:1375864674543 Tom1375864674553 Spy1375864674571 Andy1375864688436 Cheater...

任务

广告点击的在线黑名单过滤
使用
nc -lk 9999
在数据发送端口输入若干数据,比如:

1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object OnlineBlackListFilter { 
    def main(args: Array[String]){ 
            /** 
             * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, 
             * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 
             */ 
             // 创建SparkConf对象 
             val conf = new SparkConf() 
             // 设置应用程序的名称,在程序运行的监控界面可以看到名称             
             conf.setAppName("OnlineBlackListFilter") 
             // 此时,程序在Spark集群 
             conf.setMaster("spark://Master:7077") 
             val ssc = new StreamingContext(conf, Seconds(30)) 
             /** 
              * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,
              * 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同,
              * 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息。 
              */ 
             val blackList = Array(("Spy", true),("Cheater", true)) 
             val blackListRDD = ssc.sparkContext.parallelize(blackList, 8) 
             val adsClickStream = ssc.socketTextStream("Master", 9999) 
             /** 
              * 此处模拟的广告点击的每条数据的格式为:time、name 
              * 此处map操作的结果是name、(time,name)的格式 
              */ 
             val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) } 
             adsClickStreamFormatted.transform(userClickRDD => { 
                  // 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容, 
                  // 又获得了相应点击内容是否在黑名单中 
                  val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) 
                  /** 
                   * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean)) 
                   * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在的值。 
                   * 如果存在的话,表明当前广告点击是黑名单,需要过滤掉,否则的话是有效点击内容; 
                   */ 
                      val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } }) 
                  validClicked.map(validClick => {validClick._2._1}) }).print 
                  /** 
                   * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费 
                   */ 
                  ssc.start() 
                  ssc.awaitTermination() }
     } 
} 
 **注:**
 //把程序的Batch Interval设置从30秒改成300秒:
 val ssc = new StreamingContext(conf, Seconds(300))

用spark-submit运行前面生成的jar包
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.test.spark.sparkstreaming.Filter --master spark://Master:7077 /root/Documents/SparkApps/Filter.jar

分析

  • 5个job

这里写图片描述

Job 0:不体现业务逻辑代码,对后面计算的负载均衡的考虑
这里写图片描述
Job 0包含有Stage 0、Stage 1。
比如Stage 1,其中的Aggregated Metrics by Executor部分:
这里写图片描述
Stage在所有Executor上都存在.

  • Job 1:运行时间比较长,耗时1.5分钟
    这里写图片描述
    Stage 2,Aggregated Metrics By Executor部分:
    这里写图片描述
    Stage 2只在Worker上的一个Executor执行,而且执行了1.5分钟(4个worker),从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢? 从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:
    这里写图片描述
    Receiver是通过一个Job来启动的。那肯定有一个Action来触发它
    Tasks部分:
    这里写图片描述
    只有一个Worker运行此Job,是用于接收数据。
    Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。

  • Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中
    这里写图片描述

Stag3、Stage4的详情,2个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的。
![这里写图片描述](http://img.blog.csdn.net/20160511121339417

Stag 5只在Worker4上,因为这个Stage有Shuffle操作。
这里写图片描述

  • Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳过
    这里写图片描述
    Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的:
    这里写图片描述
  • Job4:也体现了我们应用程序中的业务逻辑 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage
    10被跳过
    这里写图片描述
    tage 11的详情。可以看到,数据处理是在Worker2之外的其它3台机器上进行的
    这里写图片描述

总结

Spark Streaming本质
这里写图片描述
park Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
前面的代码每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。DStream是一个没有边界的集合,没有大小的限制。DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。

Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph,Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。
从程序到DStreamGraph的转换:
这里写图片描述
从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。
执行从DStream到RDD的转换,也就形成了RDD Graph:
这里写图片描述
空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。

深入官方文档(摘抄王家书籍):

这里写图片描述
Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。上图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依 赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个 Spark Streaming多了一个时间维度,也可以成为时空维度。
从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间 隔(Batch Interval)就会生成一个job实例,进而在集群中运行。
这里写图片描述
对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume 和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。
从上图中可以看出,在每一个batch上,空间维度的RDD依赖关系都是一样 的,不同的是这个五个batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream 的Graph,也就是说DStream就是RDD的模版,不同的时间间隔,生成不同的RDD Graph实例。
从Spark Streaming本身出发:
1.需要RDD DAG的生成模版:DStream Graph
2需要基于Timeline的job控制器
3需要inputStreamings和outputStreamings,代表数据的输入和输出
4具体的job运行在Spark Cluster之上,由于streaming不管集群是否可以消化掉,此时系统容错就至关重要
5事务处理,我们希望流进来的数据一定会被处理,而且只处理一次。在处理出现崩溃的情况下如何保证Exactly once的事务语意
这里写图片描述
从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:
这里写图片描述
这是一个HashMap,以时间为key,以RDD为value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过jobScheduler在集群上运行。再次验证了DStream就是RDD的模版。
DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。
总结:
在 空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行transform操作,进而形成了RDD的依赖关系RDD DAG,形成job。然后jobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。
这里写图片描述

目录
相关文章
|
5月前
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
39 0
|
7月前
|
canal 分布式计算 关系型数据库
大数据Spark Streaming实时处理Canal同步binlog数据
大数据Spark Streaming实时处理Canal同步binlog数据
114 0
|
4月前
|
分布式计算 大数据 Apache
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
64 0
|
6天前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
39 0
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的容错性与高可用性
Spark Streaming的容错性与高可用性
|
3月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
3月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作
|
3月前
|
分布式计算 监控 数据处理
实时数据处理概述与Spark Streaming简介
实时数据处理概述与Spark Streaming简介