剖析 Redis List 消息隊列的三種消費線程模型
Redis 列表(List)是一種簡單的字符串列表,它的底層實現(xiàn)是一個雙向鏈表。
生產(chǎn)環(huán)境,很多公司都將 Redis 列表應(yīng)用于輕量級消息隊列 。這篇文章,我們聊聊如何使用 List 命令實現(xiàn)消息隊列的功能以及剖析消費者線程模型 。
圖片
一、核心流程
生產(chǎn)者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創(chuàng)建一個空的隊列再插入消息。
如下,生產(chǎn)者向隊列 queue 先后插入了 「Java」「勇哥」「Go」,返回值表示消息插入隊列后的個數(shù)。
> LPUSH queue Java 勇哥 Go
(integer) 3
消費者使用 RPOP key 依次讀取隊列的消息,先進(jìn)先出,所以 「Java」會先讀取消費:
> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"
圖片
接下來,我們可以通過 spring-data-redis API 演示生產(chǎn)消費流程:
- 生產(chǎn)者
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
- 消費者
我們啟動一個獨立的線程從隊列中讀取消息(RPOP 命令),讀取成功之后,消費消息,若沒有消息,則休眠一會,下一次循環(huán)再繼續(xù)。
圖片
上圖的偽代碼中, while(true) 循環(huán)內(nèi)不停地調(diào)用 RPOP 指令,當(dāng)有消息時,可以及時處理,但假如沒有讀取到消息,則需要休眠一會。
這里要加休眠,主要是為了減少空讀的頻率,避免 CPU 無意義的消耗。
有什么更優(yōu)化的方式嗎?有,那就是使用 Redis 阻塞讀取 List 的命令。
Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在在讀取隊列沒有數(shù)據(jù)的時自動阻塞,直到有新的消息寫入隊列,才會繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。
BRPOP queue 0
參數(shù) 0 表示阻塞等待時間無限制 。
圖片
如圖,我們啟動一個消費線程永動機,消費線程拉取消息后,執(zhí)行消費邏輯。
這種消費者線程模型非常容易理解,同時也非常適合順序消費的模式。同時,假如我們在消費消息時,服務(wù)器宕機或者斷電,可能丟失一條消息。
接下來,我們想一想,有沒有消費速度更高的消費模型嗎?筆者根據(jù)過往的經(jīng)歷,列舉三種模式:
- 拉取線程 + 消費線程池(非阻塞模式)
- 拉取線程 + 消費線程池 (阻塞模式)
- 拉取線程 + Disruptor(阻塞模式)
二、拉取線程 + 消費線程池(非阻塞模式)
為了提升消費速度,我們可以將拉取和消費拆分成兩種動作,分別通過不同的線程池來處理。拉取線程池負(fù)責(zé)拉取消息,消費線程池負(fù)責(zé)消費消息。
圖片
偽代碼類似:
圖片
如圖,在拉取線程內(nèi)部,我們拉取完消息后,將消息提交到消費線程 consumeExecutor 。
這樣方式可以通過多線程執(zhí)行大幅度提升消費速度 ,但是這里還是有一個問題:
假如消費速度很慢,生產(chǎn)者速度很高,那么就會在線程池內(nèi)容易產(chǎn)生消息堆積,這里面會產(chǎn)生兩個隱形風(fēng)險:
- 線程池隊列無限堆積,則可能有 OOM 的風(fēng)險 ;
- 假如消費者服務(wù)器宕機或者斷電,那么會丟失大量的消息。
那么如何優(yōu)化這種模式呢 ?
答案是:拉取線程提交消息到線程池時,當(dāng)隊列中消息數(shù)量到達(dá)一定數(shù)量時,提交消息到線程池會阻塞。
三、拉取線程 + 消費線程池(阻塞模式)
我們將消息包裝為 Runnable ,然后通過消費線程池執(zhí)行 execute ,拉取線程會不會阻塞呢 ?
下圖是執(zhí)行的源碼:
圖片
可以看到,第 30 行調(diào)用的是 workQueue 的非阻塞的 offer 方法。
如果隊列已滿,新提交的任務(wù)并不會被 block 住,反而會調(diào)用后續(xù)的 reject 流程。
如果我們想要達(dá)到阻塞生產(chǎn)者的目的的話,可以采取如下的兩種方案:
- 信號量限制同時進(jìn)入線程池等待隊列的任務(wù)數(shù) 。
圖片
- 使用線程池的拒絕機制,把新加入的任務(wù) put 到等待隊列里,這樣也可以阻塞住生產(chǎn)者。
圖片
四、拉取線程 + Disruptor
下圖展示了 Disruptor 的流程圖 。
圖片
和線程池機制非常類似, Disruptor 也是非常典型的生產(chǎn)者/消費者模式。線程池存儲提交任務(wù)的容器是阻塞隊列,而 Disruptor 使用的是環(huán)形緩沖區(qū) RingBuffer。
環(huán)形緩沖區(qū)的設(shè)計相比阻塞隊列有如下優(yōu)點:
- 環(huán)形數(shù)組結(jié)構(gòu)
為了避免垃圾回收,采用數(shù)組而非鏈表。同時,數(shù)組對處理器的緩存機制更加友好。
- 元素位置定位
數(shù)組長度 2^n,通過位運算,加快定位的速度。下標(biāo)采取遞增的形式,不用擔(dān)心 index 溢出的問題。index 是 long 類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
- 無鎖設(shè)計
每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù)。
此刻大家并不需要理解環(huán)形緩沖區(qū)的讀寫機制,只需要明白 環(huán)形緩沖區(qū) RingBuffer 是 Disruptor 的精髓即可。
將消費線程池替換成 Disruptor 有兩個明顯的優(yōu)點:
- 無鎖隊列,寫入讀取性能非常好
- 當(dāng)拉取線程提交消息到 Disruptor 時,若環(huán)形緩沖區(qū) RingBuffer 已經(jīng)滿了,則拉取線程會阻塞,這樣天然的可以避免無限拉取,同時避免 OOM 的問題。
偽代碼類似:
1.定義 Disruptor
圖片
2.拉取線程將消息發(fā)送到 Disruptor Ringbuffer
圖片
3.消費消息
圖片
整體的消費者線程模型如下圖:
圖片
五、平滑停服 + 定時任務(wù)補償
當(dāng)我們分析消費者線程模型時,無論我們使用哪種方式,假如服務(wù)器突然宕機、或者物理機斷電,則會丟失消息。
筆者推薦兩種方式:
1.平滑停服
平滑停服是指在停止應(yīng)用程序時,盡量避免中斷正在進(jìn)行的請求或任務(wù),盡量讓正在進(jìn)行的任務(wù)處理完成,并且不再接收新的任務(wù),等所有任務(wù)執(zhí)行完成后關(guān)閉應(yīng)用。
在 Unix/Linux 系統(tǒng)中,可以使用 kill 命令發(fā)送信號給運行中的進(jìn)程。
常見的信號有:
- SIGTERM (15):請求進(jìn)程終止,可以被捕捉和處理,用于優(yōu)雅地停止進(jìn)程。
- SIGKILL (9):強制終止進(jìn)程,不能被捕捉或忽略。
- SIGQUIT (3):進(jìn)程退出并生成核心轉(zhuǎn)儲(core dump)。
為了實現(xiàn)平滑停服,可以使用 Java 的 Runtime.getRuntime().addShutdownHook 方法注冊一個關(guān)閉鉤子(shutdown hook)。當(dāng) JVM 接收到SIGTERM信號時,關(guān)閉鉤子會被執(zhí)行,從而可以在應(yīng)用程序停止前執(zhí)行一些清理工作。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown hook triggered. Performing cleanup...");
// 在這里執(zhí)行清理工作,如關(guān)閉資源、保存狀態(tài)等
}));
我們可以在鉤子里,關(guān)閉拉取線程池 ,優(yōu)雅關(guān)閉消費線程池等 ,這樣可以盡量避免丟失消息。
2.定時任務(wù)補償
使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務(wù)做補償,每隔一段時間去業(yè)務(wù)表里查詢業(yè)務(wù)狀態(tài)機,若狀態(tài)機不符合條件,則觸發(fā)補償策略。
參考資料:
https://www.redis.net.cn/tutorial/3510.html
https://redis.io/docs/latest/develop/data-types/lists/