自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka 為什么這么快的七大秘訣,漲知識(shí)了

云計(jì)算 Kafka
Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動(dòng)機(jī)制來處理網(wǎng)絡(luò)請求。該模型通過 Reactor 模式實(shí)現(xiàn),即一個(gè)或多個(gè) I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個(gè)通道的事件,當(dāng)某個(gè)通道準(zhǔn)備好進(jìn)行 I/O 操作時(shí),觸發(fā)相應(yīng)的事件處理器進(jìn)行處理。

我們都知道 Kafka 是基于磁盤進(jìn)行存儲(chǔ)的,但 Kafka 官方又稱其具有高性能、高吞吐、低延時(shí)的特點(diǎn),其吞吐量動(dòng)輒幾十上百萬。

在座的靚仔和靚女們是不是有點(diǎn)困惑了,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會(huì)降低性能的,因?yàn)閷ぶ窌?huì)比較消耗時(shí)間。那 Kafka 又是怎么做到其吞吐量動(dòng)輒幾十上百萬的呢?

一、Kafka Reactor I/O 網(wǎng)絡(luò)模型

Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動(dòng)機(jī)制來處理網(wǎng)絡(luò)請求。

該模型通過 Reactor 模式實(shí)現(xiàn),即一個(gè)或多個(gè) I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個(gè)通道的事件,當(dāng)某個(gè)通道準(zhǔn)備好進(jìn)行 I/O 操作時(shí),觸發(fā)相應(yīng)的事件處理器進(jìn)行處理。

這種模型在高并發(fā)場景下具有很高的效率,能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省系統(tǒng)資源。

Reactor 線程模型如圖 2 所示。

圖 2

Reacotr 模型主要分為三個(gè)角色。

  • Reactor:把 I/O 事件根據(jù)類型分配給分配給對應(yīng)的 Handler 處理。
  • Acceptor:處理客戶端連接事件。
  • Handler:處理讀寫等任務(wù)。

Kafka 基于 Reactor 模型架構(gòu)如圖 3 所示。

圖 3

Kafka 的網(wǎng)絡(luò)通信模型基于 NIO(New Input/Output)庫,通過 Reactor 模式實(shí)現(xiàn),具體包括以下幾個(gè)關(guān)鍵組件:

  • SocketServer:管理所有的網(wǎng)絡(luò)連接,包括初始化 Acceptor 和 Processor 線程。
  • Acceptor:監(jiān)聽客戶端的連接請求,并將其分配給 Processor 線程。Acceptor 使用 Java NIO 的 Selector 進(jìn)行 I/O 多路復(fù)用,并注冊 OP_ACCEPT 事件來監(jiān)聽新的連接請求。每當(dāng)有新的連接到達(dá)時(shí),Acceptor 會(huì)接受連接并創(chuàng)建一個(gè) SocketChannel,然后將其分配給一個(gè) Processor 線程進(jìn)行處理。
  • Processor:處理具體的 I/O 操作,包括讀取客戶端請求和寫入響應(yīng)數(shù)據(jù)。Processor 同樣使用 Selector 進(jìn)行 I/O 多路復(fù)用,注冊 OP_READ 和 OP_WRITE 事件來處理讀寫操作。每個(gè) Processor 線程都有一個(gè)獨(dú)立的 Selector,用于管理多個(gè) SocketChannel。
  • RequestChannel:充當(dāng) Processor 和請求處理線程之間的緩沖區(qū),存儲(chǔ)請求和響應(yīng)數(shù)據(jù)。Processor 將讀取的請求放入 RequestChannel 的請求隊(duì)列,而請求處理線程則從該隊(duì)列中取出請求進(jìn)行處理。
  • KafkaRequestHandler:請求處理線程,從 RequestChannel 中讀取請求,調(diào)用 KafkaApis 進(jìn)行業(yè)務(wù)邏輯處理,并將響應(yīng)放回 RequestChannel 的響應(yīng)隊(duì)列。KafkaRequestHandler 線程池中的線程數(shù)量由配置參數(shù) num.io.threads 決定。

圖 4

Chaya:該模型和如何提高 kafka 的性能和效率?

高并發(fā)處理能力:通過 I/O 多路復(fù)用機(jī)制,Kafka 能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省了系統(tǒng)資源。

低延遲:非阻塞 I/O 操作避免了線程的阻塞等待,使得 I/O 操作能夠更快地完成,從而降低了系統(tǒng)的響應(yīng)延遲。

資源節(jié)省:通過減少線程的數(shù)量和上下文切換,Kafka 在處理高并發(fā)請求時(shí)能夠更有效地利用 CPU 和內(nèi)存資源。

