在一些小型項(xiàng)目當(dāng)中,沒(méi)有引入消息中間件,也不想引入,但有一些業(yè)務(wù)邏輯想要解耦異步,那怎么辦呢?
我們的web項(xiàng)目,單獨(dú)內(nèi)網(wǎng)部署,由于大數(shù)據(jù)背景,公司消息中間件統(tǒng)一使用的kafka,在一些小項(xiàng)目上kafka就顯得很笨重。 引入rocketmq或rabittmq也沒(méi)必要。 事件或多線程也不適合。
具體一點(diǎn)的,之前對(duì)接的一個(gè)系統(tǒng),一張記錄表有10+以上的類(lèi)型狀態(tài),新的需求是,針對(duì)每種狀態(tài)做出對(duì)應(yīng)的不同的操作。 之前寫(xiě)入這張記錄表的時(shí)候,方式也是五花八門(mén),有的是單條記錄寫(xiě)入,有的是批量寫(xiě)入,有的調(diào)用了統(tǒng)一的service,有的呢直接調(diào)用了DAO層mapper直接寫(xiě)入。
所以想找到一個(gè)統(tǒng)一入口進(jìn)行切入處理,就不行了。
這個(gè)時(shí)候就算引入消息隊(duì)列,也需要在不同的業(yè)務(wù)方法里進(jìn)行寫(xiě)入消息的操作。業(yè)務(wù)方也不太愿意配合改。
可以使用觸發(fā)器,但它是屬于上個(gè)時(shí)代的產(chǎn)物,槽點(diǎn)太多。(這里并不是完全不主張使用觸發(fā)器,技術(shù)永遠(yuǎn)是為業(yè)務(wù)服務(wù)的,只要評(píng)估覺(jué)得可行,就可以使用)那么這個(gè)時(shí)候,CDC技術(shù)就可以粉墨登場(chǎng)了。
CDC(change data capture)數(shù)據(jù)更改捕獲。常見(jiàn)的數(shù)據(jù)更改捕獲都是通過(guò)數(shù)據(jù)庫(kù)比如mysql的binlog來(lái)達(dá)到目的。
我們可以監(jiān)控mysql binlog日志,當(dāng)寫(xiě)入一條數(shù)據(jù)的時(shí)候,接收到數(shù)據(jù)變更日志,做出相應(yīng)的操作。
這樣的好處是,只需導(dǎo)入依賴(lài),不額外引入組件,同時(shí)無(wú)需改動(dòng)之前的代碼。 兩邊完全解耦,互不干擾。
常見(jiàn)的CDC框架,比如,canal (非Camel)
canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi) 早期阿里巴巴因?yàn)楹贾莺兔绹?guó)雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。 從 2010 年開(kāi)始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫(kù)日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫(kù)增量訂閱和消費(fèi)業(yè)務(wù)。
它是基于日志增量訂閱和消費(fèi)的業(yè)務(wù),包括
數(shù)據(jù)庫(kù)鏡像 數(shù)據(jù)庫(kù)實(shí)時(shí)備份 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等) 業(yè)務(wù) cache 刷新 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

它的原理
canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
MySQL master 收到 dump 請(qǐng)求,開(kāi)始推送 binary log 給 slave (即 canal );關(guān)注工眾號(hào):碼猿技術(shù)專(zhuān)欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!
canal 解析 binary log 對(duì)象(原始為 byte 流)
再比如,debezium(音同 dbzm 滴BZ姆)很多人可能不太了解. 包括databus,maxwell,flink cdc(大數(shù)據(jù)領(lǐng)域)等等,它們同屬CDC捕獲數(shù)據(jù)更改(change data capture)類(lèi)的技術(shù)。

