自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

剖析 Redis List 消息隊列的三種消費線程模型

數(shù)據(jù)庫 Redis
使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務(wù)做補償,每隔一段時間去業(yè)務(wù)表里查詢業(yè)務(wù)狀態(tài)機,若狀態(tài)機不符合條件,則觸發(fā)補償策略。

Redis 列表(List)是一種簡單的字符串列表,它的底層實現(xiàn)是一個雙向鏈表。

生產(chǎn)環(huán)境,很多公司都將 Redis 列表應(yīng)用于輕量級消息隊列 。這篇文章,我們聊聊如何使用 List 命令實現(xiàn)消息隊列的功能以及剖析消費者線程模型 。

圖片圖片

一、核心流程

生產(chǎn)者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創(chuàng)建一個空的隊列再插入消息。

如下,生產(chǎn)者向隊列 queue 先后插入了 「Java」「勇哥」「Go」,返回值表示消息插入隊列后的個數(shù)。

> LPUSH queue Java 勇哥 Go
(integer) 3

消費者使用 RPOP key 依次讀取隊列的消息,先進(jìn)先出,所以 「Java」會先讀取消費:

> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"

圖片圖片

接下來,我們可以通過 spring-data-redis API 演示生產(chǎn)消費流程:

  • 生產(chǎn)者
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
  • 消費者

我們啟動一個獨立的線程從隊列中讀取消息(RPOP 命令),讀取成功之后,消費消息,若沒有消息,則休眠一會,下一次循環(huán)再繼續(xù)。

圖片圖片

上圖的偽代碼中, while(true) 循環(huán)內(nèi)不停地調(diào)用 RPOP 指令,當(dāng)有消息時,可以及時處理,但假如沒有讀取到消息,則需要休眠一會。

這里要加休眠,主要是為了減少空讀的頻率,避免 CPU 無意義的消耗。

有什么更優(yōu)化的方式嗎?有,那就是使用 Redis 阻塞讀取 List 的命令。

Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在在讀取隊列沒有數(shù)據(jù)的時自動阻塞,直到有新的消息寫入隊列,才會繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。

BRPOP queue 0

參數(shù) 0 表示阻塞等待時間無限制 。

圖片圖片

如圖,我們啟動一個消費線程永動機,消費線程拉取消息后,執(zhí)行消費邏輯。

這種消費者線程模型非常容易理解,同時也非常適合順序消費的模式。同時,假如我們在消費消息時,服務(wù)器宕機或者斷電,可能丟失一條消息。

接下來,我們想一想,有沒有消費速度更高的消費模型嗎?筆者根據(jù)過往的經(jīng)歷,列舉三種模式:

  • 拉取線程 + 消費線程池(非阻塞模式)
  • 拉取線程 + 消費線程池 (阻塞模式)
  • 拉取線程 + Disruptor(阻塞模式)

二、拉取線程 + 消費線程池(非阻塞模式)

為了提升消費速度,我們可以將拉取和消費拆分成兩種動作,分別通過不同的線程池來處理。拉取線程池負(fù)責(zé)拉取消息,消費線程池負(fù)責(zé)消費消息。

圖片圖片

偽代碼類似:

圖片圖片

如圖,在拉取線程內(nèi)部,我們拉取完消息后,將消息提交到消費線程 consumeExecutor 。

這樣方式可以通過多線程執(zhí)行大幅度提升消費速度 ,但是這里還是有一個問題:

假如消費速度很慢,生產(chǎn)者速度很高,那么就會在線程池內(nèi)容易產(chǎn)生消息堆積,這里面會產(chǎn)生兩個隱形風(fēng)險:

  • 線程池隊列無限堆積,則可能有 OOM 的風(fēng)險 ;
  • 假如消費者服務(wù)器宕機或者斷電,那么會丟失大量的消息。

那么如何優(yōu)化這種模式呢 ?

答案是:拉取線程提交消息到線程池時,當(dāng)隊列中消息數(shù)量到達(dá)一定數(shù)量時,提交消息到線程池會阻塞。

三、拉取線程 + 消費線程池(阻塞模式)

我們將消息包裝為 Runnable ,然后通過消費線程池執(zhí)行 execute ,拉取線程會不會阻塞呢 ?

下圖是執(zhí)行的源碼:

圖片圖片

可以看到,第 30 行調(diào)用的是 workQueue 的非阻塞的 offer 方法。

如果隊列已滿,新提交的任務(wù)并不會被 block 住,反而會調(diào)用后續(xù)的 reject 流程。

如果我們想要達(dá)到阻塞生產(chǎn)者的目的的話,可以采取如下的兩種方案:

  • 信號量限制同時進(jìn)入線程池等待隊列的任務(wù)數(shù) 。

圖片圖片

  • 使用線程池的拒絕機制,把新加入的任務(wù) put 到等待隊列里,這樣也可以阻塞住生產(chǎn)者。

圖片圖片

四、拉取線程 + Disruptor

下圖展示了 Disruptor 的流程圖 。

圖片圖片

和線程池機制非常類似, Disruptor 也是非常典型的生產(chǎn)者/消費者模式。線程池存儲提交任務(wù)的容器是阻塞隊列,而 Disruptor 使用的是環(huán)形緩沖區(qū) RingBuffer。