擴(kuò)展性強(qiáng):Reactor 模式的分層設(shè)計(jì)使得 Kafka 的網(wǎng)絡(luò)模塊具有很好的擴(kuò)展性,可以根據(jù)需要增加更多的 I/O 線程或調(diào)整事件處理器的邏輯。

二、零拷貝技術(shù)的運(yùn)用

零拷貝技術(shù)是一種計(jì)算機(jī)操作系統(tǒng)技術(shù),用于在內(nèi)存和存儲(chǔ)設(shè)備之間進(jìn)行數(shù)據(jù)傳輸時(shí),避免 CPU 的參與,從而減少 CPU 的負(fù)擔(dān)并提高數(shù)據(jù)傳輸效率。

Kafka 使用零拷貝技術(shù)來優(yōu)化數(shù)據(jù)傳輸,特別是在生產(chǎn)者將數(shù)據(jù)寫入 Kafka 和消費(fèi)者從 Kafka 讀取數(shù)據(jù)的過程中。在 Kafka 中,零拷貝主要通過以下幾種方式實(shí)現(xiàn):

  • sendfile() 系統(tǒng)調(diào)用:在發(fā)送數(shù)據(jù)時(shí),Kafka 使用操作系統(tǒng)的 sendfile() 系統(tǒng)調(diào)用直接將文件從磁盤發(fā)送到網(wǎng)絡(luò)套接字,而無需將數(shù)據(jù)復(fù)制到應(yīng)用程序的用戶空間。這減少了數(shù)據(jù)復(fù)制次數(shù),提高了傳輸效率。
  • 文件內(nèi)存映射(Memory-Mapped Files):Kafka 使用文件內(nèi)存映射技術(shù)(mmap),將磁盤上的日志文件映射到內(nèi)存中,使得讀寫操作可以在內(nèi)存中直接進(jìn)行,無需進(jìn)行額外的數(shù)據(jù)復(fù)制。

比如 Broker 讀取磁盤數(shù)據(jù)并把數(shù)據(jù)發(fā)送給 Consumer 的過程,傳統(tǒng) I/O 經(jīng)歷以下步驟。

  1. 讀取數(shù)據(jù):通過read 系統(tǒng)調(diào)用將磁盤數(shù)據(jù)通過 DMA copy 到內(nèi)核空間緩沖區(qū)(Read buffer)。
  2. 拷貝數(shù)據(jù):將數(shù)據(jù)從內(nèi)核空間緩沖區(qū)(Read buffer) 通過 CPU copy 到用戶空間緩沖區(qū)(Application buffer)。
  3. 寫入數(shù)據(jù):通過write()系統(tǒng)調(diào)用將數(shù)據(jù)從用戶空間緩沖區(qū)(Application) CPU copy 到內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)。
  4. 發(fā)送數(shù)據(jù):將內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)DMA copy 到網(wǎng)卡目標(biāo)端口,通過網(wǎng)卡將數(shù)據(jù)發(fā)送到目標(biāo)主機(jī)。

這一過程經(jīng)過的四次 copy 如圖 5 所示。

圖 5

Chaya:零拷貝技術(shù)如何提高 Kakfa 的性能?

零拷貝技術(shù)通過減少 CPU 負(fù)擔(dān)和內(nèi)存帶寬消耗,提高了 Kakfa 性能。

  • 降低 CPU 使用率:由于數(shù)據(jù)不需要在內(nèi)核空間和用戶空間之間多次復(fù)制,CPU 的參與減少,從而降低了 CPU 使用率,騰出更多的 CPU 資源用于其他任務(wù)。
  • 提高數(shù)據(jù)傳輸速度:直接從磁盤到網(wǎng)絡(luò)的傳輸路徑減少了中間步驟,使得數(shù)據(jù)傳輸更加高效,延遲更低。
  • 減少內(nèi)存帶寬消耗:通過減少數(shù)據(jù)在內(nèi)存中的復(fù)制次數(shù),降低了內(nèi)存帶寬的消耗,使得系統(tǒng)能夠處理更多的并發(fā)請求。

三、Partition 并發(fā)和分區(qū)負(fù)載均衡

在說 Topic patition 分區(qū)并發(fā)之前,我們先了解下 kafka 架構(gòu)設(shè)計(jì)。

1.Kafka 架構(gòu)

一個(gè)典型的 Kafka 架構(gòu)包含以下幾個(gè)重要組件,如圖 6 所示。