為什么是debezium
這么多技術(shù)框架,為什么選debezium?
看起來(lái)很多。但一一排除下來(lái)就debezium和canal。
sqoop,kettle,datax之類(lèi)的工具,屬于前大數(shù)據(jù)時(shí)代的產(chǎn)物,地位類(lèi)似于web領(lǐng)域的structs2。而且,它們基于查詢(xún)而非binlog日志,其實(shí)不屬于CDC。首先排除。
flink cdc是大數(shù)據(jù)領(lǐng)域的框架,一般web項(xiàng)目的數(shù)據(jù)量屬于大材小用了。
同時(shí)databus,maxwell相對(duì)比較冷門(mén),用得比較少。
最后不用canal的原因有以下幾點(diǎn)。
canal需要安裝,這違背了“如非必要,勿增實(shí)體”的原則。
canal只能對(duì)MYSQL進(jìn)行CDC監(jiān)控。有很大的局限性。
大數(shù)據(jù)領(lǐng)域非常流行的flink cdc(阿里團(tuán)隊(duì)主導(dǎo))底層使用的也是debezium,而非同是阿里出品的canal。
debezium可借助kafka組件,將變動(dòng)的數(shù)據(jù)發(fā)到kafka topic,后續(xù)的讀取操作只需讀取kafka,可有效減少數(shù)據(jù)庫(kù)的讀取壓力??杀WC一次語(yǔ)義,至少一次語(yǔ)義。 同時(shí),也可基于內(nèi)嵌部署模式,無(wú)需我們手動(dòng)部署kafka集群,可滿(mǎn)足”如非必要,勿增實(shí)體“的原則。

Debezium是一個(gè)捕獲數(shù)據(jù)更改(CDC)平臺(tái),并且利用Kafka和Kafka Connect實(shí)現(xiàn)了自己的持久性、可靠性和容錯(cuò)性。 每一個(gè)部署在Kafka Connect分布式的、可擴(kuò)展的、容錯(cuò)性的服務(wù)中的connector監(jiān)控一個(gè)上游數(shù)據(jù)庫(kù)服務(wù)器,捕獲所有的數(shù)據(jù)庫(kù)更改, 然后記錄到一個(gè)或者多個(gè)Kafka topic(通常一個(gè)數(shù)據(jù)庫(kù)表對(duì)應(yīng)一個(gè)kafka topic)。
Kafka確保所有這些數(shù)據(jù)更改事件都能夠多副本并且總體上有序(Kafka只能保證一個(gè)topic的單個(gè)分區(qū)內(nèi)有序),這樣, 更多的客戶(hù)端可以獨(dú)立消費(fèi)同樣的數(shù)據(jù)更改事件而對(duì)上游數(shù)據(jù)庫(kù)系統(tǒng)造成的影響降到很小(如果N個(gè)應(yīng)用都直接去監(jiān)控?cái)?shù)據(jù)庫(kù)更改,對(duì)數(shù)據(jù)庫(kù)的壓力為N, 而用debezium匯報(bào)數(shù)據(jù)庫(kù)更改事件到kafka,所有的應(yīng)用都去消費(fèi)kafka中的消息,可以把對(duì)數(shù)據(jù)庫(kù)的壓力降到1)。
另外,客戶(hù)端可以隨時(shí)停止消費(fèi),然后重啟, 從上次停止消費(fèi)的地方接著消費(fèi)。每個(gè)客戶(hù)端可以自行決定他們是否需要exactly-once或者at-least-once消息交付語(yǔ)義保證, 并且所有的數(shù)據(jù)庫(kù)或者表的更改事件是按照上游數(shù)據(jù)庫(kù)發(fā)生的順序被交付的。

