消息隊列的七種經(jīng)典應(yīng)用場景
在筆者心中,消息隊列,緩存,分庫分表是高并發(fā)解決方案三劍客。
在職業(yè)生涯中,筆者曾經(jīng)使用過 ActiveMQ 、RabbitMQ 、Kafka 、RocketMQ 這些知名的消息隊列 。
這篇文章,筆者結(jié)合自己的真實經(jīng)歷,和大家分享消息隊列的七種經(jīng)典應(yīng)用場景。
圖片
1 異步&解耦
筆者曾經(jīng)負(fù)責(zé)某電商公司的用戶服務(wù),該服務(wù)提供用戶注冊,查詢,修改等基礎(chǔ)功能。用戶注冊成功之后,需要給用戶發(fā)送短信。
圖片
圖中,新增用戶和發(fā)送短信都揉在用戶中心服務(wù)里,這種方式缺點非常明顯:
- 假如短信渠道接口不穩(wěn)定,發(fā)送短信發(fā)生超時,用戶注冊接口耗時很大,影響前端用戶體驗;
- 短信渠道接口發(fā)生變化,用戶中心代碼就必須修改了。但用戶中心是核心系統(tǒng)。每次上線都必要謹(jǐn)小慎微。這種感覺很別扭,非核心功能影響到核心系統(tǒng)了。
為了解決這個問題,筆者采用了消息隊列進(jìn)行了重構(gòu)。
圖片
- 異步:用戶中心服務(wù)保存用戶信息成功后,發(fā)送一條消息到消息隊列 ,立即將結(jié)果返回給前端,這樣能避免總耗時比較長,從而影響用戶的體驗的問題。
- 解耦:任務(wù)服務(wù)收到消息調(diào)用短信服務(wù)發(fā)送短信,將核心服務(wù)與非核心功能剝離,顯著的降低了系統(tǒng)間的耦合度。
2 消峰
高并發(fā)場景下,面對突然出現(xiàn)的請求峰值,非常容易導(dǎo)致系統(tǒng)變得不穩(wěn)定,比如大量請求訪問數(shù)據(jù)庫,會對數(shù)據(jù)庫造成極大的壓力,或者系統(tǒng)的資源 CPU 、IO 出現(xiàn)瓶頸。
筆者曾服務(wù)于神州專車訂單團(tuán)隊,在訂單的載客生命周期里,訂單的修改操作先修改訂單緩存,然后發(fā)送消息到 MetaQ ,訂單落盤服務(wù)消費消息,并判斷訂單信息是否正常(比如有無亂序),若訂單數(shù)據(jù)無誤,則存儲到數(shù)據(jù)庫中。
圖片
當(dāng)面對請求峰值時,由于消費者的并發(fā)度在一個閾值范圍內(nèi),同時消費速度相對均勻,因此不會對數(shù)據(jù)庫造成太大的影響,同時真正面對前端的訂單系統(tǒng)生產(chǎn)者也會變得更穩(wěn)定。
3 消息總線
所謂總線,就是像主板里的數(shù)據(jù)總線一樣, 具有數(shù)據(jù)的傳遞和交互能力,各方不直接通信,使用總線作為標(biāo)準(zhǔn)通信接口。
筆者曾經(jīng)服務(wù)于某彩票公司訂單團(tuán)隊,在彩票訂單的生命周期里,經(jīng)過創(chuàng)建,拆分子訂單,出票,算獎等諸多環(huán)節(jié)。每一個環(huán)節(jié)都需要不同的服務(wù)處理,每個系統(tǒng)都有自己獨立的表,業(yè)務(wù)功能也相對獨立。假如每個應(yīng)用都去修改訂單主表的信息,那就會相當(dāng)混亂了。
因此,公司的架構(gòu)師設(shè)計了調(diào)度中心的服務(wù),調(diào)度中心維護(hù)訂單的信息,但它不與子服務(wù)通訊,而是通過消息隊列和出票網(wǎng)關(guān),算獎服務(wù)等系統(tǒng)傳遞和交換信息。
圖片
消息總線這種架構(gòu)設(shè)計,可以讓系統(tǒng)更加解耦,同時也可以讓每個系統(tǒng)各司其職。
4 延時任務(wù)
用戶在美團(tuán) APP 下單,假如沒有立即支付,進(jìn)入訂單詳情會顯示倒計時,如果超過支付時間,訂單就會被自動取消。
非常優(yōu)雅的方式是:使用消息隊列的延時消息。
訂單服務(wù)生成訂單后,發(fā)送一條延時消息到消息隊列。消息隊列在消息到達(dá)支付過期時間時,將消息投遞給消費者,消費者收到消息之后,判斷訂單狀態(tài)是否為已支付,假如未支付,則執(zhí)行取消訂單的邏輯。
圖片
RocketMQ 4.X 生產(chǎn)者發(fā)送延遲消息代碼如下:
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設(shè)置延遲level為5,對應(yīng)延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
RocketMQ 4.X 版本默認(rèn)支持 18 個 level 的延遲消息, 通過 broker 端的 messageDelayLevel 配置項確定的。
圖片
RocketMQ 5.X 版本支持任意時刻延遲消息,客戶端在構(gòu)造消息時提供了 3 個 API 來指定延遲時間或定時時間。
圖片
5 廣播消費
廣播消費是指每條消息推送給集群內(nèi)所有的消費者,保證消息至少被每個消費者消費一次。
圖片
廣播消費主要用于兩種場景:消息推送和緩存同步。
01 消息推送
下圖是專車的司機端推送機制,用戶下單之后,訂單系統(tǒng)生成專車訂單,派單系統(tǒng)會根據(jù)相關(guān)算法將訂單派給某司機,司機端就會收到派單推送消息。
圖片
推送服務(wù)是一個 TCP 服務(wù)(自定義協(xié)議),同時也是一個消費者服務(wù),消息模式是廣播消費。
司機打開司機端 APP 后,APP 會通過負(fù)載均衡和推送服務(wù)創(chuàng)建長連接,推送服務(wù)會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。
派單服務(wù)是生產(chǎn)者,將派單數(shù)據(jù)發(fā)送到 MetaQ , 每個推送服務(wù)都會消費到該消息,推送服務(wù)判斷本地內(nèi)存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數(shù)據(jù)推送給司機端。
02 緩存同步
高并發(fā)場景下,很多應(yīng)用使用本地緩存,提升系統(tǒng)性能 。
本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。
圖片
如上圖,應(yīng)用A啟動后,作為一個 RocketMQ 消費者,消息模式設(shè)置為廣播消費。為了提升接口性能,每個應(yīng)用節(jié)點都會將字典表加載到本地緩存里。
當(dāng)字典表數(shù)據(jù)變更時,可以通過業(yè)務(wù)系統(tǒng)發(fā)送一條消息到 RocketMQ ,每個應(yīng)用節(jié)點都會消費消息,刷新本地緩存。
6 分布式事務(wù)
以電商交易場景為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發(fā)貨、積分變更、購物車狀態(tài)清空等多個子系統(tǒng)的變更。
圖片
1、傳統(tǒng)XA事務(wù)方案:性能不足
為了保證上述四個分支的執(zhí)行結(jié)果一致性,典型方案是基于 XA 協(xié)議的分布式事務(wù)系統(tǒng)來實現(xiàn)。將四個調(diào)用分支封裝成包含四個獨立事務(wù)分支的大事務(wù)?;?XA 分布式事務(wù)的方案可以滿足業(yè)務(wù)處理結(jié)果的正確性,但最大的缺點是多分支環(huán)境下資源鎖定范圍大,并發(fā)度低,隨著下游分支的增加,系統(tǒng)性能會越來越差。
2、基于普通消息方案:一致性保障困難
圖片
該方案中消息下游分支和訂單系統(tǒng)變更的主分支很容易出現(xiàn)不一致的現(xiàn)象,例如:
- 消息發(fā)送成功,訂單沒有執(zhí)行成功,需要回滾整個事務(wù)。
- 訂單執(zhí)行成功,消息沒有發(fā)送成功,需要額外補償才能發(fā)現(xiàn)不一致。
- 消息發(fā)送超時未知,此時無法判斷需要回滾訂單還是提交訂單變更。
3、基于 RocketMQ 分布式事務(wù)消息:支持最終一致性
上述普通消息方案中,普通消息和訂單事務(wù)無法保證一致的原因,本質(zhì)上是由于普通消息無法像單機數(shù)據(jù)庫事務(wù)一樣,具備提交、回滾和統(tǒng)一協(xié)調(diào)的能力。
而基于 RocketMQ 實現(xiàn)的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實現(xiàn)全局提交結(jié)果的一致性。
交互流程如下圖所示:
圖片
1、生產(chǎn)者將消息發(fā)送至 Broker 。
2、Broker 將消息持久化成功之后,向生產(chǎn)者返回 Ack 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。
3、生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
4、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果( Commit 或是 Rollback ),Broker 收到確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為 Commit :Broker 將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費者。
- 二次確認(rèn)結(jié)果為 Rollback :Broker 將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
5、在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若 Broker 未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或 Broker 收到的二次確認(rèn)結(jié)果為 Unknown 未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
- 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進(jìn)行處理。
7 數(shù)據(jù)中轉(zhuǎn)樞紐
近10多年來,諸如 KV 存儲(HBase)、搜索(ElasticSearch)、流式處理(Storm、Spark、Samza)、時序數(shù)據(jù)庫(OpenTSDB)等專用系統(tǒng)應(yīng)運而生。這些系統(tǒng)是為單一的目標(biāo)而產(chǎn)生的,因其簡單性使得在商業(yè)硬件上構(gòu)建分布式系統(tǒng)變得更加容易且性價比更高。
通常,同一份數(shù)據(jù)集需要被注入到多個專用系統(tǒng)內(nèi)。
例如,當(dāng)應(yīng)用日志用于離線日志分析時,搜索單個日志記錄同樣不可或缺,而構(gòu)建各自獨立的工作流來采集每種類型的數(shù)據(jù)再導(dǎo)入到各自的專用系統(tǒng)顯然不切實際,利用消息隊列 Kafka 作為數(shù)據(jù)中轉(zhuǎn)樞紐,同份數(shù)據(jù)可以被導(dǎo)入到不同專用系統(tǒng)中。
日志同步主要有三個關(guān)鍵部分:日志采集客戶端,Kafka 消息隊列以及后端的日志處理應(yīng)用。
- 日志采集客戶端,負(fù)責(zé)用戶各類應(yīng)用服務(wù)的日志數(shù)據(jù)采集,以消息方式將日志“批量”“異步”發(fā)送Kafka客戶端。Kafka客戶端批量提交和壓縮消息,對應(yīng)用服務(wù)的性能影響非常小。
- Kafka 將日志存儲在消息文件中,提供持久化。
- 日志處理應(yīng)用,如 Logstash,訂閱并消費Kafka中的日志消息,最終供文件搜索服務(wù)檢索日志,或者由 Kafka 將消息傳遞給 Hadoop 等其他大數(shù)據(jù)應(yīng)用系統(tǒng)化存儲與分析。
圖片
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉(zhuǎn)發(fā)一下,你的支持會激勵我輸出更高質(zhì)量的文章,非常感謝!