SparkStreming:使用Checkpoint创建StreamingContext修改executor-cores、executor-memory等资源信息不生效。

简介: 在使用SparkStreaming时,使用StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)创建StreamingContext。

在使用SparkStreaming时,使用StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)创建StreamingContext。代码示例如下:

// Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {

      val conf = new SparkConf().setAppName("UserBrowse")
      val ssc = new StreamingContext(conf, batchInterval)

      //通过LogHubCursorPosition.BEGIN_CURSOR指定消费Cursor的模式。
      val loghubStream = LoghubUtils.createStream(...)

      loghubStream.checkpoint(batchInterval * 5).foreachRDD { rdd =>

      val spark = SparkSession.builder.config(rdd.sparkContext.getConf)
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
      }

      ssc.checkpoint(checkpointDirectory) // set checkpoint directory
      ssc
    }
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

此时通过控制台提交Spark任务命令如下:

--class com.test.StreamingText
--jars /spark-it/loghub-spark-0.6.13_2.4.3-1.0.4.jar,/spark-it/loghub-client-lib-0.6.13.jar
--driver-memory 1G 
--driver-cores 1
--executor-cores 3 
--executor-memory 3G
--num-executors 1
--name spark_on_loghub
/spark-it/sparkstreaming-0.0.1-SNAPSHOT.jar
/tmp/checkpoint_location_test 

其中/tmp/checkpoint_location_test 为StreamingContext的checkpoint路径。
运行一段时间后,用户期望修改executor-cores为4,executor-memory 为12G,num-executors为3。那如何修改呢?
由于SparkStreming的运行机制是长久运行,以及checkpoint的设置是为了任务异常能从checkpoint恢复数据。
首次提交任务后,StremingContext会把Spark的配置信息写入到Checkpoint中,包括:executor-cores、num-executors、executor-memory等配置信息。
当任务异常或者重启后,StremingContext会从Checkpoint中读取Spark的配置信息。所以这时如果在控制台修改executor-cores、executor-memory等配置信息,StremingContext不会读取的。
如果需要修改executor-cores、executor-memory等配置信息需要清除Checkpoint路径,或者重新指定一个新的Checkpoint路径。

相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
157 0
|
8天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
2月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
120 0
|
3月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
100 0
|
4月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
102 0
|
4月前
|
分布式计算 资源调度 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
65 0
|
4月前
|
SQL 机器学习/深度学习 分布式计算
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day17】——Spark4
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day17】——Spark4
41 0
|
4月前
|
存储 消息中间件 分布式计算
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day28】——Spark15+数据倾斜1
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day28】——Spark15+数据倾斜1
36 0