對(duì)于不需要或者不想要這種容錯(cuò)級(jí)別、性能、可擴(kuò)展性、可靠性的應(yīng)用,他們可以使用內(nèi)嵌的Debezium connector引擎來(lái)直接在應(yīng)用內(nèi)部運(yùn)行connector。 這種應(yīng)用仍需要消費(fèi)數(shù)據(jù)庫(kù)更改事件,但更希望connector直接傳遞給它,而不是持久化到Kafka里。
簡(jiǎn)介
Debezium是一個(gè)開(kāi)源項(xiàng)目,為捕獲數(shù)據(jù)更改(change data capture,CDC)提供了一個(gè)低延遲的流式處理平臺(tái)。你可以安裝并且配置Debezium去監(jiān)控你的數(shù)據(jù)庫(kù),然后你的應(yīng)用就可以消費(fèi)對(duì)數(shù)據(jù)庫(kù)的每一個(gè)行級(jí)別(row-level)的更改。只有已提交的更改才是可見(jiàn)的,所以你的應(yīng)用不用擔(dān)心事務(wù)(transaction)或者更改被回滾(roll back)。Debezium為所有的數(shù)據(jù)庫(kù)更改事件提供了一個(gè)統(tǒng)一的模型,所以你的應(yīng)用不用擔(dān)心每一種數(shù)據(jù)庫(kù)管理系統(tǒng)的錯(cuò)綜復(fù)雜性。另外,由于Debezium用持久化的、有副本備份的日志來(lái)記錄數(shù)據(jù)庫(kù)數(shù)據(jù)變化的歷史,因此,你的應(yīng)用可以隨時(shí)停止再重啟,而不會(huì)錯(cuò)過(guò)它停止運(yùn)行時(shí)發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。
監(jiān)控?cái)?shù)據(jù)庫(kù),并且在數(shù)據(jù)變動(dòng)的時(shí)候獲得通知一直是很復(fù)雜的事情。關(guān)系型數(shù)據(jù)庫(kù)的觸發(fā)器可以做到,但是只對(duì)特定的數(shù)據(jù)庫(kù)有效,而且通常只能更新數(shù)據(jù)庫(kù)內(nèi)的狀態(tài)(無(wú)法和外部的進(jìn)程通信)。一些數(shù)據(jù)庫(kù)提供了監(jiān)控?cái)?shù)據(jù)變動(dòng)的API或者框架,但是沒(méi)有一個(gè)標(biāo)準(zhǔn),每種數(shù)據(jù)庫(kù)的實(shí)現(xiàn)方式都是不同的,并且需要大量特定的知識(shí)和理解特定的代碼才能運(yùn)用。確保以相同的順序查看和處理所有更改,同時(shí)最小化影響數(shù)據(jù)庫(kù)仍然非常具有挑戰(zhàn)性。
Debezium提供了模塊為你做這些復(fù)雜的工作。一些模塊是通用的,并且能夠適用多種數(shù)據(jù)庫(kù)管理系統(tǒng),但在功能和性能方面仍有一些限制。另一些模塊是為特定的數(shù)據(jù)庫(kù)管理系統(tǒng)定制的,所以他們通??梢愿嗟乩脭?shù)據(jù)庫(kù)系統(tǒng)本身的特性來(lái)提供更多功能。
github官網(wǎng)上羅列的一些典型應(yīng)用場(chǎng)景 :
緩存失效(Cache invalidation) 經(jīng)典問(wèn)題 Redis與MySQL雙寫(xiě)一致性如何保證?Debezium利用kafka單分區(qū)的有序性(忽略mysql binlog本身可能的延遲和亂序),可完全解決此問(wèn)題。 在緩存中緩存的條目(entry)在源頭被更改或者被刪除的時(shí)候立即讓緩存中的條目失效。 如果緩存在一個(gè)獨(dú)立的進(jìn)程中運(yùn)行(例如Redis,Memcache,Infinispan或者其他的),那么簡(jiǎn)單的緩存失效邏輯可以放在獨(dú)立的進(jìn)程或服務(wù)中, 從而簡(jiǎn)化主應(yīng)用的邏輯。在一些場(chǎng)景中,緩存失效邏輯可以更復(fù)雜一點(diǎn),讓它利用更改事件中的更新數(shù)據(jù)去更新緩存中受影響的條目。
簡(jiǎn)化單體應(yīng)用(Simplifying monolithic applications) 許多應(yīng)用更新數(shù)據(jù)庫(kù),然后在數(shù)據(jù)庫(kù)中的更改被提交后,做一些額外的工作:更新搜索索引,更新緩存,發(fā)送通知,運(yùn)行業(yè)務(wù)邏輯,等等。 這種情況通常稱(chēng)為雙寫(xiě)(dual-writes),因?yàn)閼?yīng)用沒(méi)有在一個(gè)事務(wù)內(nèi)寫(xiě)多個(gè)系統(tǒng)。這樣不僅應(yīng)用邏輯復(fù)雜難以維護(hù), 而且雙寫(xiě)容易丟失數(shù)據(jù)或者在一些系統(tǒng)更新成功而另一些系統(tǒng)沒(méi)有更新成功的時(shí)候造成不同系統(tǒng)之間的狀態(tài)不一致。使用捕獲更改數(shù)據(jù)技術(shù)(change data capture,CDC), 在源數(shù)據(jù)庫(kù)的數(shù)據(jù)更改提交后,這些額外的工作可以被放在獨(dú)立的線程或者進(jìn)程(服務(wù))中完成。這種實(shí)現(xiàn)方式的容錯(cuò)性更好,不會(huì)丟失事件,容易擴(kuò)展,并且更容易支持升級(jí)。
共享數(shù)據(jù)庫(kù)(Sharing databases) 當(dāng)多個(gè)應(yīng)用共用同一個(gè)數(shù)據(jù)庫(kù)的時(shí)候,一個(gè)應(yīng)用提交的更改通常要被另一個(gè)應(yīng)用感知到。一種實(shí)現(xiàn)方式是使用消息總線, 盡管非事務(wù)性(non-transactional)的消息總線總會(huì)受上面提到的雙寫(xiě)(dual-writes)影響。但是,另一種實(shí)現(xiàn)方式,即Debezium,變得很直接:每個(gè)應(yīng)用可以直接監(jiān)控?cái)?shù)據(jù)庫(kù)的更改,并且響應(yīng)更改。
數(shù)據(jù)集成(Data integration) 數(shù)據(jù)通常被存儲(chǔ)在多個(gè)地方,尤其是當(dāng)數(shù)據(jù)被用于不同的目的的時(shí)候,會(huì)有不同的形式。保持多系統(tǒng)的同步是很有挑戰(zhàn)性的, 但是可以通過(guò)使用Debezium加上簡(jiǎn)單的事件處理邏輯來(lái)實(shí)現(xiàn)簡(jiǎn)單的ETL類(lèi)型的解決方案。
命令查詢(xún)職責(zé)分離(CQRS) 在命令查詢(xún)職責(zé)分離 Command Query Responsibility Separation (CQRS) 架構(gòu)模式中,更新數(shù)據(jù)使用了一種數(shù)據(jù)模型, 讀數(shù)據(jù)使用了一種或者多種數(shù)據(jù)模型。由于數(shù)據(jù)更改被記錄在更新側(cè)(update-side),這些更改將被處理以更新各種讀展示。 所以CQRS應(yīng)用通常更復(fù)雜,尤其是他們需要保證可靠性和全序(totally-ordered)處理。Debezium和CDC可以使這種方式更可行: 寫(xiě)操作被正常記錄,但是Debezium捕獲數(shù)據(jù)更改,并且持久化到全序流里,然后供那些需要異步更新只讀視圖的服務(wù)消費(fèi)。 寫(xiě)側(cè)(write-side)表可以表示面向領(lǐng)域的實(shí)體(domain-oriented entities),或者當(dāng)CQRS和 Event Sourcing 結(jié)合的時(shí)候,寫(xiě)側(cè)表僅僅用做追加操作命令事件的日志。
springboot 整合 Debezium
依賴(lài)
1.7.0.Final 8.0.26 mysql mysql-connector-java ${mysql.connector.version} runtime io.debezium debezium-api ${debezium.version} io.debezium debezium-embedded ${debezium.version} io.debezium debezium-connector-mysql ${debezium.version} mysql mysql-connector-java
注意debezium版本為1.7.0.Final,對(duì)應(yīng)mysql驅(qū)動(dòng)為8.0.26,低于這個(gè)版本會(huì)報(bào)兼容錯(cuò)誤。
配置
相應(yīng)的配置
debezium.datasource.hostname=localhost debezium.datasource.port=3306 debezium.datasource.user=root debezium.datasource.password=123456 debezium.datasource.tableWhitelist=test.test debezium.datasource.storageFile=E:/debezium/test/offsets/offset.dat debezium.datasource.historyFile=E:/debezium/test/history/custom-file-db-history.dat debezium.datasource.flushInterval=10000 debezium.datasource.serverId=1 debezium.datasource.serverName=name-1
然后進(jìn)行配置初始化。
主要的配置項(xiàng):
connector.class
監(jiān)控的數(shù)據(jù)庫(kù)類(lèi)型,這里選mysql。
offset.storage
選擇FileOffsetBackingStore時(shí),意思把讀取進(jìn)度存到本地文件,因?yàn)槲覀儾挥胟afka,當(dāng)使用kafka時(shí),選KafkaOffsetBackingStore 。
offset.storage.file.filename
存放讀取進(jìn)度的本地文件地址。
offset.flush.interval.ms
讀取進(jìn)度刷新保存頻率,默認(rèn)1分鐘。如果不依賴(lài)kafka的話,應(yīng)該就沒(méi)有exactly once只讀取一次語(yǔ)義,應(yīng)該是至少讀取一次。意味著可能重復(fù)讀取。如果web容器掛了,最新的讀取進(jìn)度沒(méi)有刷新到文件里,下次重啟時(shí),就會(huì)重復(fù)讀取binlog。
table.whitelist
監(jiān)控的表名白名單,建議設(shè)置此值,只監(jiān)控這些表的binlog。
database.whitelist
監(jiān)控的數(shù)據(jù)庫(kù)白名單,如果選此值,會(huì)忽略table.whitelist,然后監(jiān)控此db下所有表的binlog。
/**
*@className:MysqlConfig
*@author:nyp
*@description:TODO
*@date:2023/8/713:53
*@version:1.0
*/
@Configuration
@ConfigurationProperties(prefix="debezium.datasource")
@Data
publicclassMysqlBinlogConfig{
privateStringhostname;
privateStringport;
privateStringuser;
privateStringpassword;
privateStringtableWhitelist;
privateStringstorageFile;
privateStringhistoryFile;
privateLongflushInterval;
privateStringserverId;
privateStringserverName;
@Bean
publicio.debezium.config.ConfigurationMysqlBinlogConfig()throwsException{
checkFile();
io.debezium.config.Configurationconfiguration=io.debezium.config.Configuration.create()
.with("name","mysql_connector")
.with("connector.class",MySqlConnector.class)
//.with("offset.storage",KafkaOffsetBackingStore.class)
.with("offset.storage",FileOffsetBackingStore.class)
.with("offset.storage.file.filename",storageFile)
.with("offset.flush.interval.ms",flushInterval)
.with("database.history",FileDatabaseHistory.class.getName())
.with("database.history.file.filename",historyFile)
.with("snapshot.mode","Schema_only")
.with("database.server.id",serverId)
.with("database.server.name",serverName)
.with("database.hostname",hostname)
//.with("database.dbname",dbname)
.with("database.port",port)
.with("database.user",user)
.with("database.password",password)
//.with("database.whitelist","test")
.with("table.whitelist",tableWhitelist)
.build();
returnconfiguration;
}
privatevoidcheckFile()throwsIOException{
Stringdir=storageFile.substring(0,storageFile.lastIndexOf("/"));
FiledirFile=newFile(dir);
if(!dirFile.exists()){
dirFile.mkdirs();
}
Filefile=newFile(storageFile);
if(!file.exists()){
file.createNewFile();
}
}
}
snapshot.mode 快照模式,指定連接器啟動(dòng)時(shí)運(yùn)行快照的條件??赡艿脑O(shè)置有:
initial 只有在沒(méi)有為邏輯服務(wù)器名記錄偏移量時(shí),連接器才運(yùn)行快照。
When_needed 當(dāng)連接器認(rèn)為有必要時(shí),它會(huì)在啟動(dòng)時(shí)運(yùn)行快照。也就是說(shuō),當(dāng)沒(méi)有可用的偏移量時(shí),或者當(dāng)先前記錄的偏移量指定了服務(wù)器中不可用的binlog位置或GTID時(shí)。
Never 連接器從不使用快照。在第一次使用邏輯服務(wù)器名啟動(dòng)時(shí),連接器從binlog的開(kāi)頭讀取。謹(jǐn)慎配置此行為。只有當(dāng)binlog保證包含數(shù)據(jù)庫(kù)的整個(gè)歷史記錄時(shí),它才有效。
Schema_only 連接器運(yùn)行模式而不是數(shù)據(jù)的快照。當(dāng)您不需要主題包含數(shù)據(jù)的一致快照,而只需要主題包含自連接器啟動(dòng)以來(lái)的更改時(shí),此設(shè)置非常有用。
Schema_only_recovery 這是已經(jīng)捕獲更改的連接器的恢復(fù)設(shè)置。當(dāng)您重新啟動(dòng)連接器時(shí),此設(shè)置允許恢復(fù)損壞或丟失的數(shù)據(jù)庫(kù)歷史主題。您可以定期將其設(shè)置為“清理”意外增長(zhǎng)的數(shù)據(jù)庫(kù)歷史主題。數(shù)據(jù)庫(kù)歷史主題需要無(wú)限保留。
database.server.id
偽裝成slave的Debezium服務(wù)的id,自定義,有多個(gè)Debezium服務(wù)不能重復(fù),如果重復(fù)的話會(huì)報(bào)以下異常。
io.debezium.DebeziumException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.Errorcode:1236;SQLSTATE:HY000. atio.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167) atio.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212) atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980) atcom.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599) atcom.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857) atjava.lang.Thread.run(Thread.java:750) Causedby:com.github.shyiko.mysql.binlog.network.ServerException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257. atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944) ...3commonframesomitted
監(jiān)聽(tīng)
配置監(jiān)聽(tīng)服務(wù)
/**
*@projectName:test
*@package:com.test.config
*@className:MysqlBinlogListener
*@author:nyp
*@description:TODO
*@date:2023/8/713:56
*@version:1.0
*/
@Component
@Slf4j
publicclassMysqlBinlogListener{
@Resource
privateExecutortaskExecutor;
privatefinalList>>engineList=newArrayList<>();
privateMysqlBinlogListener(@Qualifier("mysqlConnector")Configurationconfiguration){
this.engineList.add(DebeziumEngine.create(Json.class)
.using(configuration.asProperties())
.notifying(record->receiveChangeEvent(record.value()))
.build());
}
privatevoidreceiveChangeEvent(Stringvalue){
if(Objects.nonNull(value)){
Mappayload=getPayload(value);
Stringop=JSON.parseObject(JSON.toJSONString(payload.get("op")),String.class);
if(!(StringUtils.isBlank(op)||Envelope.Operation.READ.equals(op))){
ChangeDatachangeData=getChangeData(payload);
log.info("changeData="+changeData);
}
}
}
@PostConstruct
privatevoidstart(){
for(DebeziumEngine>engine:engineList){
taskExecutor.execute(engine);
}
}
@PreDestroy
privatevoidstop(){
for(DebeziumEngine>engine:engineList){
if(engine!=null){
try{
engine.close();
}catch(IOExceptione){
log.error("",e);
}
}
}
}
publicstaticMapgetPayload(Stringvalue){
Mapmap=JSON.parseObject(value,Map.class);
Mappayload=JSON.parseObject(JSON.toJSONString(map.get("payload")),Map.class);
returnpayload;
}
publicstaticChangeDatagetChangeData(Mappayload){
Mapsource=JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class);
returnChangeData.builder()
.op(payload.get("op").toString())
.table(source.get("table").toString())
.after(JSON.parseObject(JSON.toJSONString(payload.get("after")),Map.class))
.source(JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class))
.before(JSON.parseObject(JSON.toJSONString(payload.get("before")),Map.class))
.build();
}
@Data
@Builder
publicstaticclassChangeData{
/**
*更改前數(shù)據(jù)
*/
privateMapafter;
privateMapsource;
/**
*更改后數(shù)據(jù)
*/
privateMapbefore;
/**
*更改的表名
*/
privateStringtable;
/**
*操作類(lèi)型,枚舉Envelope.Operation
*/
privateStringop;
}
}
將監(jiān)聽(tīng)到的binlog日志封裝為ChangeData對(duì)象,包括表名,更改前后的數(shù)據(jù),
以及操作類(lèi)型
READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d"),
TRUNCATE("t");
測(cè)試
update操作輸出
MysqlListener.ChangeData(after={
name=SuzukiMio2,
id=1
},source={
file=binlog.000013,
connector=mysql,
pos=42587833,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691458956000,
snapshot=false,
db=test
table=test
},before={
name=SuzukiMio,
id=1
},table=test,op=u)
data={
name=SuzukiMio2,
id=1
}
新增操作輸出
MysqlListener.ChangeData(after={
name=王五,
id=0
},source={
file=binlog.000013,
connector=mysql,
pos=42588175,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459066000,
snapshot=false,
db=test,
table=test
},before=null,table=test,op=c)
刪除操作輸出
MysqlListener.ChangeData(after=null,source={
file=binlog.000013,
connector=mysql,
pos=42588959,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459104000,
snapshot=false,
db=test
table=test
},before={
name=王五,
id=0
},table=test,op=d)
我們之前配置的保存讀取進(jìn)度的文件storageFile,類(lèi)似于kafka的偏移量,記錄的內(nèi)容如下:

