作者简介
冯若航,PostgreSQLDBA,全栈工程师,架构师,曾任职于苹果,探探,阿里巴巴。PostgreSQL传教士,译著有《PostgreSQL指南内幕探索》与《设计数据密集型应用》。
前驱知识
CDC与ETL
数据库在本质上是一个状态集合,任何对数据库的变更(增删改)本质上都是对状态的修改。
在实际生产中,我们经常需要把数据库的状态同步到其他地方去,例如同步到数据仓库进行分析,同步到消息队列供下游消费,同步到缓存以加速查询。总的来说,搬运状态有两大类方法:ETL与CDC。
ETL(ExtractTransformLoad)着眼于状态本身,用定时批量轮询的方式拉取状态本身。
CDC(ChangingDataCapture)则着眼于变更事件,以流式的方式持续收集状态变化事件(变更)。
ETL大家都耳熟能详,每天批量跑ETL任务,从生产OLTP数据库拉取(E),转换(T)格式,导入(L)数仓,在此不赘述。相比ETL而言,CDC算是个新鲜玩意,随着流计算的崛起也越来越多地进入人们的视线。
变更数据捕获(changedatacapture,CDC)是一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程。CDC很有意思,特别是当变更能在被写入数据库后立刻用于后续的流处理时。
例如用户可以捕获数据库中的变更,并不断将相同的变更应用至搜索索引(e.gelasticsearch)。如果变更日志以相同的顺序应用,则可以预期的是,搜索索引中的数据与数据库中的数据是匹配的。同理,这些变更也可以应用于后台刷新缓存(redis),送往消息队列(Kafka),导入数据仓库(EventSourcing,存储不可变的事实事件记录而不是每天取快照),收集统计数据与监控(Prometheus),等等等等。在这种意义下,外部索引,缓存,数仓都成为了PostgreSQL在逻辑上的从库,这些衍生数据系统都成为了变更流的消费者,而PostgreSQL成为了整个数据系统的主库。在这种架构下,应用只需要操心怎样把数据写入数据库,剩下的事情交给CDC即可。系统设计可以得到极大地简化:所有的数据组件都能够自动与主库在逻辑上保证(最终)一致。用户不用再为如何保证多个异构数据系统之间数据同步而焦头烂额了。
实际上PostgreSQL自10.0版本以来提供的逻辑复制(logicalreplication)功能,实质上就是一个CDC应用:从主库上提取变更事件流:INSERT,UPDATE,DELETE,TRUNCATE,并在另一个PostgreSQL主库实例上重放。如果这些增删改事件能够被解析出来,它们就可以用于任何感兴趣的消费者,而不仅仅局限于另一个PostgreSQL实例。
逻辑复制
想在传统关系型数据库上实施CDC并不容易,关系型数据库本身的预写式日志WAL实际上就是数据库中变更事件的记录。因此从数据库中捕获变更,基本上可以认为等价于消费数据库产生的WAL日志/复制日志。(当然也有其他的变更捕获方式,例如在表上建立触发器,当变更发生时将变更记录写入另一张变更日志表,客户端不断tail这张日志表,当然也有一定的局限性)。
大多数数据库的复制日志的问题在于,它们一直被当做数据库的内部实现细节,而不是公开的API。客户端应该通过其数据模型和查询语言来查询数据库,而不是解析复制日志并尝试从中提取数据。许多数据库根本没有记录在案的获取变更日志的方式。因此捕获数据库中所有的变更然后将其复制到其他状态存储(搜索索引,缓存,数据仓库)中是相当困难的。
此外,仅有数据库变更日志仍然是不够的。如果你拥有全量变更日志,当然可以通过重放日志来重建数据库的完整状态。但是在许多情况下保留全量历史WAL日志并不是可行的选择(例如磁盘空间与重放耗时的限制)。例如,构建新的全文索引需要整个数据库的完整副本——仅仅应用最新的变更日志是不够的,因为这样会丢失最近没有更新过的项目。因此如果你不能保留完整的历史日志,那么你至少需要包留一个一致的数据库快照,并保留从该快照开始的变更日志。
因此实施CDC,数据库至少需要提供以下功能:
1.获取数据库的变更日志(WAL),并解码成逻辑上的事件(对表的增删改而不是数据库的内部表示)
2.获取数据库的"一致性快照",从而订阅者可以从任意一个一致性状态开始订阅而不是数据库创建伊始。
3.保存消费者偏移量,以便跟踪订阅者的消费进度,及时清理回收不用的变更日志以免撑爆磁盘。
我们会发现,PostgreSQL在实现逻辑复制的同时,已经提供了一切CDC所需要的基础设施:
?逻辑解码(LogicalDecoding),用于从WAL日志中解析逻辑变更事件
?复制协议(ReplicationProtocol):提供了消费者实时订阅(甚至同步订阅)数据库变更的机制
?快照导出(exportsnapshot):允许导出数据库的一致性快照(pg_export_snapshot)
?复制槽(ReplicationSlot),用于保存消费者偏移量,跟踪订阅者进度。
因此,在PostgreSQL上实施CDC最为直观优雅的方式,就是按照PostgreSQL的复制协议编写一个"逻辑从库",从数据库中实时地,流式地接受逻辑解码后的变更事件,完成自己定义的处理逻辑,并及时向数据库汇报自己的消息消费进度。就像使用Kafka一样。在这里CDC客户端可以将自己伪装成一个PostgreSQL的从库,从而不断地实时从PostgreSQL主库中接收逻辑解码后的变更内容。同时CDC客户端还可以通过PostgreSQL提供的复制槽(ReplicationSlot)机制来保存自己的消费者偏移量,即消费进度,实现类似消息队列一至少次的保证,保证不错过变更数据。(客户端自己记录消费者偏移量跳过重复记录,即可实现"恰好一次"的保证)
逻辑解码
在开始进一步的讨论之前,让我们先来看一看期待的输出结果到底是什么样子。
PostgreSQL的变更事件以二进制内部表示形式保存在预写式日志(WAL)中,使用其自带的pg_waldump工具可以解析出来一些人类可读的信息:
rmgr:Btreelen(rec/tot):64/64,tx:,lsn:2D/AAFFC9F0,prev2D/AAFFC,desc:INSERT_LEAFoff,blkref#0:rel//blk4rmgr:Heaplen(rec/tot):/,tx:,lsn:2D/AAFFCA30,prev2D/AAFFC9F0,desc:INSERToff10,blkref#0:rel//blk
WAL日志里包含了完整权威的变更事件记录,但这种记录格式过于底层。用户并不会对磁盘上某个数据页里的二进制变更(文件A页面B偏移量C追加写入二进制数据D)感兴趣,他们感兴趣的是某张表中增删改了哪些行哪些字段。逻辑解码就是将物理变更记录翻译为用户期望的逻辑变更事件的机制(例如表A上的增删改事件)。
例如用户可能期望的是,能够解码出等价的SQL语句:
INSERTINTOpublic.test(id,data)VALUES(14,hoho);
或者最为通用的JSON结构(这里以JSON格式记录了一条UPDATE事件):
{"change":[{"kind":"update","schema":"public","table":"test","columnnames":["id","data"],"columntypes":["integer","text"],"columnvalues":[1,"hoho"],"oldkeys":{"keynames":["id"],"keytypes":["integer"],"keyvalues":[1]}}]}
当然也可以是更为紧凑高效严格的Protobuf格式,更为灵活的Avro格式,抑或是任何用户感兴趣的格式。
逻辑解码所要解决的问题,就是将数据库内部二进制表示的变更事件,解码(Decoding)成为用户感兴趣的格式。之所以需要这样一个过程,是因为数据库内部表示是非常紧凑的,想要解读原始的二进制WAL日志,不仅仅需要WAL结构相关的知识,还需要系统目录(SystemCatalog),即元数据。没有元数据就无从得知用户可能感兴趣的模式名,表名,列名,只能解析出来的一系列数据库自己才能看懂的oid。
关于流复制协议,复制槽,事务快照等概念与功能,这里就不展开了,让我们进入动手环节。
快速开始
假设我们有一张用户表,我们希望捕获任何发生在它上面的变更,假设数据库发生了如下变更操作
下面会重复用到这几条命令
DROPTABLEIFEXISTSusers;CREATETABLEusers(idSERIALPRIMARYKEY,nameTEXT);INSERTINTOusersVALUES(,Vonng);INSERTINTOusersVALUES(,XiaoWang);DELETEFROMusersWHEREid=;UPDATEusersSETname=LaoWangWHEREid=;
执行这几条命令的结果是,数据库中有一张用户表,表里最终的状态是只有一条(,LaoWang)的记录。而无论是曾经有一个名为Vonng的用户存在过的痕迹,抑或是隔壁老王也曾年轻过的事实,都随着对数据库的删改而烟消云散。我们希望这些事实不应随风而逝,它们也应当被记录下来。
操作流程
通常来说,订阅变更需要以下几步操作:
选择一个一致性的数据库快照,作为订阅变更的起点。(创建一个复制槽)
(数据库发生了一些变更)
读取这些变更,更新自己的的消费进度。
那么,让我们先从最简单的办法开始,从PostgreSQL自带的的SQL接口开始
SQL接口
逻辑复制槽的增删查API:
TABLEpg_replication_slots;--查pg_create_logical_replication_slot(slot_namename,pluginname)--增pg_drop_replication_slot(slot_namename)--删
从逻辑复制槽中获取最新的变更数据:
pg_logical_slot_get_changes(slot_namename,...)--消费掉pg_logical_slot_peek_changes(slot_namename,...)--只查看不消费
在正式开始前,还需要对数据库参数做一些修改,修改wal_level=logical,这样在WAL日志中的信息才能足够用于逻辑解码。
--创建一个复制槽test_slot,使用系统自带的测试解码插件test_decoding,解码插件会在后面介绍
SELECT*FROMpg_create_logical_replication_slot(test_slot,test_decoding);
--重放上面的建表与增删改操作
--DROPTABLE
CREATETABLE
INSERT1
INSERT1
DELETE1
UPDATE1
--读取复制槽test_slot中未消费的最新的变更事件流
SELECT*FROMpg_logical_slot_get_changes(test_slot,NULL,NULL);lsn
xid
data-----------+-----+--------------------------------------------------------------------0/C7E8
BEGIN0/F6F8
COMMIT0/F6F8
BEGIN0/F6F8
tablepublic.users:INSERT:id[integer]:name[text]:Vonng0/F
COMMIT0/F
BEGIN0/F
tablepublic.users:INSERT:id[integer]:name[text]:XiaoWang0/F8C8
COMMIT0/F8C8
BEGIN0/F8C8
tablepublic.users:DELETE:id[integer]:0/F
COMMIT0/F
BEGIN0/F
tablepublic.users:UPDATE:id[integer]:name[text]:LaoWang0/F9F0
COMMIT--清理掉创建的复制槽SELECTpg_drop_replication_slot(test_slot);
这里,我们可以看到一系列被触发的事件,其中每个事务的开始与提交都会触发一个事件。因为目前逻辑解码机制不支持DDL变更,因此CREATETABLE与DROPTABLE并没有出现在事件流中,只能看到空荡荡的BEGIN+COMMIT。另一点需要注意的是,只有成功提交的事务才会产生逻辑解码变更事件。也就是说用户不用担心收到并处理了很多行变更消息之后,最后发现事务回滚了,还需要担心怎么通知消费者去回滚变更。
通过SQL接口,用户已经能够拉取最新的变更了。这也就意味着任何有着PostgreSQL驱动的语言都可以通过这种方式从数据库中捕获最新的变更。当然这种方式实话说还是略过于土鳖。更好的方式是利用PostgreSQL的复制协议直接从数据库中订阅变更数据流。当然相比使用SQL接口,这也需要更多的工作。
使用客户端接收变更
在编写自己的CDC客户端之前,让我们先来试用一下官方自带的CDC客户端样例——pg_recvlogical。与pg_receivewal类似,不过它接收的是逻辑解码后的变更,下面是一个具体的例子:
#启动一个CDC客户端,连接数据库postgres,创建名为test_slot的槽,使用test_decoding解码插件,标准输出
pg_recvlogical\-dpostgres\--create-slot--if-not-exists--slot=test_slot\--plugin=test_decoding\--start-f-#开启另一个会话,重放上面的建表与增删改操作#DROPTABLE
CREATETABLE
INSERT1
INSERT1
DELETE1
UPDATE1#pg_recvlogical输出结果BEGINCOMMITBEGINtablepublic.users:INSERT:id[integer]:name[text]:VonngCOMMITBEGINtablepublic.users:INSERT:id[integer]:name[text]:XiaoWangCOMMITBEGINtablepublic.users:DELETE:id[integer]:COMMITBEGINtablepublic.users:UPDATE:id[integer]:name[text]:LaoWangCOMMIT#清理:删除创建的复制槽pg_recvlogical-dpostgres--drop-slot--slot=test_slot
上面的例子中,主要的变更事件包括事务的开始与结束,以及数据行的增删改。这里默认的test_decoding插件的输出格式为:
BEGIN{事务标识}
table{模式名}.{表名}{命令INSERT
UPDATE
DELETE}{列名}[{类型}]:{取值}...
COMMIT{事务标识}
实际上,PostgreSQL的逻辑解码是这样工作的,每当特定的事件发生(表的Truncate,行级别的增删改,事务开始与提交),PostgreSQL都会调用一系列的钩子函数。所谓的逻辑解码输出插件(LogicalDecodingOutputPlugin),就是这样一组回调函数的集合。它们接受二进制内部表示的变更事件作为输入,查阅一些系统目录,将二进制数据翻译成为用户感兴趣的结果。
逻辑解码输出插件
除了PostgreSQL自带的"用于测试"的逻辑解码插件:test_decoding之外,还有很多现成的输出插件,例如:
JSON格式输出插件:wal2json
SQL格式输出插件:decoder_raw
Protobuf输出插件:decoderbufs
当然还有PostgreSQL自带逻辑复制所使用的解码插件:pgoutput,其消息格式文档地址。
安装这些插件非常简单,有一些插件(例如wal2json)可以直接从官方二进制源轻松安装。
yuminstallwal2json11aptinstallpostgresql-11-wal2json
或者如果没有二进制包,也可以自己下载编译。只需要确保pg_config已经在你的PATH中,然后执行makesudomakeinstall两板斧即可。以输出SQL格式的decoder_raw插件为例:
gitclone