圖 6

  • Producer(生產(chǎn)者):發(fā)送消息的一方,負(fù)責(zé)發(fā)布消息到 Kafka 主題(Topic)。
  • Consumer(消費(fèi)者):接受消息的一方,訂閱主題并處理消息。Kafka 有ConsumerGroup 的概念,每個(gè)Consumer 只能消費(fèi)所分配到的 Partition 的消息,每一個(gè)Partition只能被一個(gè)ConsumerGroup 中的一個(gè)Consumer 所消費(fèi),所以同一個(gè)ConsumerGroup 中Consumer 的數(shù)量如果超過了Partiton 的數(shù)量,將會(huì)出現(xiàn)有些Consumer 分配不到 partition 消費(fèi)。
  • Broker(代理):服務(wù)代理節(jié)點(diǎn),Kafka 集群中的一臺(tái)服務(wù)器就是一個(gè) broker,可以水平無限擴(kuò)展,同一個(gè) Topic 的消息可以分布在多個(gè) broker 中。
  • Topic(主題)與 Partition(分區(qū)) :Kafka 中的消息以 Topic 為單位進(jìn)行劃分,生產(chǎn)者將消息發(fā)送到特定的 Topic,而消費(fèi)者負(fù)責(zé)訂閱 Topic 的消息并進(jìn)行消費(fèi)。圖中 TopicA 有三個(gè) Partiton(TopicA-par0、TopicA-par1、TopicA-par2)
    為了提升整個(gè)集群的吞吐量,Topic 在物理上還可以細(xì)分多個(gè)Partition,一個(gè) Partition 在磁盤上對應(yīng)一個(gè)文件夾。
  • Replica(副本):副本,是 Kafka 保證數(shù)據(jù)高可用的方式,Kafka 同一 Partition 的數(shù)據(jù)可以在多 Broker 上存在多個(gè)副本,通常只有 leader 副本對外提供讀寫服務(wù),當(dāng) leader副本所在 broker 崩潰或發(fā)生網(wǎng)絡(luò)一場,Kafka 會(huì)在 Controller 的管理下會(huì)重新選擇新的 Leader 副本對外提供讀寫服務(wù)。
  • ZooKeeper:管理 Kafka 集群的元數(shù)據(jù)和分布式協(xié)調(diào)。

2.Topic 主題

Topic 是 Kafka 中數(shù)據(jù)的邏輯分類單元,可以理解成一個(gè)隊(duì)列。Broker 是所有隊(duì)列部署的機(jī)器,Producer 將消息發(fā)送到特定的 Topic,而 Consumer 則從特定的 Topic 中消費(fèi)消息。

3.Partition

為了提高并行處理能力和擴(kuò)展性,Kafka 將一個(gè) Topic 分為多個(gè) Partition。每個(gè) Partition 是一個(gè)有序的消息隊(duì)列,消息在 Partition 內(nèi)部是有序的,但在不同的 Partition 之間沒有順序保證。

Producer 可以并行地將消息發(fā)送到不同的 Partition,Consumer 也可以并行地消費(fèi)不同的 Partition,從而提升整體處理能力。

因此,可以說,每增加一個(gè) Paritition 就增加了一個(gè)消費(fèi)并發(fā)。Partition的引入不僅提高了系統(tǒng)的可擴(kuò)展性,還使得數(shù)據(jù)處理更加靈活。

4.Partition 分區(qū)策略

碼樓:“生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)是如何實(shí)現(xiàn)的?不合理的分配會(huì)導(dǎo)致消息集中在某些 Broker 上,豈不是完?duì)僮??!?/p>

主要有以下幾種分區(qū)策略:

  • 輪詢策略:也稱Round-robin策略,即順序分配。
  • 隨機(jī)策略:也稱Randomness策略。所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。
  • 按消息鍵保序策略。
  • 基于地理位置分區(qū)策略。

輪詢策略

比如一個(gè) Topic 下有 3個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。

當(dāng)生產(chǎn)第4條消息時(shí)又會(huì)重新開始,即將其分配到分區(qū)0,如圖 5 所示。

輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。

隨機(jī)策略

所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。如圖所示,9 條消息隨機(jī)分配到不同分區(qū)。

按消息鍵分配策略

一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,比如訂單 ID,那么綁定同一個(gè) 訂單 ID 的消息都會(huì)發(fā)布到同一個(gè)分區(qū),由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如圖所示。

基于地理位置

這種策略一般只針對那些大規(guī)模的 Kafka 集群,特別是跨城市、跨國家甚至是跨大洲的集群。

我們就可以根據(jù) Broker 所在的 IP 地址實(shí)現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
  .filter(p -> isSouth(p.leader().host()))
  .map(PartitionInfo::partition)
  .findAny()
  .get();

我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。

四、Segment 日志文件和稀疏索引

前面已經(jīng)介紹過,Kafka 的 Topic 可以分為多個(gè) Partition,每個(gè) Partition 有多個(gè)副本,你可以理解為副本才是存儲(chǔ)消息的物理存在。其實(shí)每個(gè)副本都是以日志(Log)的形式存儲(chǔ)。

