通过Spark SQL实时归档SLS数据

简介: 我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。

我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。本文主要分成三部分:

  • 流式计算和SQL
  • 简要介绍Spark SQL流式开发语法
  • 实时归档SLS数据到HDFS

1. 流式计算和SQL

数据的价值随着时间逐渐降低。及时尽早的对数据进行处理提升了数据的价值,所以流式计算系统的应用也越来越广泛。目前常用的流式计算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams这类基于Kafka的流式处理类库。各种流式处理框架都有其各自的API,开发者不可避免的需要学习如何使用这些API。如何提供简单而有效的开发工具,从而把更多的精力投放在业务处理中。所以,各个流式处理系统都逐渐支持SQL API作为开发语言,让使用者可以像处理Table一样处理Stream。例如KSQL支持使用SQL进行流式处理Kafka数据。Spark同样提出来Structured Streaming作为最新一代的流式处理系统,底层的处理引擎也是Spark SQL。不过在上层SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark开源版本上进行了功能扩展,支持使用SQL API在Spark上进行完整的流式查询开发。

2. Spark SQL流式开发入门

这节将简要介绍Spark SQL中关于流式开的概念和语法。

2.1 建表

当我们需要对流式数据源进行读写操作时,需要首先创建一张表来表示这个数据源。定义表的语法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上语法中,针对特殊source,不要求一定指定表的列定义。当不指定列定义时,会自动识别数据源的schema信息。举一个例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当数据源是Kafka时,会根据Kafka Topic名去到Kafka Schema Registry中查找schema信息。当然,我们也可以指定列定义,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当指定列定义时,要求必须和Source中的字段定义是一致的。当执行完CREATE TABLE操作,表的定义会保存到Hive MetaStore中。

2.2 CTAS

我们可以将创建表和将查询结果写入到表的语句合并到一起,那么就是CREATE TABLE ... AS SELECT ...语法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;

举一个例子(引用自这里: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

当执行完操作,将创建出表并实际生成一个StreamQuery实例,将查询结果写入到结果表中。

2.3 DML

流式查询SQL和离线SQL标准语法大部分是一样,这边主要介绍insert操作。流式查询是不允许单独进行SELECT操作,必须将SELECT的查询结果写入到表中。所以,需要在SELECT操作之前执行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;

以上语法为一次流式查询:这个语句将实际生成一个StreamQuery实例,将查询结果写入到结果表中。举一个例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark

限于篇幅,本文暂且不介绍Spark SQL中如何使用window和watermak,有兴趣的可以先看看资料,后续会专门撰文介绍。

2.5 流式作业配置

使用SQL进行流式作业开发时,有些必要的配置无法在Query表达出来,需要单独进行设置。这里我们使用SET操作进行流式作业必要参数配置,当前有两个参数需要设置:

name config
流式查询实例名称 streaming.query.name
流式作业Checkpoint地址 spark.sql.streaming.checkpointLocation.${streaming.query.name}

每一个流式查询实例前都需要进行配置,也就是说,当使用CTAS或者Insert操作时,必须前置这两个配置。一个SQL文件支持多个流式查询,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;

3. SLS数据实时归档实战

假定一个场景,现在通过SLS收集了业务服务器上的日志,需要归档到HDFS中,便于后续进行离线分析。这里涉及到两个数据源:SLS和HDFS。HDFS是Spark官方支持的数据源,支持流和批的读写。SLS是阿里云的服务,EMR已经支持了流式读写。

  • 环境准备
    需要E-MapReduce 3.21.0以上版本集群环境,当前正在发布准备中,很快和大家见面,敬请期待。
  • 命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将会在EMR SDK 1.7.0版本发布出来。

  • 分别创建两张表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
  • 通过Spark SQL启动一个Stream Query将SLS数据实时同步到HDFS中
set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
  • 查看HDFS数据归档情况

image

  • 使用Spark SQL对归档的数据进行离线分析:例如统计一共有多少个IP
select distinct(ip) from hdfs_service_log;

image

4. 结语

以上,我们介绍了Spark SQL在流式处理中的一个非常简单的例子。其实,我们还可以使用Spark SQL进行更加复杂的流式处理任务。后续文章,我将介绍窗口操作,watermark等概念,以及如何在流式数据上进行简单的机器学习运算。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
17天前
|
SQL 存储 关系型数据库
一文搞懂SQL优化——如何高效添加数据
**SQL优化关键点:** 1. **批量插入**提高效率,一次性建议不超过500条。 2. **手动事务**减少开销,多条插入语句用一个事务。 3. **主键顺序插入**避免页分裂,提升性能。 4. **使用`LOAD DATA INFILE`**大批量导入快速。 5. **避免主键乱序**,减少不必要的磁盘操作。 6. **选择合适主键类型**,避免UUID或长主键导致的性能问题。 7. **避免主键修改**,保持索引稳定。 这些技巧能优化数据库操作,提升系统性能。
213 4
一文搞懂SQL优化——如何高效添加数据
|
1月前
|
SQL 数据可视化 数据处理
使用SQL和Python处理Excel文件数据
使用SQL和Python处理Excel文件数据
51 0
|
27天前
|
SQL 安全 数据库
第三章用sql语句操作数据
第三章用sql语句操作数据
10 0
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
72 1
|
1月前
|
SQL 数据库 数据库管理
SQL中如何添加数据:基础指南
SQL中如何添加数据:基础指南
24 2
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
2月前
|
SQL 数据库 数据安全/隐私保护
sql注入碰到加密数据怎么办
sql注入碰到加密数据怎么办
18 1
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Kafka
Flink sql 问题之主动使数据延时一段时间如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
53 2
|
2月前
|
SQL Oracle 关系型数据库
Flink SQL 问题之看不到数据如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
50 3