PostgreSQL pg_recvlogical 与 test_decoding 自定义,支持source table filter, 对接kafka,es等

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , pg_receivewal , pg_recvlogical


背景

pg_recvlogical 是PG提供的一个通过流复制协议,实时接收数据库逻辑变更的命令行客户端。

逻辑变更内容的来源是上游数据库的wal sender进程调用logical decode plugin处理的。

通过给decode plugin传递plugin option,可以实现参数的传递。

PostgreSQL 9.5后引入的特性,支持通过流复制协议传递option, value.

stream protocol 协议支持option上报

https://www.postgresql.org/docs/10/static/protocol-replication.html

START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [ ( option_name [ option_value ] [, ...] ) ]  
Instructs server to start streaming WAL for logical replication, starting at WAL location XXX/XXX. The server can reply with an error, for example if the requested section of WAL has already been recycled. On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend.  
  
The messages inside the CopyBothResponse messages are of the same format documented for START_REPLICATION ... PHYSICAL.  
  
The output plugin associated with the selected slot is used to process the output for streaming.  
  
SLOT slot_name  
The name of the slot to stream changes from. This parameter is required, and must correspond to an existing logical replication slot created with CREATE_REPLICATION_SLOT in LOGICAL mode.  
  
XXX/XXX  
The WAL location to begin streaming at.  
  
option_name  
The name of an option passed to the slot's logical decoding plugin.  
  
option_value  
Optional value, in the form of a string constant, associated with the specified option.  

所以,从9.5开始,我们可以在客户端向服务端的decode plugin传递信息,例如告诉decode plugin,想同步哪些表,哪些条件等等(当然这些需要在decode plugin中有对应逻辑)。

pic

test_decode是pg内置的一个逻辑decode插件,通过修改test_decode,可以实现需要的格式输出,例如JSON。也可以实现TABLE的过滤等。

通过修改客户端pg_recvlogical 的代码,可以实现通过stream protocol上报需要过滤什么TABLE,以及写入到出标准输出、文件以外的例如kafka或者其他目标端。

pg_recvlogical

https://www.postgresql.org/docs/10/static/app-pgrecvlogical.html

-o name[=value]  
--option=name[=value]  
Pass the option name to the output plugin with, if specified, the option value value. Which options exist and their effects depends on the used output plugin.  
  
  
  
-P plugin  
--plugin=plugin  
When creating a slot, use the specified logical decoding output plugin. See Chapter 48. This option has no effect if the slot already exists.  
  
  
  
-F interval_seconds  
--fsync-interval=interval_seconds  
Specifies how often pg_recvlogical should issue fsync() calls to ensure the output file is safely flushed to disk.  
  
The server will occasionally request the client to perform a flush and report the flush position to the server. This setting is in addition to that, to perform flushes more frequently.  
  
Specifying an interval of 0 disables issuing fsync() calls altogether, while still reporting progress to the server. In this case, data could be lost in the event of a crash.  

logical 代码

https://www.postgresql.org/docs/10/static/logicaldecoding-output-plugin.html

初始化decodeing