碼樓:“日志文件過大怎么辦?”

為了解決單一日志文件過大的問題,kafka采用了分段(Segment)的形式進(jìn)行存儲(chǔ)。

所謂 Segment,就是當(dāng)一個(gè)日志文件大小到達(dá)一定條件之后,就新建一個(gè)新的 Segment,然后在新的Segment寫入數(shù)據(jù)。Topic、Partition、和日志的關(guān)系如圖 8 所示。

圖 8

一個(gè) segment 對應(yīng)磁盤上多個(gè)文件。

  • .index : 消息的 offset 索引文件。
  • .timeindex : 消息的時(shí)間索引文件(0.8版本加入的)。
  • .log  : 存儲(chǔ)實(shí)際的消息數(shù)據(jù)。
  • .snapshot : 記錄了 producer 的事務(wù)信息。
  • .swap : 用于 Segment 恢復(fù)。
  • .txnindex 文件,記錄了中斷的事務(wù)信息。

.log 文件存儲(chǔ)實(shí)際的 message,kafka為每一個(gè)日志文件添加了2 個(gè)索引文件 .index以及 .timeindex

segment 文件命名規(guī)則:partition 第一個(gè) segment 從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長度,沒有數(shù)字用 0 填充。

碼樓:“為什么要有 .index 文件?”

為了提高查找消息的性能。kafka 為消息數(shù)據(jù)建了兩種稀疏索引,一種是方便 offset 查找的 .index 稀疏索引,還有一種是方便時(shí)間查找的 .timeindex 稀疏索引。

1.稀疏索引

Chaya:“為什么不創(chuàng)建一個(gè)哈希索引,從 offset 到物理消息日志文件偏移量的映射關(guān)系?”

萬萬不可,Kafka 作為海量數(shù)據(jù)處理的中間件,每秒高達(dá)幾百萬的消息寫入,這個(gè)哈希索引會(huì)把把內(nèi)存撐爆炸。

稀疏索引不會(huì)為每個(gè)記錄都保存索引,而是寫入一定的記錄之后才會(huì)增加一個(gè)索引值,具體這個(gè)間隔有多大則通過 log.index.interval.bytes 參數(shù)進(jìn)行控制,默認(rèn)大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會(huì)在索引文件中增加一個(gè)索引項(xiàng)。

哈希稀疏索引把消息劃分為多個(gè) block ,只索引每個(gè) block 第一條消息的 offset 即可 。

  • Offset 偏移量:表示第幾個(gè)消息。
  • position:消息在磁盤的物理位置。

Chaya:如果消費(fèi)者要查找 Offset 為 4 的消息,查找過程是怎樣的?

  • 首先用二分法定位消息在哪個(gè) Segment ,Segment 文件命名是 Partition 第一個(gè) segment 從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值。
  • 打開這個(gè) Segment 對應(yīng)的 index 索引文件,用二分法查找 offset 不大于 4 的索引條目,對應(yīng)上圖第二條條目,也就是 offset = 3 的那個(gè)索引。通過索引我們可以知道 offset 為 4 的消息所在的日志文件磁盤物理位置為 495。
  • 打開日志文件,從 Position 為 495 位置開始開始順序掃描文件,將掃描過程中每條消息的 offset 與 4 比較,直到找到 offset 為 4 的那條Message。

.timeindex 文件同理,只不過它的查找結(jié)果是 offset,之后還要在走一遍 .index 索引查找流程。

由于 kafka 設(shè)計(jì)為順序讀寫磁盤,因此遍歷區(qū)間的數(shù)據(jù)并對速度有太大的影響,而選擇稀疏索引還能節(jié)約大量的磁盤空間。

2.mmap

有了稀疏索引,當(dāng)給定一個(gè) offset 時(shí),Kafka 采用的是二分查找來掃描索引定位不大于 offset 的物理位移 position,再到日志文件找到目標(biāo)消息。

利用稀疏索引,已經(jīng)基本解決了高效查詢的問題,但是這個(gè)過程中仍然有進(jìn)一步的優(yōu)化空間,那便是通過 mmap(memory mapped files) 讀寫上面提到的稀疏索引文件,進(jìn)一步提高查詢消息的速度

就是基于 JDK nio 包下的 MappedByteBuffer 的 map 函數(shù),將磁盤文件映射到內(nèi)存中。

進(jìn)程通過調(diào)用mmap系統(tǒng)函數(shù),將文件或物理內(nèi)存的一部分映射到其虛擬地址空間。這個(gè)過程中,操作系統(tǒng)會(huì)為映射的內(nèi)存區(qū)域分配一個(gè)虛擬地址,并將這個(gè)地址與文件或物理內(nèi)存的實(shí)際內(nèi)容關(guān)聯(lián)起來。