停止服務(wù),對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作,再次重啟,會(huì)根據(jù)進(jìn)度重新讀取。
小結(jié)
本文介紹了debezium,更多的時(shí)候,我們一談到CDC,第一想到的是大量數(shù)據(jù)同步的工具。 但其實(shí)也可以利用其數(shù)據(jù)變更捕獲的特性,來(lái)達(dá)到一部份消息隊(duì)列的作用。 但其畢竟不能完全替代消息隊(duì)列。大家理性看待與選擇。
本文的重點(diǎn)在介紹一種思路,具體的某項(xiàng)技術(shù)反而不那么重要。
審核編輯:劉清
-
JAVA語(yǔ)言
+關(guān)注
關(guān)注
0文章
138瀏覽量
21658 -
MYSQL數(shù)據(jù)庫(kù)
+關(guān)注
關(guān)注
0文章
99瀏覽量
10302 -
CDC技術(shù)
+關(guān)注
關(guān)注
0文章
9瀏覽量
7100
原文標(biāo)題:不想引入MQ?試試debezium
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
使用多片DAC61416芯片,如輸出50channel,這么多通道還能同時(shí)輸出嗎?
這里的那些是程序,要弄成word 文檔的,新手沒(méi)做過(guò)這么多程序
為什么OLED初始化的時(shí)候要這么多命令?
怎么記住這么多代碼格式?
為什么要搞這么多架構(gòu)
為什么有這么多編程語(yǔ)言呢
安卓8.0最新消息:安卓8.0初體驗(yàn),竟然這么流暢還有這么多黑科技功能
硬件電路設(shè)計(jì)有這么多坑,如何少走彎路?看大牛怎么說(shuō)
這么多內(nèi)網(wǎng)穿透工具怎么選?一篇讓你不再糾結(jié)的終極指南!
工業(yè)上面為什么有這么多通訊協(xié)議?
這么多技術(shù)框架,為什么選debezium?
評(píng)論