创建消息队列(MQ)源表

简介:

什么是消息队列MQ

消息队列(Message Queue),简称MQ。是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务,实现分布式计算场景中所有异步解耦功能。
实时计算 Flink可以将消息队列作为流式数据输入,如下:

示例

 
  
  1. create table metaq_stream(
  2. x varchar,
  3. y varchar,
  4. z varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

注意:MQ实际上是一个非结构化存储格式,对于数据的Schema不提供强制定义,完全由业务层指定。目前实时计算支持类CSV格式文本和二进制格式。

CSV类格式

对于CSV类格式,假定你的一条MQ消息记录格式如下:

 
  
  1. 1,name,male
  2. 2,name,female

一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用\n分隔。您在实时计算必须定义该MQ的DDL:

 
  
  1. create table metaq_stream(
  2. id varchar,
  3. name varchar,
  4. sex varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

WITH参数

参数 注释说明 备注
topic topic名
endPoint endPoint地址 公共云内网接入(阿里云经典网络/VPC):华东1、华东2、华北1、华北2、华南1、
香港的区域endpoint的地址是:onsaddr-internal.aliyun.com:8080 
公共云公网接入地址是:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
accessId accessId
accessKey accessKey
consumerGroup 订阅消费group名
pullIntervalMs 拉取时间间隔,毫秒
startTime 可选,消息消费启动的时间点
startMessageOffset 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载
tag 订阅的标签 可选
lineDelimiter 解析TT block时行分隔符 可选,默认为 “\n”
fieldDelimiter 字段分隔符 可选,默认为”\u0001” 表示 Crtl+A 和 \001
encoding 编码格式 可选,默认为 “utf-8”
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

MQ字段类型 建议实时计算字段类型
STRING VARCHAR
本文转自实时计算—— 创建消息队列(MQ)源表
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2天前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2天前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
2天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
42 2
|
2天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
82 0
|
2天前
|
消息中间件 存储 运维
为什么选择云消息队列 RocketMQ 版
为什么选择云消息队列 RocketMQ 版
6 1
|
2天前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
26 3
|
2天前
|
消息中间件 大数据 Java
消息队列 MQ
消息队列 MQ
28 3
|
2天前
|
消息中间件 数据安全/隐私保护
MQTT微消息队列服务器连接报错:Error: Connection refused: Not authorized
使用MQTTX工具进行测试时,通过AccessKey创建了Client ID的用户名和密码。配置了公网接入点及端口1883,但尝试连接时出现错误。已附上工具截图:![](https://ucc.alicdn.com/pic/developer-ecology/3byii5uar64gg_36327474e991439da422f38c450ef153.png)。确认过用户名、密码和Client ID无误,问题仍未解决,期待回复!
|
2天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
168 4

热门文章

最新文章