一旦內(nèi)存映射完成,進(jìn)程就可以通過指針直接訪問映射的內(nèi)存區(qū)域。這種訪問方式就像訪問普通內(nèi)存一樣簡單和高效。

圖引自《碼農(nóng)的荒島求生》

五、順序讀寫磁盤

碼樓:“不管如何,Kafka 讀寫消息都要讀寫磁盤,如何變快呢?”

磁盤就一定很慢么?人們普遍錯(cuò)誤地認(rèn)為硬盤很慢。然而,存儲(chǔ)介質(zhì)的性能,很大程度上依賴于數(shù)據(jù)被訪問的模式。

同樣在一塊普通的7200 RPM SATA硬盤上,隨機(jī)I/O(random I/O)與順序I/O相比,隨機(jī)I/O的性能要比順序I/O慢3到4個(gè)數(shù)量級(jí)。

合理的方式可以讓磁盤寫操作更加高效,減少了尋道時(shí)間和旋轉(zhuǎn)延遲。

碼樓,你還留著課本嗎?來,翻到講磁盤的章節(jié),讓我們回顧一下磁盤的運(yùn)行原理。

碼樓:“鬼還留著哦,課程還沒上到一半書就沒了。要不是考試俺眼神好,就掛科了?!?/p>

磁盤的運(yùn)行原理如圖所示。

硬盤在邏輯上被劃分為磁道、柱面以及扇區(qū)。硬盤的每個(gè)盤片的每個(gè)面都有一個(gè)讀寫磁頭。

完成一次磁盤 I/O ,需要經(jīng)過尋道、旋轉(zhuǎn)和數(shù)據(jù)傳輸三個(gè)步驟。

  • 尋道:首先必須找到柱面,即磁頭需要移動(dòng)到相應(yīng)磁道,這個(gè)過程叫做尋道,所耗費(fèi)時(shí)間叫做尋道時(shí)間。尋道時(shí)間越短,I/O 操作越快,目前磁盤的平均尋道時(shí)間一般在 3-15ms。
  • 旋轉(zhuǎn):磁盤旋轉(zhuǎn)將目標(biāo)扇區(qū)旋轉(zhuǎn)到磁頭下。這個(gè)過程耗費(fèi)的時(shí)間叫做旋轉(zhuǎn)時(shí)間。旋轉(zhuǎn)延遲取決于磁盤轉(zhuǎn)速,通常用磁盤旋轉(zhuǎn)一周所需時(shí)間的 1/2 表示。比如:7200rpm 的磁盤平均旋轉(zhuǎn)延遲大約為 60*1000/7200/2 = 4.17ms,而轉(zhuǎn)速為 15000rpm 的磁盤其平均旋轉(zhuǎn)延遲為 2ms。
  • 數(shù)據(jù)傳輸:數(shù)據(jù)在磁盤與內(nèi)存之間的實(shí)際傳輸。

因此,如果在寫磁盤的時(shí)候省去尋道、旋轉(zhuǎn)可以極大地提高磁盤讀寫的性能。

Kafka 采用順序?qū)懳募姆绞絹硖岣叽疟P寫入性能。順序?qū)懳募樞?I/O 的時(shí)候,磁頭幾乎不用換道,或者換道的時(shí)間很短。減少了磁盤尋道和旋轉(zhuǎn)的次數(shù)。磁頭再也不用在磁道上亂舞了,而是一路向前飛速前行。

Kafka 中每個(gè)Partition 是一個(gè)有序的,不可變的消息序列,新的消息可以不斷追加到 Partition 的末尾,在 Kafka 中 Partition 只是一個(gè)邏輯概念,每個(gè)Partition 劃分為多個(gè) Segment,每個(gè) Segment 對應(yīng)一個(gè)物理文件,Kafka 對 Segment 文件追加寫,這就是順序?qū)懳募?/p>

每條消息在發(fā)送前會(huì)根據(jù)負(fù)載均衡策略計(jì)算出要發(fā)往的目標(biāo) Partition 中,broker 收到消息之后把該條消息按照追加的方式順序?qū)懭?Partition 的日志文件中。

如下圖所示,可以看到磁盤順序?qū)懙男阅苓h(yuǎn)高于磁盤隨機(jī)寫,甚至比內(nèi)存隨機(jī)寫還快。

六、PageCache

Chaya:“碼哥,使用稀疏索引和 mmap 內(nèi)存映射技術(shù)提高讀消息的性能;Topic Partition 加磁盤順序?qū)懗志没⒌脑O(shè)計(jì)已經(jīng)很快了,但是與內(nèi)存順序?qū)戇€是慢了,還有優(yōu)化空間么?”