環(huán)形緩沖區(qū)的設(shè)計相比阻塞隊列有如下優(yōu)點:

  • 環(huán)形數(shù)組結(jié)構(gòu)

為了避免垃圾回收,采用數(shù)組而非鏈表。同時,數(shù)組對處理器的緩存機制更加友好。

  • 元素位置定位

數(shù)組長度 2^n,通過位運算,加快定位的速度。下標(biāo)采取遞增的形式,不用擔(dān)心 index 溢出的問題。index 是 long 類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

  • 無鎖設(shè)計

每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù)。

此刻大家并不需要理解環(huán)形緩沖區(qū)的讀寫機制,只需要明白 環(huán)形緩沖區(qū) RingBuffer 是 Disruptor 的精髓即可。

將消費線程池替換成 Disruptor 有兩個明顯的優(yōu)點:

  • 無鎖隊列,寫入讀取性能非常好
  • 當(dāng)拉取線程提交消息到 Disruptor 時,若環(huán)形緩沖區(qū) RingBuffer 已經(jīng)滿了,則拉取線程會阻塞,這樣天然的可以避免無限拉取,同時避免 OOM 的問題。

偽代碼類似:

1.定義 Disruptor

圖片圖片

2.拉取線程將消息發(fā)送到 Disruptor Ringbuffer

圖片圖片

3.消費消息

圖片圖片

整體的消費者線程模型如下圖:

圖片圖片

五、平滑停服 + 定時任務(wù)補償

當(dāng)我們分析消費者線程模型時,無論我們使用哪種方式,假如服務(wù)器突然宕機、或者物理機斷電,則會丟失消息。

筆者推薦兩種方式:

1.平滑停服

平滑停服是指在停止應(yīng)用程序時,盡量避免中斷正在進(jìn)行的請求或任務(wù),盡量讓正在進(jìn)行的任務(wù)處理完成,并且不再接收新的任務(wù),等所有任務(wù)執(zhí)行完成后關(guān)閉應(yīng)用。

在 Unix/Linux 系統(tǒng)中,可以使用 kill 命令發(fā)送信號給運行中的進(jìn)程。

常見的信號有:

  • SIGTERM (15):請求進(jìn)程終止,可以被捕捉和處理,用于優(yōu)雅地停止進(jìn)程。
  • SIGKILL (9):強制終止進(jìn)程,不能被捕捉或忽略。
  • SIGQUIT (3):進(jìn)程退出并生成核心轉(zhuǎn)儲(core dump)。

為了實現(xiàn)平滑停服,可以使用 Java 的 Runtime.getRuntime().addShutdownHook 方法注冊一個關(guān)閉鉤子(shutdown hook)。當(dāng) JVM 接收到SIGTERM信號時,關(guān)閉鉤子會被執(zhí)行,從而可以在應(yīng)用程序停止前執(zhí)行一些清理工作。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println("Shutdown hook triggered. Performing cleanup...");
        // 在這里執(zhí)行清理工作,如關(guān)閉資源、保存狀態(tài)等
}));

我們可以在鉤子里,關(guān)閉拉取線程池 ,優(yōu)雅關(guān)閉消費線程池等 ,這樣可以盡量避免丟失消息。

2.定時任務(wù)補償

使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務(wù)做補償,每隔一段時間去業(yè)務(wù)表里查詢業(yè)務(wù)狀態(tài)機,若狀態(tài)機不符合條件,則觸發(fā)補償策略。

參考資料:

https://www.redis.net.cn/tutorial/3510.html

https://redis.io/docs/latest/develop/data-types/lists/

https://juejin.cn/post/7094272373930590245

https://blog.scottlogic.com/2021/12/01/disruptor.html

責(zé)任編輯:武曉燕 來源: 勇哥Java實戰(zhàn)
相關(guān)推薦

2021-01-12 08:43:29

Redis ListStreams

2023-03-06 08:40:43

RedisListJava

2009-09-22 14:12:16

Hibernate S

2009-07-16 16:23:59

Swing線程

2022-01-15 07:20:18

Redis List 消息隊列

2022-01-21 19:22:45

RedisList命令

2021-11-05 21:33:28

Redis數(shù)據(jù)高并發(fā)

2009-11-09 11:15:06

WCF消息隊列

2023-10-13 00:00:00

Redis模塊空間對象

2018-04-02 14:29:18

Java多線程方式

2013-05-07 09:39:14

軟件定義網(wǎng)絡(luò)SDNOpenFlow

2021-12-20 07:11:26

Java List排序 Java 基礎(chǔ)

2009-12-21 13:37:43

WCF消息交換

2024-10-25 08:41:18

消息隊列RedisList

2016-06-12 10:37:32

云計算私有云公有云

2020-11-03 19:52:54

Java數(shù)組編程語言

2011-01-18 15:35:59

jQueryJavaScriptweb

2024-11-18 08:08:21

2022-07-07 00:33:34

Java線程同步

2020-10-26 09:19:11

線程池消息
點贊
收藏

51CTO技術(shù)棧公眾號