Kafka 為什么這么快的七大秘訣,漲知識(shí)了
我們都知道 Kafka 是基于磁盤進(jìn)行存儲(chǔ)的,但 Kafka 官方又稱其具有高性能、高吞吐、低延時(shí)的特點(diǎn),其吞吐量動(dòng)輒幾十上百萬。
在座的靚仔和靚女們是不是有點(diǎn)困惑了,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會(huì)降低性能的,因?yàn)閷ぶ窌?huì)比較消耗時(shí)間。那 Kafka 又是怎么做到其吞吐量動(dòng)輒幾十上百萬的呢?
一、Kafka Reactor I/O 網(wǎng)絡(luò)模型
Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動(dòng)機(jī)制來處理網(wǎng)絡(luò)請求。
該模型通過 Reactor 模式實(shí)現(xiàn),即一個(gè)或多個(gè) I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個(gè)通道的事件,當(dāng)某個(gè)通道準(zhǔn)備好進(jìn)行 I/O 操作時(shí),觸發(fā)相應(yīng)的事件處理器進(jìn)行處理。
這種模型在高并發(fā)場景下具有很高的效率,能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省系統(tǒng)資源。
Reactor 線程模型如圖 2 所示。
圖 2
Reacotr 模型主要分為三個(gè)角色。
- Reactor:把 I/O 事件根據(jù)類型分配給分配給對應(yīng)的 Handler 處理。
- Acceptor:處理客戶端連接事件。
- Handler:處理讀寫等任務(wù)。
Kafka 基于 Reactor 模型架構(gòu)如圖 3 所示。
圖 3
Kafka 的網(wǎng)絡(luò)通信模型基于 NIO(New Input/Output)庫,通過 Reactor 模式實(shí)現(xiàn),具體包括以下幾個(gè)關(guān)鍵組件:
- SocketServer:管理所有的網(wǎng)絡(luò)連接,包括初始化 Acceptor 和 Processor 線程。
- Acceptor:監(jiān)聽客戶端的連接請求,并將其分配給 Processor 線程。Acceptor 使用 Java NIO 的 Selector 進(jìn)行 I/O 多路復(fù)用,并注冊 OP_ACCEPT 事件來監(jiān)聽新的連接請求。每當(dāng)有新的連接到達(dá)時(shí),Acceptor 會(huì)接受連接并創(chuàng)建一個(gè) SocketChannel,然后將其分配給一個(gè) Processor 線程進(jìn)行處理。
- Processor:處理具體的 I/O 操作,包括讀取客戶端請求和寫入響應(yīng)數(shù)據(jù)。Processor 同樣使用 Selector 進(jìn)行 I/O 多路復(fù)用,注冊 OP_READ 和 OP_WRITE 事件來處理讀寫操作。每個(gè) Processor 線程都有一個(gè)獨(dú)立的 Selector,用于管理多個(gè) SocketChannel。
- RequestChannel:充當(dāng) Processor 和請求處理線程之間的緩沖區(qū),存儲(chǔ)請求和響應(yīng)數(shù)據(jù)。Processor 將讀取的請求放入 RequestChannel 的請求隊(duì)列,而請求處理線程則從該隊(duì)列中取出請求進(jìn)行處理。
- KafkaRequestHandler:請求處理線程,從 RequestChannel 中讀取請求,調(diào)用 KafkaApis 進(jìn)行業(yè)務(wù)邏輯處理,并將響應(yīng)放回 RequestChannel 的響應(yīng)隊(duì)列。KafkaRequestHandler 線程池中的線程數(shù)量由配置參數(shù) num.io.threads 決定。
圖 4
Chaya:該模型和如何提高 kafka 的性能和效率?
高并發(fā)處理能力:通過 I/O 多路復(fù)用機(jī)制,Kafka 能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省了系統(tǒng)資源。
低延遲:非阻塞 I/O 操作避免了線程的阻塞等待,使得 I/O 操作能夠更快地完成,從而降低了系統(tǒng)的響應(yīng)延遲。
資源節(jié)省:通過減少線程的數(shù)量和上下文切換,Kafka 在處理高并發(fā)請求時(shí)能夠更有效地利用 CPU 和內(nèi)存資源。
擴(kuò)展性強(qiáng):Reactor 模式的分層設(shè)計(jì)使得 Kafka 的網(wǎng)絡(luò)模塊具有很好的擴(kuò)展性,可以根據(jù)需要增加更多的 I/O 線程或調(diào)整事件處理器的邏輯。
二、零拷貝技術(shù)的運(yùn)用
零拷貝技術(shù)是一種計(jì)算機(jī)操作系統(tǒng)技術(shù),用于在內(nèi)存和存儲(chǔ)設(shè)備之間進(jìn)行數(shù)據(jù)傳輸時(shí),避免 CPU 的參與,從而減少 CPU 的負(fù)擔(dān)并提高數(shù)據(jù)傳輸效率。
Kafka 使用零拷貝技術(shù)來優(yōu)化數(shù)據(jù)傳輸,特別是在生產(chǎn)者將數(shù)據(jù)寫入 Kafka 和消費(fèi)者從 Kafka 讀取數(shù)據(jù)的過程中。在 Kafka 中,零拷貝主要通過以下幾種方式實(shí)現(xiàn):
- sendfile() 系統(tǒng)調(diào)用:在發(fā)送數(shù)據(jù)時(shí),Kafka 使用操作系統(tǒng)的 sendfile() 系統(tǒng)調(diào)用直接將文件從磁盤發(fā)送到網(wǎng)絡(luò)套接字,而無需將數(shù)據(jù)復(fù)制到應(yīng)用程序的用戶空間。這減少了數(shù)據(jù)復(fù)制次數(shù),提高了傳輸效率。
- 文件內(nèi)存映射(Memory-Mapped Files):Kafka 使用文件內(nèi)存映射技術(shù)(mmap),將磁盤上的日志文件映射到內(nèi)存中,使得讀寫操作可以在內(nèi)存中直接進(jìn)行,無需進(jìn)行額外的數(shù)據(jù)復(fù)制。
比如 Broker 讀取磁盤數(shù)據(jù)并把數(shù)據(jù)發(fā)送給 Consumer 的過程,傳統(tǒng) I/O 經(jīng)歷以下步驟。
- 讀取數(shù)據(jù):通過read 系統(tǒng)調(diào)用將磁盤數(shù)據(jù)通過 DMA copy 到內(nèi)核空間緩沖區(qū)(Read buffer)。
- 拷貝數(shù)據(jù):將數(shù)據(jù)從內(nèi)核空間緩沖區(qū)(Read buffer) 通過 CPU copy 到用戶空間緩沖區(qū)(Application buffer)。
- 寫入數(shù)據(jù):通過write()系統(tǒng)調(diào)用將數(shù)據(jù)從用戶空間緩沖區(qū)(Application) CPU copy 到內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)。
- 發(fā)送數(shù)據(jù):將內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)DMA copy 到網(wǎng)卡目標(biāo)端口,通過網(wǎng)卡將數(shù)據(jù)發(fā)送到目標(biāo)主機(jī)。
這一過程經(jīng)過的四次 copy 如圖 5 所示。
圖 5
Chaya:零拷貝技術(shù)如何提高 Kakfa 的性能?
零拷貝技術(shù)通過減少 CPU 負(fù)擔(dān)和內(nèi)存帶寬消耗,提高了 Kakfa 性能。
- 降低 CPU 使用率:由于數(shù)據(jù)不需要在內(nèi)核空間和用戶空間之間多次復(fù)制,CPU 的參與減少,從而降低了 CPU 使用率,騰出更多的 CPU 資源用于其他任務(wù)。
- 提高數(shù)據(jù)傳輸速度:直接從磁盤到網(wǎng)絡(luò)的傳輸路徑減少了中間步驟,使得數(shù)據(jù)傳輸更加高效,延遲更低。
- 減少內(nèi)存帶寬消耗:通過減少數(shù)據(jù)在內(nèi)存中的復(fù)制次數(shù),降低了內(nèi)存帶寬的消耗,使得系統(tǒng)能夠處理更多的并發(fā)請求。
三、Partition 并發(fā)和分區(qū)負(fù)載均衡
在說 Topic patition 分區(qū)并發(fā)之前,我們先了解下 kafka 架構(gòu)設(shè)計(jì)。
1.Kafka 架構(gòu)
一個(gè)典型的 Kafka 架構(gòu)包含以下幾個(gè)重要組件,如圖 6 所示。
圖 6
- Producer(生產(chǎn)者):發(fā)送消息的一方,負(fù)責(zé)發(fā)布消息到 Kafka 主題(Topic)。
- Consumer(消費(fèi)者):接受消息的一方,訂閱主題并處理消息。Kafka 有ConsumerGroup 的概念,每個(gè)Consumer 只能消費(fèi)所分配到的 Partition 的消息,每一個(gè)Partition只能被一個(gè)ConsumerGroup 中的一個(gè)Consumer 所消費(fèi),所以同一個(gè)ConsumerGroup 中Consumer 的數(shù)量如果超過了Partiton 的數(shù)量,將會(huì)出現(xiàn)有些Consumer 分配不到 partition 消費(fèi)。
- Broker(代理):服務(wù)代理節(jié)點(diǎn),Kafka 集群中的一臺(tái)服務(wù)器就是一個(gè) broker,可以水平無限擴(kuò)展,同一個(gè) Topic 的消息可以分布在多個(gè) broker 中。
- Topic(主題)與 Partition(分區(qū)) :Kafka 中的消息以 Topic 為單位進(jìn)行劃分,生產(chǎn)者將消息發(fā)送到特定的 Topic,而消費(fèi)者負(fù)責(zé)訂閱 Topic 的消息并進(jìn)行消費(fèi)。圖中 TopicA 有三個(gè) Partiton(TopicA-par0、TopicA-par1、TopicA-par2)
為了提升整個(gè)集群的吞吐量,Topic 在物理上還可以細(xì)分多個(gè)Partition,一個(gè) Partition 在磁盤上對應(yīng)一個(gè)文件夾。 - Replica(副本):副本,是 Kafka 保證數(shù)據(jù)高可用的方式,Kafka 同一 Partition 的數(shù)據(jù)可以在多 Broker 上存在多個(gè)副本,通常只有 leader 副本對外提供讀寫服務(wù),當(dāng) leader副本所在 broker 崩潰或發(fā)生網(wǎng)絡(luò)一場,Kafka 會(huì)在 Controller 的管理下會(huì)重新選擇新的 Leader 副本對外提供讀寫服務(wù)。
- ZooKeeper:管理 Kafka 集群的元數(shù)據(jù)和分布式協(xié)調(diào)。
2.Topic 主題
Topic 是 Kafka 中數(shù)據(jù)的邏輯分類單元,可以理解成一個(gè)隊(duì)列。Broker 是所有隊(duì)列部署的機(jī)器,Producer 將消息發(fā)送到特定的 Topic,而 Consumer 則從特定的 Topic 中消費(fèi)消息。
3.Partition
為了提高并行處理能力和擴(kuò)展性,Kafka 將一個(gè) Topic 分為多個(gè) Partition。每個(gè) Partition 是一個(gè)有序的消息隊(duì)列,消息在 Partition 內(nèi)部是有序的,但在不同的 Partition 之間沒有順序保證。
Producer 可以并行地將消息發(fā)送到不同的 Partition,Consumer 也可以并行地消費(fèi)不同的 Partition,從而提升整體處理能力。
因此,可以說,每增加一個(gè) Paritition 就增加了一個(gè)消費(fèi)并發(fā)。Partition的引入不僅提高了系統(tǒng)的可擴(kuò)展性,還使得數(shù)據(jù)處理更加靈活。
4.Partition 分區(qū)策略
碼樓:“生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)是如何實(shí)現(xiàn)的?不合理的分配會(huì)導(dǎo)致消息集中在某些 Broker 上,豈不是完?duì)僮??!?/p>
主要有以下幾種分區(qū)策略:
- 輪詢策略:也稱Round-robin策略,即順序分配。
- 隨機(jī)策略:也稱Randomness策略。所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。
- 按消息鍵保序策略。
- 基于地理位置分區(qū)策略。
輪詢策略
比如一個(gè) Topic 下有 3個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。
當(dāng)生產(chǎn)第4條消息時(shí)又會(huì)重新開始,即將其分配到分區(qū)0,如圖 5 所示。
輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。
隨機(jī)策略
所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。如圖所示,9 條消息隨機(jī)分配到不同分區(qū)。
按消息鍵分配策略
一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,比如訂單 ID,那么綁定同一個(gè) 訂單 ID 的消息都會(huì)發(fā)布到同一個(gè)分區(qū),由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如圖所示。
基于地理位置
這種策略一般只針對那些大規(guī)模的 Kafka 集群,特別是跨城市、跨國家甚至是跨大洲的集群。
我們就可以根據(jù) Broker 所在的 IP 地址實(shí)現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
.filter(p -> isSouth(p.leader().host()))
.map(PartitionInfo::partition)
.findAny()
.get();
我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。
四、Segment 日志文件和稀疏索引
前面已經(jīng)介紹過,Kafka 的 Topic 可以分為多個(gè) Partition,每個(gè) Partition 有多個(gè)副本,你可以理解為副本才是存儲(chǔ)消息的物理存在。其實(shí)每個(gè)副本都是以日志(Log)的形式存儲(chǔ)。
碼樓:“日志文件過大怎么辦?”
為了解決單一日志文件過大的問題,kafka采用了分段(Segment)的形式進(jìn)行存儲(chǔ)。
所謂 Segment,就是當(dāng)一個(gè)日志文件大小到達(dá)一定條件之后,就新建一個(gè)新的 Segment,然后在新的Segment寫入數(shù)據(jù)。Topic、Partition、和日志的關(guān)系如圖 8 所示。
圖 8
一個(gè) segment 對應(yīng)磁盤上多個(gè)文件。
.index
: 消息的 offset 索引文件。.timeindex
: 消息的時(shí)間索引文件(0.8版本加入的)。.log
: 存儲(chǔ)實(shí)際的消息數(shù)據(jù)。.snapshot
: 記錄了 producer 的事務(wù)信息。.swap
: 用于 Segment 恢復(fù)。.txnindex
文件,記錄了中斷的事務(wù)信息。
.log
文件存儲(chǔ)實(shí)際的 message,kafka為每一個(gè)日志文件添加了2 個(gè)索引文件 .index
以及 .timeindex
。
segment 文件命名規(guī)則:partition 第一個(gè) segment 從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長度,沒有數(shù)字用 0 填充。
碼樓:“為什么要有 .index 文件?”
為了提高查找消息的性能。kafka 為消息數(shù)據(jù)建了兩種稀疏索引,一種是方便 offset 查找的 .index 稀疏索引,還有一種是方便時(shí)間查找的 .timeindex 稀疏索引。
1.稀疏索引
Chaya:“為什么不創(chuàng)建一個(gè)哈希索引,從 offset 到物理消息日志文件偏移量的映射關(guān)系?”
萬萬不可,Kafka 作為海量數(shù)據(jù)處理的中間件,每秒高達(dá)幾百萬的消息寫入,這個(gè)哈希索引會(huì)把把內(nèi)存撐爆炸。
稀疏索引不會(huì)為每個(gè)記錄都保存索引,而是寫入一定的記錄之后才會(huì)增加一個(gè)索引值,具體這個(gè)間隔有多大則通過 log.index.interval.bytes
參數(shù)進(jìn)行控制,默認(rèn)大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會(huì)在索引文件中增加一個(gè)索引項(xiàng)。
哈希稀疏索引把消息劃分為多個(gè) block ,只索引每個(gè) block 第一條消息的 offset 即可 。
- Offset 偏移量:表示第幾個(gè)消息。
- position:消息在磁盤的物理位置。
Chaya:如果消費(fèi)者要查找 Offset 為 4 的消息,查找過程是怎樣的?
- 首先用二分法定位消息在哪個(gè) Segment ,Segment 文件命名是 Partition 第一個(gè) segment 從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值。
- 打開這個(gè) Segment 對應(yīng)的 index 索引文件,用二分法查找 offset 不大于 4 的索引條目,對應(yīng)上圖第二條條目,也就是 offset = 3 的那個(gè)索引。通過索引我們可以知道 offset 為 4 的消息所在的日志文件磁盤物理位置為 495。
- 打開日志文件,從 Position 為 495 位置開始開始順序掃描文件,將掃描過程中每條消息的 offset 與 4 比較,直到找到 offset 為 4 的那條Message。
.timeindex 文件同理,只不過它的查找結(jié)果是 offset,之后還要在走一遍 .index 索引查找流程。
由于 kafka 設(shè)計(jì)為順序讀寫磁盤,因此遍歷區(qū)間的數(shù)據(jù)并對速度有太大的影響,而選擇稀疏索引還能節(jié)約大量的磁盤空間。
2.mmap
有了稀疏索引,當(dāng)給定一個(gè) offset 時(shí),Kafka 采用的是二分查找來掃描索引定位不大于 offset 的物理位移 position,再到日志文件找到目標(biāo)消息。
利用稀疏索引,已經(jīng)基本解決了高效查詢的問題,但是這個(gè)過程中仍然有進(jìn)一步的優(yōu)化空間,那便是通過 mmap(memory mapped files) 讀寫上面提到的稀疏索引文件,進(jìn)一步提高查詢消息的速度。
就是基于 JDK nio 包下的 MappedByteBuffer 的 map 函數(shù),將磁盤文件映射到內(nèi)存中。
進(jìn)程通過調(diào)用mmap系統(tǒng)函數(shù),將文件或物理內(nèi)存的一部分映射到其虛擬地址空間。這個(gè)過程中,操作系統(tǒng)會(huì)為映射的內(nèi)存區(qū)域分配一個(gè)虛擬地址,并將這個(gè)地址與文件或物理內(nèi)存的實(shí)際內(nèi)容關(guān)聯(lián)起來。
一旦內(nèi)存映射完成,進(jìn)程就可以通過指針直接訪問映射的內(nèi)存區(qū)域。這種訪問方式就像訪問普通內(nèi)存一樣簡單和高效。
圖引自《碼農(nóng)的荒島求生》
五、順序讀寫磁盤
碼樓:“不管如何,Kafka 讀寫消息都要讀寫磁盤,如何變快呢?”
磁盤就一定很慢么?人們普遍錯(cuò)誤地認(rèn)為硬盤很慢。然而,存儲(chǔ)介質(zhì)的性能,很大程度上依賴于數(shù)據(jù)被訪問的模式。
同樣在一塊普通的7200 RPM SATA硬盤上,隨機(jī)I/O(random I/O)與順序I/O相比,隨機(jī)I/O的性能要比順序I/O慢3到4個(gè)數(shù)量級(jí)。
合理的方式可以讓磁盤寫操作更加高效,減少了尋道時(shí)間和旋轉(zhuǎn)延遲。
碼樓,你還留著課本嗎?來,翻到講磁盤的章節(jié),讓我們回顧一下磁盤的運(yùn)行原理。
碼樓:“鬼還留著哦,課程還沒上到一半書就沒了。要不是考試俺眼神好,就掛科了?!?/p>
磁盤的運(yùn)行原理如圖所示。
硬盤在邏輯上被劃分為磁道、柱面以及扇區(qū)。硬盤的每個(gè)盤片的每個(gè)面都有一個(gè)讀寫磁頭。
完成一次磁盤 I/O ,需要經(jīng)過尋道、旋轉(zhuǎn)和數(shù)據(jù)傳輸三個(gè)步驟。
- 尋道:首先必須找到柱面,即磁頭需要移動(dòng)到相應(yīng)磁道,這個(gè)過程叫做尋道,所耗費(fèi)時(shí)間叫做尋道時(shí)間。尋道時(shí)間越短,I/O 操作越快,目前磁盤的平均尋道時(shí)間一般在 3-15ms。
- 旋轉(zhuǎn):磁盤旋轉(zhuǎn)將目標(biāo)扇區(qū)旋轉(zhuǎn)到磁頭下。這個(gè)過程耗費(fèi)的時(shí)間叫做旋轉(zhuǎn)時(shí)間。旋轉(zhuǎn)延遲取決于磁盤轉(zhuǎn)速,通常用磁盤旋轉(zhuǎn)一周所需時(shí)間的 1/2 表示。比如:7200rpm 的磁盤平均旋轉(zhuǎn)延遲大約為 60*1000/7200/2 = 4.17ms,而轉(zhuǎn)速為 15000rpm 的磁盤其平均旋轉(zhuǎn)延遲為 2ms。
- 數(shù)據(jù)傳輸:數(shù)據(jù)在磁盤與內(nèi)存之間的實(shí)際傳輸。
因此,如果在寫磁盤的時(shí)候省去尋道、旋轉(zhuǎn)可以極大地提高磁盤讀寫的性能。
Kafka 采用順序?qū)懳募姆绞絹硖岣叽疟P寫入性能。順序?qū)懳募樞?I/O 的時(shí)候,磁頭幾乎不用換道,或者換道的時(shí)間很短。減少了磁盤尋道和旋轉(zhuǎn)的次數(shù)。磁頭再也不用在磁道上亂舞了,而是一路向前飛速前行。
Kafka 中每個(gè)Partition 是一個(gè)有序的,不可變的消息序列,新的消息可以不斷追加到 Partition 的末尾,在 Kafka 中 Partition 只是一個(gè)邏輯概念,每個(gè)Partition 劃分為多個(gè) Segment,每個(gè) Segment 對應(yīng)一個(gè)物理文件,Kafka 對 Segment 文件追加寫,這就是順序?qū)懳募?/p>
每條消息在發(fā)送前會(huì)根據(jù)負(fù)載均衡策略計(jì)算出要發(fā)往的目標(biāo) Partition 中,broker 收到消息之后把該條消息按照追加的方式順序?qū)懭?Partition 的日志文件中。
如下圖所示,可以看到磁盤順序?qū)懙男阅苓h(yuǎn)高于磁盤隨機(jī)寫,甚至比內(nèi)存隨機(jī)寫還快。
六、PageCache
Chaya:“碼哥,使用稀疏索引和 mmap 內(nèi)存映射技術(shù)提高讀消息的性能;Topic Partition 加磁盤順序?qū)懗志没⒌脑O(shè)計(jì)已經(jīng)很快了,但是與內(nèi)存順序?qū)戇€是慢了,還有優(yōu)化空間么?”
小姑娘,你的想法很好,作為快到令人發(fā)指的 Kafka,確實(shí)想到了一個(gè)方式來提高讀寫寫磁盤文件的性能。這就是接下來的主角 Page Cache 。
簡而言之:利用操作系統(tǒng)的緩存技術(shù),在讀寫磁盤日志文件時(shí),操作的是內(nèi)存,而不是文件,由操作系統(tǒng)決定什么在某個(gè)時(shí)間將 Page Cache 的數(shù)據(jù)刷寫到磁盤中。
- Producer 發(fā)送消息到 Broker 時(shí),Broker 會(huì)使用
pwrite()
系統(tǒng)調(diào)用寫入數(shù)據(jù),此時(shí)數(shù)據(jù)都會(huì)先寫入page cache
。 - Consumer 消費(fèi)消息時(shí),Broker 使用
sendfile()
系統(tǒng)調(diào)用函數(shù),通零拷貝技術(shù)地將 Page Cache 中的數(shù)據(jù)傳輸?shù)?Broker 的 Socket buffer,再通過網(wǎng)絡(luò)傳輸?shù)?Consumer。 - leader 與 follower 之間的同步,與上面 consumer 消費(fèi)數(shù)據(jù)的過程是同理的。
Kafka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時(shí),操作系統(tǒng)只是將數(shù)據(jù)寫入PageCache,同時(shí)標(biāo)記Page屬性為Dirty。
當(dāng)讀操作發(fā)生時(shí),先從PageCache中查找,如果發(fā)生缺頁才進(jìn)行磁盤調(diào)度,最終返回需要的數(shù)據(jù)。
于是我們得到一個(gè)重要結(jié)論:如果Kafka producer的生產(chǎn)速率與consumer的消費(fèi)速率相差不大,那么就能幾乎只靠對broker page cache的讀寫完成整個(gè)生產(chǎn)-消費(fèi)過程,磁盤訪問非常少。
實(shí)際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。
七、數(shù)據(jù)壓縮和批量處理
數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。
通過減少消息的大小,壓縮可以顯著降低生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸時(shí)間。
Chaya:Kafka 支持的壓縮算法有哪些?
在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。從2.1.0開始,Kafka正式支持Zstandard算法(簡寫為zstd)。
Chaya:這么多壓縮算法,我如何選擇?
一個(gè)壓縮算法的優(yōu)劣,有兩個(gè)重要的指標(biāo):壓縮比,文件壓縮前的大小與壓縮后的大小之比,比如源文件占用 1000 M 內(nèi)存,經(jīng)過壓縮后變成了 200 M,壓縮比 = 1000 /200 = 5,壓縮比越高越高;另一個(gè)指標(biāo)是壓縮/解壓縮吞吐量,比如每秒能壓縮或者解壓縮多少 M 數(shù)據(jù),吞吐量越高越好。
1.生產(chǎn)者壓縮
Kafka 的數(shù)據(jù)壓縮主要在生產(chǎn)者端進(jìn)行。具體步驟如下:
- 生產(chǎn)者配置壓縮方式:在 KafkaProducer 配置中設(shè)置
compression.type
參數(shù),可以選擇gzip
、snappy
、lz4
或zstd
。 - 消息壓縮:生產(chǎn)者將消息批量收集到一個(gè)
batch
中,然后對整個(gè)batch
進(jìn)行壓縮。這種批量壓縮方式可以獲得更高的壓縮率。 - 壓縮消息存儲(chǔ):壓縮后的
batch
以壓縮格式存儲(chǔ)在 Kafka 的主題(Topic)分區(qū)中。 - 消費(fèi)者解壓縮:消費(fèi)者從 Kafka 主題中獲取消息時(shí),首先對接收到的
batch
進(jìn)行解壓縮,然后處理其中的每一條消息。
2.解壓縮
有壓縮,那必有解壓縮。通常情況下,Producer 發(fā)送壓縮后的消息到 Broker ,原樣保存起來。
Consumer 消費(fèi)這些消息的時(shí)候,Broker 原樣發(fā)給 Consumer,由 Consumer 執(zhí)行解壓縮還原出原本的信息。
Chaya:Consumer 咋知道用什么壓縮算法解壓縮?
Kafka會(huì)將啟用了哪種壓縮算法封裝進(jìn)消息集合中,這樣當(dāng)Consumer讀取到消息集合時(shí),它自然就知道了這些消息使用的是哪種壓縮算法。
總之一句話:Producer端壓縮、Broker端保持、Consumer端解壓縮。
3.批量數(shù)據(jù)處理
Kafka Producer 向 Broker 發(fā)送消息不是一條消息一條消息的發(fā)送,將多條消息打包成一個(gè)批次發(fā)送。
批量數(shù)據(jù)處理可以顯著提高 Kafka 的吞吐量并減少網(wǎng)絡(luò)開銷。
Kafka Producer 的執(zhí)行流程如下圖所示:
發(fā)送消息依次經(jīng)過以下處理器:
- Serialize:鍵和值都根據(jù)傳遞的序列化器進(jìn)行序列化。優(yōu)秀的序列化方式可以提高網(wǎng)絡(luò)傳輸?shù)男省?/li>
- Partition:決定將消息寫入主題的哪個(gè)分區(qū),默認(rèn)情況下遵循 murmur2 算法。自定義分區(qū)程序也可以傳遞給生產(chǎn)者,以控制應(yīng)將消息寫入哪個(gè)分區(qū)。
- Compression:默認(rèn)情況下,在 Kafka 生產(chǎn)者中不啟用壓縮。Compression 不僅可以更快地從生產(chǎn)者傳輸?shù)酱?,還可以在復(fù)制過程中進(jìn)行更快的傳輸。壓縮有助于提高吞吐量,降低延遲并提高磁盤利用率。
- Record Accumulator:
Accumulate
顧名思義,就是一個(gè)消息累計(jì)器。其內(nèi)部為每個(gè) Partition 維護(hù)一個(gè)Deque
雙端隊(duì)列,隊(duì)列保存將要發(fā)送的 Batch批次數(shù)據(jù),Accumulate
將數(shù)據(jù)累計(jì)到一定數(shù)量,或者在一定過期時(shí)間內(nèi),便將數(shù)據(jù)以批次的方式發(fā)送出去。記錄被累積在主題每個(gè)分區(qū)的緩沖區(qū)中。根據(jù)生產(chǎn)者批次大小屬性將記錄分組。主題中的每個(gè)分區(qū)都有一個(gè)單獨(dú)的累加器 / 緩沖區(qū)。 - Group Send:記錄累積器中分區(qū)的批次按將它們發(fā)送到的代理分組。批處理中的記錄基于
batch.size
和linger.ms
屬性發(fā)送到代理。記錄由生產(chǎn)者根據(jù)兩個(gè)條件發(fā)送。當(dāng)達(dá)到定義的批次大小或達(dá)到定義的延遲時(shí)間時(shí)。 - Send Thread:發(fā)送線程,從 Accumulator 的隊(duì)列取出待發(fā)送的 Batch 批次消息發(fā)送到 Broker。
- Broker 端處理:Kafka Broker 接收到
batch
后,將其存儲(chǔ)在對應(yīng)的主題分區(qū)中。 - 消費(fèi)者端的批量消費(fèi):消費(fèi)者可以配置一次拉取多條消息的數(shù)量,通過
fetch.min.bytes
和fetch.max.wait.ms
參數(shù)控制批量大小和等待時(shí)間。
八、無鎖輕量級(jí) offset
Offset 是 Kafka 中的一個(gè)重要概念,用于標(biāo)識(shí)消息在分區(qū)中的位置。
每個(gè)分區(qū)中的消息都有一個(gè)唯一的 offset,消費(fèi)者通過維護(hù)自己的 offset 來確保準(zhǔn)確消費(fèi)消息。offset 的高效管理對于 Kafka 的性能至關(guān)重要。
offset 是從 0 開始的,每當(dāng)有新的消息寫入分區(qū)時(shí),offset 就會(huì)加 1。offset 是不可變的,即使消息被刪除或過期,offset 也不會(huì)改變或重用。
Consumer需要向Kafka匯報(bào)自己的位移數(shù)據(jù),這個(gè)匯報(bào)過程被稱為提交位移(Committing Offsets)。因?yàn)镃onsumer能夠同時(shí)消費(fèi)多個(gè)partition的數(shù)據(jù),所以位移的提交實(shí)際上是在partition粒度上進(jìn)行的,即Consumer需要為分配給它的每個(gè)partition提交各自的位移數(shù)據(jù)。
提交位移主要是為了表征Consumer的消費(fèi)進(jìn)度,這樣當(dāng)Consumer發(fā)生故障重啟之后,就能夠從Kafka中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費(fèi)。
在傳統(tǒng)的消息隊(duì)列系統(tǒng)中,offset 通常需要通過鎖機(jī)制來保證一致性,但這會(huì)帶來性能瓶頸。Kafka 的設(shè)計(jì)哲學(xué)是盡量減少鎖的使用,以提升并發(fā)處理能力和整體性能。
1.無鎖設(shè)計(jì)思想
Kafka 在 offset 設(shè)計(jì)中采用了一系列無鎖的技術(shù),使其能夠在高并發(fā)的環(huán)境中保持高效。
- 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪玻苊饬宋募恢玫念l繁變動(dòng),從而減少了鎖的使用。
- MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進(jìn)程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
- 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
- 批量處理:Kafka 支持批量處理消息,在一個(gè)批次中同時(shí)處理多個(gè)消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。
2.消費(fèi)者 Offset 管理流程
graph TD;
A[啟動(dòng)消費(fèi)者] --> B[從分區(qū)讀取消息];
B --> C[處理消息];
C --> D{是否成功處理?};
D --> |是| E[更新 Offset];
D --> |否| F[記錄失敗, 重新處理];
E --> G[提交 Offset];
G --> H[繼續(xù)處理下一個(gè)消息];
F --> B;
H --> B;
- 啟動(dòng)消費(fèi)者:消費(fèi)者啟動(dòng)并訂閱 Kafka 主題的某個(gè)分區(qū)。
- 從分區(qū)讀取消息:消費(fèi)者從指定分區(qū)中讀取消息。
- 處理消息:消費(fèi)者處理讀取到的消息。
- 是否成功處理:判斷消息是否成功處理。
- 如果成功處理,更新 Offset。
- 如果處理失敗,記錄失敗原因并準(zhǔn)備重新處理。
- 更新 Offset:成功處理消息后,更新 Offset 以記錄已處理消息的位置。
- 提交 Offset:將更新后的 Offset 提交到 Kafka,以確保消息處理進(jìn)度的持久化。
- 繼續(xù)處理下一個(gè)消息:提交 Offset 后,繼續(xù)讀取并處理下一個(gè)消息。
Kafka 通過無鎖輕量級(jí) offset 的設(shè)計(jì),實(shí)現(xiàn)了高性能、高吞吐和低延時(shí)的目標(biāo)。
九、總結(jié)
Kafka 通過無鎖輕量級(jí) offset 的設(shè)計(jì),實(shí)現(xiàn)了高性能、高吞吐和低延時(shí)的目標(biāo)。
其 Reactor I/O 網(wǎng)絡(luò)模型、磁盤順序?qū)懭?、?nèi)存映射文件、零拷貝、數(shù)據(jù)壓縮和批量處理等技術(shù),為 Kafka 提供了強(qiáng)大的數(shù)據(jù)處理能力和高效的消息隊(duì)列服務(wù)。
- Reactor I/O 網(wǎng)絡(luò)模型:通過 I/O 多路復(fù)用機(jī)制,Kafka 能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省了系統(tǒng)資源。
- 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪?,避免了文件位置的頻繁變動(dòng),從而減少了鎖的使用。
- MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進(jìn)程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
- 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
- 數(shù)據(jù)壓縮和批量處理:數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。;Kafka 支持批量處理消息,在一個(gè)批次中同時(shí)處理多個(gè)消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。