小姑娘,你的想法很好,作為快到令人發(fā)指的 Kafka,確實(shí)想到了一個(gè)方式來提高讀寫寫磁盤文件的性能。這就是接下來的主角 Page Cache 。

簡而言之:利用操作系統(tǒng)的緩存技術(shù),在讀寫磁盤日志文件時(shí),操作的是內(nèi)存,而不是文件,由操作系統(tǒng)決定什么在某個(gè)時(shí)間將 Page Cache 的數(shù)據(jù)刷寫到磁盤中。

  • Producer 發(fā)送消息到 Broker 時(shí),Broker 會(huì)使用 pwrite() 系統(tǒng)調(diào)用寫入數(shù)據(jù),此時(shí)數(shù)據(jù)都會(huì)先寫入page cache。
  • Consumer 消費(fèi)消息時(shí),Broker 使用 sendfile() 系統(tǒng)調(diào)用函數(shù),通零拷貝技術(shù)地將 Page Cache 中的數(shù)據(jù)傳輸?shù)?Broker 的 Socket buffer,再通過網(wǎng)絡(luò)傳輸?shù)?Consumer。
  • leader 與 follower 之間的同步,與上面 consumer 消費(fèi)數(shù)據(jù)的過程是同理的。

Kafka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時(shí),操作系統(tǒng)只是將數(shù)據(jù)寫入PageCache,同時(shí)標(biāo)記Page屬性為Dirty。

當(dāng)讀操作發(fā)生時(shí),先從PageCache中查找,如果發(fā)生缺頁才進(jìn)行磁盤調(diào)度,最終返回需要的數(shù)據(jù)。

于是我們得到一個(gè)重要結(jié)論:如果Kafka producer的生產(chǎn)速率與consumer的消費(fèi)速率相差不大,那么就能幾乎只靠對broker page cache的讀寫完成整個(gè)生產(chǎn)-消費(fèi)過程,磁盤訪問非常少。

實(shí)際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。

七、數(shù)據(jù)壓縮和批量處理

數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。

通過減少消息的大小,壓縮可以顯著降低生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸時(shí)間。

Chaya:Kafka 支持的壓縮算法有哪些?

在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。從2.1.0開始,Kafka正式支持Zstandard算法(簡寫為zstd)。

Chaya:這么多壓縮算法,我如何選擇?

一個(gè)壓縮算法的優(yōu)劣,有兩個(gè)重要的指標(biāo):壓縮比,文件壓縮前的大小與壓縮后的大小之比,比如源文件占用 1000 M 內(nèi)存,經(jīng)過壓縮后變成了 200 M,壓縮比 = 1000 /200 = 5,壓縮比越高越高;另一個(gè)指標(biāo)是壓縮/解壓縮吞吐量,比如每秒能壓縮或者解壓縮多少 M 數(shù)據(jù),吞吐量越高越好。

1.生產(chǎn)者壓縮

Kafka 的數(shù)據(jù)壓縮主要在生產(chǎn)者端進(jìn)行。具體步驟如下:

  • 生產(chǎn)者配置壓縮方式:在 KafkaProducer 配置中設(shè)置 compression.type 參數(shù),可以選擇 gzip、snappylz4  zstd。
  • 消息壓縮:生產(chǎn)者將消息批量收集到一個(gè) batch 中,然后對整個(gè) batch 進(jìn)行壓縮。這種批量壓縮方式可以獲得更高的壓縮率。
  • 壓縮消息存儲(chǔ):壓縮后的 batch 以壓縮格式存儲(chǔ)在 Kafka 的主題(Topic)分區(qū)中。
  • 消費(fèi)者解壓縮:消費(fèi)者從 Kafka 主題中獲取消息時(shí),首先對接收到的 batch 進(jìn)行解壓縮,然后處理其中的每一條消息。

2.解壓縮

有壓縮,那必有解壓縮。通常情況下,Producer 發(fā)送壓縮后的消息到 Broker ,原樣保存起來。

Consumer 消費(fèi)這些消息的時(shí)候,Broker 原樣發(fā)給 Consumer,由 Consumer 執(zhí)行解壓縮還原出原本的信息。

Chaya:Consumer 咋知道用什么壓縮算法解壓縮?

Kafka會(huì)將啟用了哪種壓縮算法封裝進(jìn)消息集合中,這樣當(dāng)Consumer讀取到消息集合時(shí),它自然就知道了這些消息使用的是哪種壓縮算法。

總之一句話:Producer端壓縮、Broker端保持、Consumer端解壓縮。

3.批量數(shù)據(jù)處理

