Apache Carbondata接入Kafka实时流数据

简介: 1.导入carbondata依赖的jar包 将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.

1.导入carbondata依赖的jar包

apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入在$SPARKHOME创建的carbondlib目录

2.导入kafka依赖的jar包

接入kafka数据需要依赖kafka的jars,将以下jars导入$SPARKHOME/jars

kafka-clients-0.10.0.1.jar
spark-sql-kafka-0-10_2.11-2.3.2.jar

3.spark-shell启动服务

./bin/spark-shell --master spark://hostname:7077 --jars apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar

a).导入依赖

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

b).创建session

启动第一个目录是数据存储目录,第二个目录是元数据目录;都可以是hdfs目录

val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("/home/bigdata/carbondata/data","/home/bigdata/carbondata/carbon.metastore")

c).创建source表

carbon.sql(
            s"""
                | CREATE TABLE IF NOT EXISTS kafka_json_source(
                |     id STRING,
                |    name STRING,
                |    age INT,
                |    brithday TIMESTAMP)
                | STORED AS carbondata
                | TBLPROPERTIES(
                |    'streaming'='source',
                |    'format'='kafka',
                |    'kafka.bootstrap.servers'='hostname:9092',
                |    'subscribe'='kafka_json',
                |    'record_format'='json',
                |    'comment'='get kafka data')
            """.stripMargin).show()

d).创建sink表

carbon.sql(
            s"""
                | CREATE TABLE IF NOT EXISTS kafka_json_sink(
                |     id STRING,
                |    name STRING,
                |    age INT,
                |    brithday TIMESTAMP)
                | STORED AS carbondata
                | TBLPROPERTIES(
                |    'streaming'='sink')
            """.stripMargin).show()

e).创建job任务

carbon.sql(
            s"""
                | CREATE STREAM kafka_json_job ON TABLE kafka_json_sink(
                | STMPROPERTIES(
                |    'trigger'='ProcessingTime',
                |    'interval'='10 seconds')
                | AS SELECT * FROM kafka_json_source
            """.stripMargin).show()

f).创建DATAMAP

carbon.sql(
            s"""
                | CREATE DATAMAP agg_kafka_json_sink 
                | ON TABLE kafka_json_sink(
                | USING "preaggregate"
                | AS
                |    SELECT id,name,sum(age),max(age),min(age),avg(age)
                |    FROM kafka_json_sink
                |    GRPUP BY id,name
            """.stripMargin).show()

4.常用SQL命令

a).导入本地数据

carbon.sql("LOAD DATA INPATH '/home/bigdata/carbondata/sample.csv' INTO TABLE kafka_json_source").show()

b).查看表结构

carbon.sql("DESC kafka_json_source").show()

c).查看表数据

carbon.sql("SELECT * FROM kafka_json_source WHERE id=1").show()

d).清理表数据

carbon.sql("TRUNCATE TABLE kafka_json_sink").show()

e).删除表

carbon.sql("DROP TABLE IF EXISTS kafka_json_source").show()

f).查看job任务状态

carbon.sql("SHOW STREAMS ON TABLE kafka_json_sink").show()

g).删除job任务

carbon.sql("DROP STREAM kafka_json_job").show()

h).查询DATAMAP表信息

carbon.sql("DESC agg_kafka_json_sink_kafka_json_sink").show()

i).查询表Segments信息

carbon.sql("SHOW SEGMENTS FOR TABLE kafka_json_sink").show()

j).条件查询

carbon.sql("SELECT * FROM kafka_json_sink WHERE agent_id=499 AND signature=''").show()

k).聚合查询

carbon.sql("SELECT agent_id,signature,method_type,sum(elapse_time),max(elapse_time),min(elapse_time) FROM kafka_json_sink GROUP BY agent_id,signature,method_type").show()

5.注意事项

a).kafka使用配置

由于Carbondata的kafka-consumer反序列化配置如下,所以在kafka-producer应该使用对于配置,否则无法解析数据

key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
目录
相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
3月前
|
消息中间件 JSON druid
Druid:通过 Kafka 加载流数据
Druid:通过 Kafka 加载流数据
39 0
|
8天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
25 0
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
71 2
|
2天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
28 1
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
机器学习/深度学习 消息中间件 人工智能
机器学习PAI报错问题之读取kafka数据报错如何解决
人工智能平台PAI是是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务;本合集将收录PAI常见的报错信息和解决策略,帮助用户迅速定位问题并采取相应措施,确保机器学习项目的顺利推进。
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
41 0

推荐镜像

更多