Spark连接JDBC数据源

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 在实际的项目环境中,成熟的技术体系对关系型数据库的依赖远远超过hdfs,而且对大数据运算的结果,实践中也倾向于保存到数据库中,以便进行多种方式的可视化。所以本次实践主要完成spark从mysql中读取和写入数据。

在实际的项目环境中,成熟的技术体系对关系型数据库的依赖远远超过hdfs,而且对大数据运算的结果,实践中也倾向于保存到数据库中,以便进行多种方式的可视化。所以本次实践主要完成spark从mysql中读取和写入数据。一般这个操作有两种方式,一种是自己建立jdbc连接,像一般数据库操作一样的写法,一种就是利用spark自带的jdbc操作函数。

首先要把mysql jdbc connector的jar包上传到集群中每台机器的spark/jars目录,这是一个讨巧的办法,因为spark运行之前一定把这里面所有的jar都加到CALSS_PATH里面去了。

通过spark.read.jdbc读取出来的返回值是DataFrame,如下代码所示。`

val rfidCardMap = spark.read.jdbc(mysqlHelper.DB_URL_R,"t_rfid_card",Array("org_id="+ ORG_ID), mysqlHelper.PROPERTIES).map(row => {
  (row.getAs[String]("card_id"), row.getAs[String]("card_label"))
}).rdd.collect() toMap`

此函数需要传入参数依次为:数据库连接url,表名,过滤条件表达式列表,带有用户名密码信息的属性对象。读取了数据之后,形成一个(String,String)对象返回。这里有两个要注意的:

  1. getAs的类型必须和数据库中列的类型严格匹配
  2. 返回元组类型的对象比返回自定义类的对象写法要轻松一些。如果是返回自定义类的对象,编译会出错,一般说法是语句之前加入import spark.implicits._会有效,但未必见得。尚待进一步探索。

如下是一个比较复杂的解析处理代码示例。`

val teamWeightMapRDD = dfMedicalWaste.map(row => {
  (rfidCardMap.get(row.getAs[String]("team_id")) toString,
  sdf.format(new Date(row.getAs[Timestamp]("rec_ts").getTime)) toInt,
  row.getAs[Double]("mw_weight"))
}).rdd.cache()`

这里sdf就是java里面常用的SimpleDateFormat,它把一个时间戳字段转化成了6个长度的整型。

处理完成后,将结果回写数据库时采用的是本地jdbc连接写法,这块内容很普通了。

这次实践有个特别清晰的理解就是scala的类型推断,由于要统计某个地点一段时间之内的产量总和、平均产量、最大和最小单位时间产量,使用到了DoubleRDDFunctions,代码如下:`

val weightArrayRDD = teamWeightMapRDD.filter(teamWeight => {
  teamWeight._1 == teamName && teamWeight._2 >= week._1 && teamWeight._2 < week._2
}).map(teamWeight => {
  (teamWeight._2, teamWeight._3)
}).reduceByKey((a, b) =>
  a + b
).map(item => {
  item._2
}).cache()`

使用的时候如下:`

line.append(weightArrayRDD sum).append("\t")
line.append(weightArrayRDD mean).append("\t")
line.append(weightArrayRDD max).append("\t")
line.append(weightArrayRDD min).append("\t")`

scala会根据返回值类型进行类型推断,从而匹配可以使用的函数,同样是RDD或者DataFram,包含的类型不同,可以使用的函数也不同,这一切都是透明的。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6天前
|
SQL Java 关系型数据库
JDBC数据库的连接
JDBC数据库的连接
38 0
|
6月前
|
Java 关系型数据库 MySQL
JDBC连接数据库工具类
JDBC连接数据库工具类
|
6月前
|
Java 数据库连接 应用服务中间件
原生JDBC使用C3p0数据源和dbcp数据源
原生JDBC使用C3p0数据源和dbcp数据源
66 0
|
6月前
|
分布式计算 Java 关系型数据库
202 Spark JDBC
202 Spark JDBC
49 0
|
6天前
|
SQL Java 数据库连接
JDBC的连接参数的设置导致rowid自动添加到sql
JDBC的连接参数的设置导致rowid自动添加到sql
14 1
|
3天前
|
Java 数据处理 流计算
实时计算 Flink版产品使用合集之可以通过JDBC连接器来连接Greenplum数据库吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
6天前
|
Oracle Java 关系型数据库
【服务器】python通过JDBC连接到位于Linux远程服务器上的Oracle数据库
【服务器】python通过JDBC连接到位于Linux远程服务器上的Oracle数据库
17 6
|
6天前
|
JSON 前端开发 Java
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)
|
6天前
|
Java 关系型数据库 MySQL
JDBC连接数据库
JDBC连接数据库
|
6天前
|
SQL Java 数据库连接
数据库访问: JDBC是什么,如何使用它连接数据库?
JDBC是Java访问关系数据库的标准API,包含一组Java类和接口,如java.sql和javax.sql。要连接数据库,需先下载相应JDBC驱动,然后使用DriverManager.getConnection()方法。以下是一个连接MySQL数据库的示例代码片段,展示如何加载驱动、建立连接并关闭连接。注意替换代码中的数据库URL、用户名和密码。
12 3