Kafka Producer 向 Broker 發(fā)送消息不是一條消息一條消息的發(fā)送,將多條消息打包成一個(gè)批次發(fā)送。

批量數(shù)據(jù)處理可以顯著提高 Kafka 的吞吐量并減少網(wǎng)絡(luò)開銷。

Kafka Producer 的執(zhí)行流程如下圖所示:

發(fā)送消息依次經(jīng)過以下處理器:

  • Serialize:鍵和值都根據(jù)傳遞的序列化器進(jìn)行序列化。優(yōu)秀的序列化方式可以提高網(wǎng)絡(luò)傳輸?shù)男省?/li>
  • Partition:決定將消息寫入主題的哪個(gè)分區(qū),默認(rèn)情況下遵循 murmur2 算法。自定義分區(qū)程序也可以傳遞給生產(chǎn)者,以控制應(yīng)將消息寫入哪個(gè)分區(qū)。
  • Compression:默認(rèn)情況下,在 Kafka 生產(chǎn)者中不啟用壓縮。Compression 不僅可以更快地從生產(chǎn)者傳輸?shù)酱?,還可以在復(fù)制過程中進(jìn)行更快的傳輸。壓縮有助于提高吞吐量,降低延遲并提高磁盤利用率。
  • Record Accumulator:Accumulate顧名思義,就是一個(gè)消息累計(jì)器。其內(nèi)部為每個(gè) Partition 維護(hù)一個(gè)Deque雙端隊(duì)列,隊(duì)列保存將要發(fā)送的 Batch批次數(shù)據(jù),Accumulate將數(shù)據(jù)累計(jì)到一定數(shù)量,或者在一定過期時(shí)間內(nèi),便將數(shù)據(jù)以批次的方式發(fā)送出去。記錄被累積在主題每個(gè)分區(qū)的緩沖區(qū)中。根據(jù)生產(chǎn)者批次大小屬性將記錄分組。主題中的每個(gè)分區(qū)都有一個(gè)單獨(dú)的累加器 / 緩沖區(qū)。
  • Group Send:記錄累積器中分區(qū)的批次按將它們發(fā)送到的代理分組。批處理中的記錄基于 batch.size  linger.ms 屬性發(fā)送到代理。記錄由生產(chǎn)者根據(jù)兩個(gè)條件發(fā)送。當(dāng)達(dá)到定義的批次大小或達(dá)到定義的延遲時(shí)間時(shí)。
  • Send Thread:發(fā)送線程,從 Accumulator 的隊(duì)列取出待發(fā)送的 Batch 批次消息發(fā)送到 Broker。
  • Broker 端處理:Kafka Broker 接收到 batch 后,將其存儲(chǔ)在對應(yīng)的主題分區(qū)中。
  • 消費(fèi)者端的批量消費(fèi):消費(fèi)者可以配置一次拉取多條消息的數(shù)量,通過 fetch.min.bytes  fetch.max.wait.ms 參數(shù)控制批量大小和等待時(shí)間。

八、無鎖輕量級(jí) offset

Offset 是 Kafka 中的一個(gè)重要概念,用于標(biāo)識(shí)消息在分區(qū)中的位置。

每個(gè)分區(qū)中的消息都有一個(gè)唯一的 offset,消費(fèi)者通過維護(hù)自己的 offset 來確保準(zhǔn)確消費(fèi)消息。offset 的高效管理對于 Kafka 的性能至關(guān)重要。

offset 是從 0 開始的,每當(dāng)有新的消息寫入分區(qū)時(shí),offset 就會(huì)加 1。offset 是不可變的,即使消息被刪除或過期,offset 也不會(huì)改變或重用。

Consumer需要向Kafka匯報(bào)自己的位移數(shù)據(jù),這個(gè)匯報(bào)過程被稱為提交位移(Committing Offsets)。因?yàn)镃onsumer能夠同時(shí)消費(fèi)多個(gè)partition的數(shù)據(jù),所以位移的提交實(shí)際上是在partition粒度上進(jìn)行的,即Consumer需要為分配給它的每個(gè)partition提交各自的位移數(shù)據(jù)

提交位移主要是為了表征Consumer的消費(fèi)進(jìn)度,這樣當(dāng)Consumer發(fā)生故障重啟之后,就能夠從Kafka中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費(fèi)。

在傳統(tǒng)的消息隊(duì)列系統(tǒng)中,offset 通常需要通過鎖機(jī)制來保證一致性,但這會(huì)帶來性能瓶頸。Kafka 的設(shè)計(jì)哲學(xué)是盡量減少鎖的使用,以提升并發(fā)處理能力和整體性能。

1.無鎖設(shè)計(jì)思想

