基于Spark Streaming 进行 MySQL Binlog 日志准实时传输

简介: 基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。

基本架构

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述链路主要包含3个过程:

  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
  3. 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。

环境准备

  1. 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
  2. 开通 SLS 服务。

操作步骤

  1. 检查 MySQL 数据库环境。

    1. 查看是否开启 log-bin 功能。
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.02 sec)
    1. 查看 binlog 类型
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.03 sec)  
  2. 添加用户权限。(也可以直接通过RDS控制台添加)

    CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
  3. 为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。

    1. 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
    2. 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
    {
    "metrics": {
     "##1.0##canaltest$plugin-local": {
         "aliuid": "****",
         "enable": true,
         "category": "canal",
         "defaultEndpoint": "*******",
         "project_name": "canaltest",
         "region": "cn-hangzhou",
         "version": 2
         "log_type": "plugin",
         "plugin": {
             "inputs": [
                 {
                     "type": "service_canal",
                     "detail": {
                         "Host": "*****",
                         "Password": "****",
                         "ServerID": ****,
                         "User" : "***",
                         "DataBases": [
                             "yourdb"
                         ],
                         "IgnoreTables": [
                             "\\S+_inner"
                         ],
                          "TextToString" : true
                     }
                 }
             ],
             "flushers": [
                 {
                     "type": "flusher_sls",
                     "detail": {}
                 }
             ]
         }
     }
    }
    }

    其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。

    1. 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
      image

如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。

  1. 准备代码,将代码编译成 jar 包,然后上传到 OSS。

    1. 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
    git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。

    示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:

    package com.aliyun.emr.example
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object LoghubSample {
    def main(args: Array[String]): Unit = {
    if (args.length < 7) {
     System.err.println(
       """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
         |            
         |           
       """.stripMargin)
     System.exit(1)
    }
    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val conf = new SparkConf().setAppName("Mysql Sync")
    //    conf.setMaster("local[4]");
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
     ssc,
     loghubProject,
     logStore,
     loghubGroupName,
     endpoint,
     1,
     accessKeyId,
     accessKeySecret,
     StorageLevel.MEMORY_AND_DISK)
    loghubStream.foreachRDD(rdd =>
       rdd.saveAsTextFile("/mysqlbinlog")
    )
    ssc.start()
    ssc.awaitTermination()
    }
    }

其中的主要改动是:

loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )

这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。

  1. 说明
    由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。

由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。

trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}

在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。

  1. 代码编译。
    在本地调试完成后,我们可以通过如下命令进行打包编译:

  2. clean install

  3. 上传 jar 包。
    请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

image

  1. 搭建 EMR 集群,创建任务并运行执行计划。

    1. 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
    2. 创建一个类型为 Spark 的作业。
      请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key 替换成真实值。请注意参数的顺序,否则可能会报错。
    —master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1

运行以上的命令

  1. 查询 Master 节点的IP
  2. 通过 SSH 登录后,执行以下命令:

  3. fs -ls /

  4. 可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:

  5. fs -ls /mysqlbinlog

还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。

  1. 错误排查。
    如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。
相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
11天前
|
存储 安全 关系型数据库
Mysql 的binlog日志的优缺点
MySQL的binlog(二进制日志)是一个记录数据库更改的日志文件,它包含了所有对数据库执行的更改操作,如INSERT、UPDATE和DELETE等。binlog的主要目的是复制和恢复。以下是binlog日志的优缺点: ### 优点: 1. **数据恢复**:当数据库出现意外故障或数据丢失时,可以利用binlog进行点恢复(point-in-time recovery),将数据恢复到某一特定时间点。 2. **主从复制**:binlog是实现MySQL主从复制功能的核心组件。主服务器将binlog中的事件发送到从服务器,从服务器再重放这些事件,从而实现数据的同步。 3. **审计**:b
|
20天前
|
SQL 关系型数据库 MySQL
mysql的binlog恢复数据
mysql的binlog恢复数据
20 0
|
21天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
|
2月前
|
存储 SQL 安全
浅谈MySQL Binlog
浅谈MySQL Binlog
45 0
|
1月前
|
存储 SQL 关系型数据库
[MySQL]事务原理之redo log,undo log
[MySQL]事务原理之redo log,undo log
|
1月前
|
SQL 缓存 关系型数据库
MySQL的万字总结(缓存,索引,Explain,事务,redo日志等)
MySQL的万字总结(缓存,索引,Explain,事务,redo日志等)
65 0
|
2月前
|
SQL 存储 关系型数据库
binlog 日志的三种格式
binlog 日志的三种格式
|
2月前
|
存储 监控 关系型数据库
ELK架构监控MySQL慢日志
ELK架构监控MySQL慢日志
|
2月前
|
SQL 运维 关系型数据库
MySQL中常见的几种日志类型
MySQL中常见的几种日志类型
|
2月前
|
关系型数据库 MySQL 数据库
MySQL员工打卡日志表——数据库练习
MySQL员工打卡日志表——数据库练习
136 0