src/backend/replication/logical/logical.c

 195 /*  
 196  * Create a new decoding context, for a new logical slot.  
 197  *  
 198  * plugin contains the name of the output plugin  
 199  * output_plugin_options contains options passed to the output plugin  
 200  * read_page, prepare_write, do_write are callbacks that have to be filled to  
 201  *      perform the use-case dependent, actual, work.  
 202  *  
 203  * Needs to be called while in a memory context that's at least as long lived  
 204  * as the decoding context because further memory contexts will be created  
 205  * inside it.  
 206  *  
 207  * Returns an initialized decoding context after calling the output plugin's  
 208  * startup function.  
 209  */  
 210 LogicalDecodingContext *  
 211 CreateInitDecodingContext(char *plugin,  
 212                           List *output_plugin_options,   // 传递参数  
 213                           bool need_full_snapshot,  
 214                           XLogPageReadCB read_page,  
 215                           LogicalOutputPluginWriterPrepareWrite prepare_write,  
 216                           LogicalOutputPluginWriterWrite do_write)  
 217 {  

test decoding

解析option参数

contrib/test_decoding/test_decoding.c

/* initialize this plugin */  
  89 static void  
  90 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,  
  91                   bool is_init)  
  92 {  
  93     ListCell   *option;  
  94     TestDecodingData *data;  
  95   
  96     data = palloc0(sizeof(TestDecodingData));  
  97     data->context = AllocSetContextCreate(ctx->context,  
  98                                           "text conversion context",  
  99                                           ALLOCSET_DEFAULT_MINSIZE,  
 100                                           ALLOCSET_DEFAULT_INITSIZE,  
 101                                           ALLOCSET_DEFAULT_MAXSIZE);  
 102     data->include_xids = true;  
 103     data->include_timestamp = false;  
 104     data->skip_empty_xacts = false;  
 105     data->only_local = false;  
 106   
 107     ctx->output_plugin_private = data;  
 108   
 109     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;  
 110   
 111     foreach(option, ctx->output_plugin_options)  
 112     {  
 113         DefElem    *elem = lfirst(option);  
 114   
 115         Assert(elem->arg == NULL || IsA(elem->arg, String));  
 116   
 117         if (strcmp(elem->defname, "include-xids") == 0)   // 参数  
 118         {  
 119             /* if option does not provide a value, it means its value is true */  
 120             if (elem->arg == NULL)  
 121                 data->include_xids = true;  
 122             else if (!parse_bool(strVal(elem->arg), &data->include_xids))  
 123                 ereport(ERROR,  
 124                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 125                   errmsg("could not parse value \"%s\" for parameter \"%s\"",  
 126                          strVal(elem->arg), elem->defname)));  
 127         }  
 128         else if (strcmp(elem->defname, "include-timestamp") == 0)  // 参数  
 129         {  
 130             if (elem->arg == NULL)  
 131                 data->include_timestamp = true;  
 132             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))  
 133                 ereport(ERROR,  
 134                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 135                   errmsg("could not parse value \"%s\" for parameter \"%s\"",  
 136                          strVal(elem->arg), elem->defname)));  
 137         }  
 138         else if (strcmp(elem->defname, "force-binary") == 0)  // 参数  
 139         {  
 140             bool        force_binary;  
 141   
 142             if (elem->arg == NULL)  
 143                 continue;  
 144             else if (!parse_bool(strVal(elem->arg), &force_binary))  
 145                 ereport(ERROR,  
 146                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 147                   errmsg("could not parse value \"%s\" for parameter \"%s\"",  
 148                          strVal(elem->arg), elem->defname)));  
 149   
 150             if (force_binary)  
 151                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;  
 152         }  
 153         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)  // 参数  
 154         {  
 155   
 156             if (elem->arg == NULL)  
 157                 data->skip_empty_xacts = true;  
 158             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))  
 159                 ereport(ERROR,  
 160                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 161                   errmsg("could not parse value \"%s\" for parameter \"%s\"",  
 162                          strVal(elem->arg), elem->defname)));  
 163         }  
 164         else if (strcmp(elem->defname, "only-local") == 0)  // 参数  
 165         {  
 166   
 167             if (elem->arg == NULL)  
 168                 data->only_local = true;  
 169             else if (!parse_bool(strVal(elem->arg), &data->only_local))  
 170                 ereport(ERROR,  
 171                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 172                   errmsg("could not parse value \"%s\" for parameter \"%s\"",  
 173                          strVal(elem->arg), elem->defname)));  
 174         }  
 175         else  
 176         {  
 177             ereport(ERROR,  
 178                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
 179                      errmsg("option \"%s\" = \"%s\" is unknown",  
 180                             elem->defname,  
 181                             elem->arg ? strVal(elem->arg) : "(null)")));  
 182         }  
 183     }  
 184 }  
 185   

参考

https://www.2ndquadrant.com/en/resources/pglogical/

https://github.com/eulerto/wal2json

函数接口

https://www.postgresql.org/docs/devel/static/functions-admin.html#FUNCTIONS-REPLICATION

https://www.postgresql.org/docs/devel/static/test-decoding.html

相关文章
|
5月前
|
关系型数据库 Go PostgreSQL
golang pgx自定义PostgreSQL类型
golang的pgx驱动提供了大约70种PostgreSQL类型支持,但还是有一些类型没有涵盖,本文介绍如何自己编写代码支持特殊的类型。
75 3
|
1月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
128 4
|
6月前
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
29天前
|
消息中间件 算法 Java
面试官:Kafka和ES选主有什么区别?
Kafka 和 ES,作为大数据处理的中间件,分别用于流处理和全文检索。它们的选主(Kafka 的 Controller 和 ES 的 Master)都基于 Raft 算法实现一致性。Raft 算法通过选举确保分布式系统数据一致性,涉及领导者、追随者和候选人间的身份转换。当超过一半的节点投票给同一候选节点时,该节点成为新领导者。Kafka 和 ES 在此基础上可能有各自优化调整。更多关于 Raft 算法的详细流程和选举规则见原文。
40 2
|
3月前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
|
7月前
|
消息中间件 关系型数据库 MySQL
Logstash接收Kafka数据写入至ES
Logstash接收Kafka数据写入至ES
238 0
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
9月前
|
消息中间件 资源调度 关系型数据库
Flink初试——对接Kafka
Flink初试——对接Kafka
198 0
Flink初试——对接Kafka
|
10月前
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Source的Kafka Source
在Flume中,Kafka Source是一种常见的Source类型。它可以从Kafka的Topic中采集数据,并将其转换成Flume事件进行处理和存储。本文将介绍Kafka Source的配置和数据采集流程。
117 0
|
存储 并行计算 Cloud Native
PolarDB 开源版通过pg_rational插件支持Stern-Brocot trees , 实现高效自定义顺序和调整顺序需求
PolarDB 的云原生存算分离架构, 具备低廉的数据存储、高效扩展弹性、高速多机并行计算能力、高速数据搜索和处理; PolarDB与计算算法结合, 将实现双剑合璧, 推动业务数据的价值产出, 将数据变成生产力. 本文将介绍PolarDB 开源版通过pg_rational插件支持Stern-Brocot trees , 实现高效自定义顺序和调整顺序需求.
162 0

相关产品

  • 云原生数据库 PolarDB