Kafka 在 offset 設(shè)計(jì)中采用了一系列無鎖的技術(shù),使其能夠在高并發(fā)的環(huán)境中保持高效。

  • 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪玻苊饬宋募恢玫念l繁變動(dòng),從而減少了鎖的使用。
  • MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進(jìn)程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
  • 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
  • 批量處理:Kafka 支持批量處理消息,在一個(gè)批次中同時(shí)處理多個(gè)消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。

2.消費(fèi)者 Offset 管理流程

graph TD;
    A[啟動(dòng)消費(fèi)者] --> B[從分區(qū)讀取消息];
    B --> C[處理消息];
    C --> D{是否成功處理?};
    D --> |是| E[更新 Offset];
    D --> |否| F[記錄失敗, 重新處理];
    E --> G[提交 Offset];
    G --> H[繼續(xù)處理下一個(gè)消息];
    F --> B;
    H --> B;
  • 啟動(dòng)消費(fèi)者:消費(fèi)者啟動(dòng)并訂閱 Kafka 主題的某個(gè)分區(qū)。
  • 從分區(qū)讀取消息:消費(fèi)者從指定分區(qū)中讀取消息。
  • 處理消息:消費(fèi)者處理讀取到的消息。
  • 是否成功處理:判斷消息是否成功處理。
  • 如果成功處理,更新 Offset。
  • 如果處理失敗,記錄失敗原因并準(zhǔn)備重新處理。
  • 更新 Offset:成功處理消息后,更新 Offset 以記錄已處理消息的位置。
  • 提交 Offset:將更新后的 Offset 提交到 Kafka,以確保消息處理進(jìn)度的持久化。
  • 繼續(xù)處理下一個(gè)消息:提交 Offset 后,繼續(xù)讀取并處理下一個(gè)消息。

Kafka 通過無鎖輕量級(jí) offset 的設(shè)計(jì),實(shí)現(xiàn)了高性能、高吞吐和低延時(shí)的目標(biāo)。

九、總結(jié)

Kafka 通過無鎖輕量級(jí) offset 的設(shè)計(jì),實(shí)現(xiàn)了高性能、高吞吐和低延時(shí)的目標(biāo)。

其 Reactor I/O 網(wǎng)絡(luò)模型、磁盤順序?qū)懭?、?nèi)存映射文件、零拷貝、數(shù)據(jù)壓縮和批量處理等技術(shù),為 Kafka 提供了強(qiáng)大的數(shù)據(jù)處理能力和高效的消息隊(duì)列服務(wù)。

  • Reactor I/O 網(wǎng)絡(luò)模型:通過 I/O 多路復(fù)用機(jī)制,Kafka 能夠同時(shí)處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個(gè)連接創(chuàng)建一個(gè)線程,從而節(jié)省了系統(tǒng)資源。
  • 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪?,避免了文件位置的頻繁變動(dòng),從而減少了鎖的使用。
  • MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進(jìn)程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
  • 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
  • 數(shù)據(jù)壓縮和批量處理:數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。;Kafka 支持批量處理消息,在一個(gè)批次中同時(shí)處理多個(gè)消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。
責(zé)任編輯:姜華 來源: 碼哥跳動(dòng)
相關(guān)推薦

2024-07-30 09:01:12

2024-02-26 21:15:20

Kafka緩存參數(shù)

2020-03-30 15:05:46

Kafka消息數(shù)據(jù)

2024-11-26 08:52:34

SQL優(yōu)化Kafka

2010-09-09 16:26:54

CSS選擇符

2020-02-27 21:03:30

調(diào)度器架構(gòu)效率

2020-02-27 15:44:41

Nginx服務(wù)器反向代理

2020-10-13 17:54:18

開發(fā)Kafka數(shù)據(jù)

2022-04-21 15:57:37

數(shù)字化轉(zhuǎn)型疫情云服務(wù)

2023-08-29 07:46:08

Redis數(shù)據(jù)ReHash

2020-10-15 09:19:36

Elasticsear查詢速度

2021-05-27 20:56:51

esbuild 工具JavaScript

2013-04-23 10:11:41

PaaS

2021-03-22 08:30:33

Kafka源碼架構(gòu)開發(fā)技術(shù)

2020-08-13 09:19:10

Kafka存儲(chǔ)MQ

2022-09-24 09:52:42

TopicQueuekafka

2021-05-31 07:44:08

Kafka分布式系統(tǒng)

2023-03-21 08:02:36

Redis6.0IO多線程

2018-04-11 14:13:29

物聯(lián)網(wǎng)信息技術(shù)互聯(lián)網(wǎng)

2022-05-23 08:09:42

物聯(lián)網(wǎng)IOT
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)