SparkStreaming 手动维护kafka Offset到Mysql实例

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: sparkstreamig kafka offset mysql

官网详解地址
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

手动提交offset,以保证数据不会丢失,尤其是在网络抖动严重的情况下,但是如果kafka挂掉重启后,可能会造成一些其他问题,
例如找不到保存的offset,这个具体问题再具体分析,先上代码。
import java.sql.{DriverManager, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
*

  • 使用Spark-Kafka-0-10版本整合,并手动提交偏移量,维护到MySQL中
    */

object SparkKafkaTest2 {
def main(args: Array[String]): Unit = {

//1.创建StreamingContext
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
//准备连接Kafka的参数
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "server1:9092,server2:9092,server3:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "SparkKafkaTest",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)

val topics = Array("spark_kafka_test").toSet

val recordDStream: DStream[ConsumerRecord[String, String]] = if (offsetMap.size > 0) { //有记录offset
  println("MySQL中记录了offset,则从该offset处开始消费")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, //位置策略,源码强烈推荐使用该策略,会让Spark的Executor和Kafka的Broker均匀对应
    Subscribe[String, String](topics, kafkaParams, offsetMap)) //消费策略,源码强烈推荐使用该策略
} else { //没有记录offset
  println("没有记录offset,则直接连接,从latest开始消费")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, //位置策略,源码强烈推荐使用该策略,会让Spark的Executor和Kafka的Broker均匀对应
    Subscribe[String, String](topics, kafkaParams)) //消费策略,源码强烈推荐使用该策略
}

recordDStream.foreachRDD {
  messages =>
    if (messages.count() > 0) { //当前这一时间批次有数据
      messages.foreachPartition { messageIter =>
        messageIter.foreach { message =>
          //println(message.toString())
        }
      }
      val offsetRanges: Array[OffsetRange] = messages.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
      }
      //手动提交offset,默认提交到Checkpoint中
      //recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      //实际中偏移量可以提交到MySQL/Redis中
      saveOffsetRanges("SparkKafkaTest", offsetRanges)
    }
}

ssc.start()
ssc.awaitTermination()
}

/**

  • 从数据库读取偏移量
    */

def getOffsetMap(groupid: String, topic: String) = {

Class.forName("com.mysql.jdbc.Driver")
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
val sqlselect = connection.prepareStatement("""
      select * from kafka_offset 
      where groupid=? and topic =?
     """)
sqlselect.setString(1, groupid)
sqlselect.setString(2, topic)
val rs: ResultSet = sqlselect.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
  offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
sqlselect.close()
connection.close()
offsetMap

}

/**

  • 将偏移量保存到数据库
    */

def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
//replace into表示之前有就替换,没有就插入
val select_ps = connection.prepareStatement("""
  select count(*) as count from kafka_offset
  where  `groupid`=? and `topic`=? and `partition`=?
  """)
val update_ps = connection.prepareStatement("""
  update kafka_offset set  `offset`=?
  where `groupid`=? and `topic`=? and `partition`=?
  """)
val insert_ps = connection.prepareStatement("""
  INSERT INTO kafka_offset(`groupid`, `topic`, `partition`, `offset`) 
  VALUE(?,?,?,?)
  """)
for (o <- offsetRange) {
  select_ps.setString(1, groupid)
  select_ps.setString(2, o.topic)
  select_ps.setInt(3, o.partition)
  val select_resut = select_ps.executeQuery()
  // println(select_resut.)// .getInt("count"))
  while (select_resut.next()) {
    println(select_resut.getInt("count"))
    if (select_resut.getInt("count") > 0) {
      //update
      update_ps.setLong(1, o.untilOffset)
      update_ps.setString(2, groupid)
      update_ps.setString(3, o.topic)
      update_ps.setInt(4, o.partition)
      update_ps.executeUpdate()
    } else {
      //insert
      insert_ps.setString(1, groupid)
      insert_ps.setString(2, o.topic)
      insert_ps.setInt(3, o.partition)
      insert_ps.setLong(4, o.untilOffset)
      insert_ps.executeUpdate()
    }
  }

}
select_ps.close()
update_ps.close()
insert_ps.close()
connection.close()

}

如果报错连不上数据库或连接数据库地址失败,请查看是否添加了mysql客户端jar包。

                                                                                                                                                                   --------五维空间s
相关文章
|
1月前
|
存储 SQL 关系型数据库
创建并配置RDS实例
在阿里云上创建RDS实例涉及登录控制台、进入RDS管理页面、创建实例、选择数据库引擎和版本、配置实例规格与存储、设定网络与安全组、设置实例信息、确认订单并支付,最后初始化数据库。操作步骤可能因界面更新或数据库引擎不同略有差异。
19 1
|
3月前
|
弹性计算 关系型数据库 MySQL
快速上手阿里云RDS MySQL实例创建,轻松管理数据库
快速上手阿里云RDS MySQL实例创建,轻松管理数据库 在数字化时代,数据已成为企业的核心资产。如何高效、安全地存储和管理这些数据,成为企业在云计算时代亟待解决的问题。阿里云的RDS(关系型数据库服务)应运而生,为用户提供稳定、可靠的云上数据库解决方案。本文将详细介绍如何通过阿里云RDS管理控制台快速创建RDS MySQL实例,让您轻松上手,快速部署数据库。
170 2
|
1月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
149 4
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink问题子实现Kafka到Mysql如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
379 2
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
50 5
|
1月前
|
关系型数据库 MySQL 数据库
初始化RDS实例
初始化RDS实例
15 3
|
1月前
|
SQL 关系型数据库 MySQL
购买阿里云RDS实例
购买阿里云RDS实例
165 2
|
1月前
|
弹性计算 关系型数据库 MySQL
连接RDS实例
连接RDS实例
12 1
|
1月前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
168 2
|
1月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
70 3