RDS 触发器eventFormat 为 protobuf 的代码使用示例

2019-01-15 13:28:51 2242

前言

函数计算发布了新的触发器-- RDS 触发器,这标志着函数计算的事件源新增加了一名成员 -- 阿里云关系型数据库( Relational Database Service,简称 RDS), 根据官方文档的示例,只有 eventFormat 为 json 的示例,本文作为补充, 讲解 eventFormat 为 protobuf 时的代码示例。

protobuf 格式

syntax = "proto3";
package protocol;
enum DBType {
  MySQL = 0;
  Redis = 1;
  Mongo = 2;
  HBase = 3;
}
message Message {
  //消费该message后可以ack的offset.考虑进行编码.
  int64 offset = 1;
  //该message后可以ack的record对应的timestamp
  int64 timestamp = 2;
  //留作备用:以后传送大的数据行可以拿来使用.
  int32 spare_flag = 3;
  int32 spare_seq = 4;
  //message的version
  int32 version = 5;
  //数据源
  DBType db_type = 6;
  //数据行
  repeated Entry entries = 7;
}
//操作
enum OpType {
  UNKOWN_TYPE = 0;
  BEGIN = 1;
  COMMIT = 2;
  //不在下列DDL操作用的query
  QUERY = 3;
  INSERT = 4;
  UPDATE = 5;
  DELETE = 6;
  CREATE = 7;
  ALTER = 8;
  DROP = 9;
  TRUNCATE = 10;
  RENAME = 11;
  //CREATE INDEX
  CINDEX = 12;
  //DROP INDEX
  DINDEX = 13;
  OPTIMIZE = 14;
  XA = 15;
}
message Entry {
  OpType operation = 1;
  //时间戳,单位s
  int64 timestamp = 2;
  //Transaction id
  string id = 3;
  //一个事务中的第几行
  int64 sequence = 4;
  //DML操作的db_name,DDL操作时候session的默认db
  string db_name = 5;
  //DML操作的table_name
  string table_name = 6;
  //after image,操作的后镜像.
  repeated Field row = 7;
  //before image,操作的前镜像
  repeated Field before_row = 8;
  //非DML语句的sql
  string query = 9;
}
message Field {
  //column name
  string name = 1;
  //字符集
  string charset = 3;
  //第几列
  int32 idx = 2;
  //对应java中的type
  int32 type_num = 4;
  //对于mysql,mysql中的type num;
  int32 org_type = 5;
  //在db中原始type的name
  string org_type_name = 6;
  //预留的flag
  int32 flag = 7;
  //是否为NULL
  bool is_null = 8;
  //是否是pk
  bool is_pk = 9;
  //是否是unsigned value
  bool is_unsigned = 10;
  //是否是timestamp value(timestamp展示值和时区相关,这里记录标准时区的值)
  bool is_timestamp = 11;
  //value的值都用bytes表示,消费端需要先用charset结合bytes生成对应string.然后再转换为type_num对应的type value.
  //同时当charset为空,即表示原数据类型就是binary类型
  bytes value = 12;
}

代码示例

完整的代码示例请下载本文最后的附件, 包含 3种runtime( python, php, nodejs)的使用案例, 下面展示各自使用的代码简短示例。

java runtime 示例请参考:Redis缓存淘汰

python

# -*- coding: utf-8 -*-

from proto import rdsEvent_pb2

bin_data = b""
# local data only for test
with open('./proto.bin', 'rb') as f:
   bin_data = f.read()

def handler(event, context):
    target = rdsEvent_pb2.Message()
    target.ParseFromString(event)
    #target.ParseFromString(bin_data)
    print(target.offset, target.db_type)
    entry = target.entries[0]
    print(entry.operation, entry.db_name, entry.table_name)
    print(entry.row[0].name, entry.row[1].value, entry.row[2].type_num)
    
    return "OK"

nodejs

'use strict';

var fs = require("fs");
var protobuf = require("protobufjs");

module.exports.handler = function(event, context, callback) {
    protobuf.load("rdsEvent.proto", function(err, root) {
        if (err)
            throw err;

        var rdsEventMessage = root.lookupType("proto.Message");
        
        // local data only for test
        //var event = fs.readFileSync('proto.bin');

        var message = rdsEventMessage.decode(event);

        console.log(JSON.stringify(message))
        callback(null, 'hello world');

    });
};

php

<?php

require_once __DIR__ . '/Proto/Field.php';
require_once __DIR__ . '/Proto/DBType.php';
require_once __DIR__ . '/Proto/OpType.php';
require_once __DIR__ . '/Proto/Entry.php';
require_once __DIR__ . '/Proto/Message.php';
require_once __DIR__ . '/GPBMetadata/RdsEvent.php';

// local data only for test
$filename = __DIR__ . "/proto.bin";
$handle = fopen($filename, "rb");
$data   = fread($handle, filesize($filename));
fclose($handle);

function handler($event, $context) {
    $res = new \Proto\Message();
    //$res->mergeFromString($data);
    $res->mergeFromString($event);
    echo $res->getDbType() . PHP_EOL;
    echo $res->getOffset() . PHP_EOL;
    echo $res->getEntries()[0]->getTableName() . PHP_EOL;
    return "ok";
}

php RDS aliyun string charset timestamp type handler lambda 触发器 serverless 云数据库RDS 函数计算

作者

rsong
TA的文章

相关文章