自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

淺談分布式消息系統(tǒng) Kafka 設(shè)計原理

開發(fā) 開發(fā)工具 分布式 Kafka
Kafka是一種高吞吐量、分布式、基于發(fā)布/訂閱的消息系統(tǒng),最初由LinkedIn公司開發(fā),使用Scala語言編寫,目前是Apache的開源項(xiàng)目。

 一、Kafka簡介

Kafka是一種高吞吐量、分布式、基于發(fā)布/訂閱的消息系統(tǒng),最初由LinkedIn公司開發(fā),使用Scala語言編寫,目前是Apache的開源項(xiàng)目。

跟RabbitMQ、RocketMQ等目前流行的開源消息中間件相比,Kakfa具有高吞吐、低延遲等特點(diǎn),在大數(shù)據(jù)、日志收集等應(yīng)用場景下被廣泛使用。

本文主要簡單介紹Kafka的設(shè)計原理。

二、Kafka架構(gòu)

基本概念:

  • broker:Kafka服務(wù)器,負(fù)責(zé)消息存儲和轉(zhuǎn)發(fā)
  • topic:消息類別,Kafka按照topic來分類消息
  • partition:topic的分區(qū),一個topic可以包含多個partition,topic消息保存在各個partition上
  • offset:消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表該消息的唯一序號
  • Producer:消息生產(chǎn)者
  • Consumer:消息消費(fèi)者
  • Consumer Group:消費(fèi)者分組,每個Consumer必須屬于一個group
  • Zookeeper:保存著集群broker、topic、partition等meta數(shù)據(jù);另外,還負(fù)責(zé)broker故障發(fā)現(xiàn),partition leader選舉,負(fù)載均衡等功能

三、Kafka設(shè)計原理

3.1 數(shù)據(jù)存儲設(shè)計

partition以文件形式存儲在文件系統(tǒng),目錄命名規(guī)則:<topic_name>-<partition_id>,例如,名為test的topic,其有3個partition,則Kafka數(shù)據(jù)目錄中有3個目錄:test-0, test-1, test-2,分別存儲相應(yīng)partition的數(shù)據(jù)。

partition的數(shù)據(jù)文件

partition中的每條Message包含了以下三個屬性:

  • offset
  • MessageSize
  • data

其中offset表示Message在這個partition中的偏移量,offset不是該Message在partition數(shù)據(jù)文件中的實(shí)際存儲位置,而是邏輯上一個值,它唯一確定了partition中的一條Message,可以認(rèn)為offset是partition中Message的id;MessageSize表示消息內(nèi)容data的大小;data為Message的具體內(nèi)容。

partition的數(shù)據(jù)文件由以上格式的Message組成,按offset由小到大排列在一起。

如果一個partition只有一個數(shù)據(jù)文件:

  1. 新數(shù)據(jù)是添加在文件末尾,不論文件數(shù)據(jù)文件有多大,這個操作永遠(yuǎn)都是O(1)的。
  2. 查找某個offset的Message是順序查找的。因此,如果數(shù)據(jù)文件很大的話,查找的效率就低。

Kafka通過分段和索引來提高查找效率。

數(shù)據(jù)文件分段segment

partition物理上由多個segment文件組成,每個segment大小相等,順序讀寫。每個segment數(shù)據(jù)文件以該段中最小的offset命名,文件擴(kuò)展名為.log。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個segment數(shù)據(jù)文件中。

數(shù)據(jù)文件索引

數(shù)據(jù)文件分段使得可以在一個較小的數(shù)據(jù)文件中查找對應(yīng)offset的Message了,但是這依然需要順序掃描才能找到對應(yīng)offset的Message。為了進(jìn)一步提高查找的效率,Kafka為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴(kuò)展名為.index。

索引文件中包含若干個索引條目,每個條目表示數(shù)據(jù)文件中一條Message的索引。索引包含兩個部分,分別為相對offset和position。

  • 相對offset:因?yàn)閿?shù)據(jù)文件分段以后,每個數(shù)據(jù)文件的起始o(jì)ffset不為0,相對offset表示這條Message相對于其所屬數(shù)據(jù)文件中最小的offset的大小。舉例,分段后的一個數(shù)據(jù)文件的offset是從20開始,那么offset為25的Message在index文件中的相對offset就是25-20 = 5。存儲相對offset可以減小索引文件占用的空間。
  • position,表示該條Message在數(shù)據(jù)文件中的絕對位置。只要打開文件并移動文件指針到這個position就可以讀取對應(yīng)的Message了。

index文件中并沒有為數(shù)據(jù)文件中的每條Message建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。但缺點(diǎn)是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。

總結(jié)

查找某個offset的消息,先二分法找出消息所在的segment文件(因?yàn)槊總€segment的命名都是以該文件中消息offset最小的值命名);然后,加載對應(yīng)的.index索引文件到內(nèi)存,同樣二分法找出小于等于給定offset的***的那個offset記錄(相對offset,position);***,根據(jù)position到.log文件中,順序查找出offset等于給定offset值的消息。

由于消息在partition的segment數(shù)據(jù)文件中是順序讀寫的,且消息消費(fèi)后不會刪除(刪除策略是針對過期的segment文件),這種順序磁盤IO存儲設(shè)計是Kafka高性能很重要的原因。

3.2 生產(chǎn)者設(shè)計

  • 負(fù)載均衡:由于消息topic由多個partition組成,且partition會均衡分布到不同broker上,因此,為了有效利用broker集群的性能,提高消息的吞吐量,producer可以通過隨機(jī)或者h(yuǎn)ash等方式,將消息平均發(fā)送到多個partition上,以實(shí)現(xiàn)負(fù)載均衡。
  • 批量發(fā)送:是提高消息吞吐量重要的方式,Producer端可以在內(nèi)存中合并多條消息后,以一次請求的方式發(fā)送了批量的消息給broker,從而大大減少broker存儲消息的IO操作次數(shù)。但也一定程度上影響了消息的實(shí)時性,相當(dāng)于以時延代價,換取更好的吞吐量。

3.3 消費(fèi)者設(shè)計

  • 任何Consumer必須屬于一個Consumer Group
  • 同一Consumer Group中的多個Consumer實(shí)例,不同時消費(fèi)同一個partition,等效于隊(duì)列模式。如圖,Consumer Group 1的三個Consumer實(shí)例分別消費(fèi)不同的partition的消息,即,TopicA-part0、TopicA-part1、TopicA-part2。
  • 不同Consumer Group的Consumer實(shí)例可以同時消費(fèi)同一個partition,等效于發(fā)布訂閱模式。如圖,Consumer Group 1的Consumer1和Consumer Group 2的Consumer4,同時消費(fèi)TopicA-part0的消息。
  • partition內(nèi)消息是有序的,Consumer通過pull方式消費(fèi)消息。
  • Kafka不刪除已消費(fèi)的消息

隊(duì)列模式

隊(duì)列模式,指每條消息只會有一個Consumer消費(fèi)到。Kafka保證同一Consumer Group中只有一個Consumer會消費(fèi)某條消息。

  • 在Consumer Group穩(wěn)定狀態(tài)下,每一個Consumer實(shí)例只會消費(fèi)某一個或多個特定partition的數(shù)據(jù),而某個partition的數(shù)據(jù)只會被某一個特定的Consumer實(shí)例所消費(fèi),也就是說Kafka對消息的分配是以partition為單位分配的,而非以每一條消息作為分配單元;
  • 同一Consumer Group中,如果Consumer實(shí)例數(shù)量少于partition數(shù)量,則至少有一個Consumer會消費(fèi)多個partition的數(shù)據(jù);如果Consumer的數(shù)量與partition數(shù)量相同,則正好一個Consumer消費(fèi)一個partition的數(shù)據(jù);而如果Consumer的數(shù)量多于partition的數(shù)量時,會有部分Consumer無法消費(fèi)該Topic下任何一條消息;
  • 設(shè)計的優(yōu)勢是:每個Consumer不用都跟大量的broker通信,減少通信開銷,同時也降低了分配難度,實(shí)現(xiàn)也更簡單;可以保證每個partition里的數(shù)據(jù)可以被Consumer有序消費(fèi)。
  • 設(shè)計的劣勢是:無法保證同一個Consumer Group里的Consumer均勻消費(fèi)數(shù)據(jù),且在Consumer實(shí)例多于partition個數(shù)時導(dǎo)致有些Consumer會餓死。

如果有partition或者Consumer的增減,為了保證均衡消費(fèi),需要實(shí)現(xiàn)Consumer Rebalance,分配算法如下:

broker對Consumer設(shè)計原理:

  • 對于每個Consumer Group,選舉出一個Broker作為Coordinator(0.9版本以上),由它Watch Zookeeper,從而監(jiān)控判斷是否有partition或者Consumer的增減,然后生成Rebalance命令,按照以上算法重新分配。
  • 當(dāng)Consumer Group***次被初始化時,Consumer通常會讀取每個partition的最早或最近的offset(Zookeeper記錄),然后順序地讀取每個partition log的消息,在Consumer讀取過程中,它會提交已經(jīng)成功處理的消息的offsets(由Zookeeper記錄)。
  • 當(dāng)一個partition被重新分配給Consumer Group中的其他Consumer,新的Consumer消費(fèi)的初始位置會設(shè)置為(原來Consumer)最近提交的offset。

如圖,Last Commited Offset指Consumer最近一次提交的消費(fèi)記錄offset,Current Position是當(dāng)前消費(fèi)的位置,High Watermark是成功拷貝到log的所有副本節(jié)點(diǎn)(partition的所有ISR節(jié)點(diǎn),下文介紹)的最近消息的offset,Log End Offset是寫入log中***一條消息的offset+1。

從Consumer的角度來看,最多只能讀取到High watermark的位置,后面的消息對消費(fèi)者不可見,因?yàn)槲赐耆珡?fù)制的數(shù)據(jù)還沒可靠存儲,有丟失可能。

發(fā)布訂閱模式

發(fā)布訂閱模式,又指廣播模式,Kafka保證topic的每條消息會被所有Consumer Group消費(fèi)到,而對于同一個Consumer Group,還是保證只有一個Consumer實(shí)例消費(fèi)到這條消息。

3.4 Replication設(shè)計

作為消息中間件,數(shù)據(jù)的可靠性以及系統(tǒng)的可用性,必然依賴數(shù)據(jù)副本的設(shè)計。

Kafka的replica副本單元是topic的partition,一個partition的replica數(shù)量不能超過broker的數(shù)量,因?yàn)橐粋€broker最多只會存儲這個partition的一個副本。所有消息生產(chǎn)、消費(fèi)請求都是由partition的leader replica來處理,其他follower replica負(fù)責(zé)從leader復(fù)制數(shù)據(jù)進(jìn)行備份。

Replica均勻分布到整個集群,Replica的算法如下:

  • 將所有Broker(假設(shè)共n個Broker)和待分配的Partition排序
  • 將第i個Partition分配到第(i mod n)個Broker上
  • 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

如圖,TopicA有三個partition:part0、part1、part2,每個partition的replica數(shù)等于2(一個是leader,另一個是follower),按照以上算法會均勻落到三個broker上。

broker對replica管理:

選舉出一個broker作為controller,由它Watch Zookeeper,負(fù)責(zé)partition的replica的集群分配,以及l(fā)eader切換選舉等流程。

In-Sync-Replica(ISR)

分布式系統(tǒng)在處理節(jié)點(diǎn)故障時,需要預(yù)先明確節(jié)點(diǎn)的”failure”和”alive”的定義。對于Kafka節(jié)點(diǎn),判斷是”alive”有以下兩個條件:

  • 節(jié)點(diǎn)必須和Zookeeper保持心跳連接
  • 如果節(jié)點(diǎn)是follower,必須從leader節(jié)點(diǎn)上復(fù)制數(shù)據(jù)來備份,而且備份的數(shù)據(jù)相比leader而言,不能落后太多。

Kafka將滿足以上條件的replica節(jié)點(diǎn)認(rèn)為是”in sync”(同步中),稱為In-Sync-Replica(ISR)。

Kafka的Zookeeper維護(hù)了每個partition的ISR信息,理想情況下,ISR包含了partition的所有replica所在的broker節(jié)點(diǎn)信息,而當(dāng)某些節(jié)點(diǎn)不滿足以上條件時,ISR可能只包含部分replica。例如,上圖中的TopicA-part0的ISR列表可能是[broker1,broker2,broker3],也可能是[broker1,broker3]和[broker1]。

數(shù)據(jù)可靠性

Kafka如何保證數(shù)據(jù)可靠性?首先看下,Producer生產(chǎn)一條消息,該消息被認(rèn)為是”committed”(即broker認(rèn)為消息已經(jīng)可靠存儲)的過程:

  • 消息所在partition的ISR replicas會定時異步從leader上批量復(fù)制數(shù)據(jù)log
  • 當(dāng)所有ISR replica都返回ack,告訴leader該消息已經(jīng)寫log成功后,leader認(rèn)為該消息committed,并告訴Producer生產(chǎn)成功。這里和以上”alive”條件的第二點(diǎn)是不矛盾的,因?yàn)閘eader有超時機(jī)制,leader等ISR的follower復(fù)制數(shù)據(jù),如果一定時間不返回ack(可能數(shù)據(jù)復(fù)制進(jìn)度落后太多),則leader將該follower replica從ISR中剔除。
  • 消息committed之后,Consumer才能消費(fèi)到。

ISR機(jī)制下的數(shù)據(jù)復(fù)制,既不是完全的同步復(fù)制,也不是單純的異步復(fù)制,這是Kafka高吞吐很重要的機(jī)制。同步復(fù)制要求所有能工作的follower都復(fù)制完,這條消息才會被認(rèn)為committed,這種復(fù)制方式極大的影響了吞吐量。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)committed,這種情況下如果follower都復(fù)制完都落后于leader,而如果leader突然宕機(jī),則會丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐量,follower可以批量的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)復(fù)制到內(nèi)存即返回ack,這樣極大的提高復(fù)制性能,當(dāng)然數(shù)據(jù)仍然是有丟失風(fēng)險的。

Kafka本身定位于高性能的MQ,更多注重消息吞吐量,在此基礎(chǔ)上結(jié)合ISR的機(jī)制去盡量保證消息的可靠性,但不是絕對可靠的。

服務(wù)可用性

Kafka所有收發(fā)消息請求都由leader節(jié)點(diǎn)處理,由以上數(shù)據(jù)可靠性設(shè)計可知,當(dāng)ISR的follower replica故障后,leader會及時地從ISR列表中把它剔除掉,并不影響服務(wù)可用性,那么當(dāng)leader故障后會怎樣呢?如何選舉新的leader?

leader選舉

  • Kafka在Zookeeper存儲partition的ISR信息,并且能動態(tài)調(diào)整ISR列表的成員,只有ISR里的成員replica才會被選為leader,并且ISR所有的replica都有可能成為leader;
  • leader節(jié)點(diǎn)宕機(jī)后,Zookeeper能監(jiān)控發(fā)現(xiàn),并由broker的controller節(jié)點(diǎn)從ISR中選舉出新的leader,并通知ISR內(nèi)的所有broker節(jié)點(diǎn)。

因此,可以看出,只要ISR中至少有一個replica,Kafka就能保證服務(wù)的可用性(但不保證網(wǎng)絡(luò)分區(qū)下的可用性)。

容災(zāi)和數(shù)據(jù)一致性

分布式系統(tǒng)的容災(zāi)能力,跟其本身針對數(shù)據(jù)一致性考慮所選擇的算法有關(guān),例如,Zookeeper的Zab算法,raft算法等。Kafka的ISR機(jī)制和這些Majority Vote算法對比如下:

ISR機(jī)制能容忍更多的節(jié)點(diǎn)失敗。假如replica節(jié)點(diǎn)有2f+1個,每個partition最多能容忍2f個失敗,且不丟失消息數(shù)據(jù);但相對Majority Vote選舉算法,只能最多容忍f個失敗。

在消息committed持久化上,ISR需要等2f個節(jié)點(diǎn)返回ack,但Majority Vote只需等f+1個節(jié)點(diǎn)返回ack,且不依賴處理最慢的follower節(jié)點(diǎn),因此Majority Vote有優(yōu)勢

ISR機(jī)制能節(jié)省更多replica節(jié)點(diǎn)數(shù)。例如,要保證f個節(jié)點(diǎn)可用,ISR方式至少要f個節(jié)點(diǎn),而Majority Vote至少需要2f+1個節(jié)點(diǎn)。

如果所有replica都宕機(jī)了,有兩種方式恢復(fù)服務(wù):

  1. 等ISR任一節(jié)點(diǎn)恢復(fù),并選舉為leader;
  2. 選擇***個恢復(fù)的節(jié)點(diǎn)(不一定是ISR中的節(jié)點(diǎn))為leader

***種方式消息不會丟失(只能說這種方式最有可能不丟而已),第二種方式可能會丟消息,但能盡快恢復(fù)服務(wù)可用。這是可用性和一致性場景的兩種考慮,Kafka默認(rèn)選擇第二種,用戶也可以自主配置。

大部分考慮CP的分布式系統(tǒng)(假設(shè)2f+1個節(jié)點(diǎn)),為了保證數(shù)據(jù)一致性,最多只能容忍f個節(jié)點(diǎn)的失敗,而Kafka為了兼顧可用性,允許最多2f個節(jié)點(diǎn)失敗,因此是無法保證數(shù)據(jù)強(qiáng)一致的。

如圖所示,一開始ISR數(shù)量等于3,正常同步數(shù)據(jù),紅色部分開始,leader發(fā)現(xiàn)其他兩個follower復(fù)制進(jìn)度太慢或者其他原因(網(wǎng)絡(luò)分區(qū)、節(jié)點(diǎn)故障等),將其從ISR剔除后,leader單節(jié)點(diǎn)存儲數(shù)據(jù);然后,leader宕機(jī),觸發(fā)重新選舉第二節(jié)點(diǎn)為leader,重新開始同步數(shù)據(jù),但紅色部分的數(shù)據(jù)在新leader上是沒有的;***原leader節(jié)點(diǎn)恢復(fù)服務(wù)后,重新從新leader上復(fù)制數(shù)據(jù),而紅色部分的數(shù)據(jù)已經(jīng)消費(fèi)不到了。

因此,為了減少數(shù)據(jù)丟失的概率,可以設(shè)置Kafka的ISR最小replica數(shù),低于該值后直接返回不可用,當(dāng)然是以犧牲一定可用性和吞吐量為前提了。

重復(fù)消息

消息傳輸有三種方式:

At most once:消息可能會丟失,但不會重復(fù)傳輸

At least once:消息不會丟失,但可能重復(fù)傳輸

Exactly once:消息保證會被傳輸一次且僅傳輸一次

Kafka實(shí)現(xiàn)了第二種方式,即,可能存在重復(fù)消息,需要業(yè)務(wù)自己保證消息冪等性處理。

3.5 高吞吐設(shè)計

  1. 對于partition,順序讀寫磁盤數(shù)據(jù),以時間復(fù)雜度O(1)方式提供消息持久化能力。
  2. Producer批量向broker寫數(shù)據(jù)
  3. Consumer批量從broker拉數(shù)據(jù)
  4. 日志壓縮
  5. Topic分多個partition,提高并發(fā)
  6. broker零拷貝(Zero Copy),使用sendfile系統(tǒng)調(diào)用,將數(shù)據(jù)直接從page cache發(fā)送到socket上
  7. Producer可配置是否等待消息committed。如果Producer生產(chǎn)消息,每次都必須等ISR存儲后才返回,時延會很高,進(jìn)而影響整體消息的吞吐量。為了解決這個問題,一方面Producer可以配置減少partition的副本數(shù),例如,ISR大小為1;另一方面,在不太關(guān)注消息可靠存儲的場景下,Producer可以通過配置選擇是否等待消息committed,如下:

這是用戶在消息吞吐量和持久化之間做的權(quán)衡選擇,持久化等級越高,生產(chǎn)消息吞吐量越小,反之,持久化等級越低,吞吐量越高。

3.6 HA基本原理

broker HA

broker集群信息由Zookeeper維護(hù),并選舉出一個controller。所有partition的leader選舉都由controller決定,將leader的變更直接通過rpc方式通知需要為此做出響應(yīng)的brokers;controller也負(fù)責(zé)增刪topic以及partition replica的重新分配。

controller在Zookeeper上注冊watch,一旦有broker宕機(jī),其對應(yīng)在Zookeeper的臨時節(jié)點(diǎn)自動被刪除,controller對宕機(jī)broker上的所有partition重新分配新leader;如果controller宕機(jī),其他broker通過Zookeeper選舉出新的controller,然后同樣對宕機(jī)broker上的所有partition重新分配新leader。

partition HA

partition leader所在的broker宕機(jī),如上所述,broker controller根據(jù)動態(tài)維護(hù)的ISR,會重新在剩下的broker機(jī)器中選出ISR里面的一個成員成為新的leader。如果ISR中至少有一個follower,則可以確保已經(jīng)committed的數(shù)據(jù)不丟失;否則選擇任意一個replica作為leader,該場景可能會有潛在的數(shù)據(jù)丟失;如果partition所有的replica都宕機(jī)了,就無法保證數(shù)據(jù)不丟失了,有兩種恢復(fù)方案,上文已介紹過。

四、推廣

騰訊云即將推出高性能的消息隊(duì)列服務(wù)Ckafka,完全兼容開源Kafka API(0.9版本)。Ckafka服務(wù)端完全托管在騰訊云上,用戶無需自己維護(hù)和搭建,使用開源Kafka API客戶端即可訪問實(shí)例,大大降低了用戶使用Kafka的門檻,歡迎體驗(yàn):)

原文鏈接:https://cloud.tencent.com/community/article/369570

【本文是51CTO專欄作者“騰訊云技術(shù)社區(qū)”的原創(chuàng)稿件,轉(zhuǎn)載請通過51CTO聯(lián)系原作者獲取授權(quán)】

戳這里,看該作者更多好文

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2017-07-27 14:32:05

大數(shù)據(jù)分布式消息Kafka

2019-09-05 09:02:45

消息系統(tǒng)緩存高可用

2021-07-06 10:35:46

分布式KafkaLinux

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2017-12-18 10:47:04

分布式存儲數(shù)據(jù)

2013-03-26 13:43:08

Java分布式計算

2023-10-08 10:49:16

搜索系統(tǒng)分布式系統(tǒng)

2020-01-17 09:07:14

分布式系統(tǒng)網(wǎng)絡(luò)

2013-06-13 11:29:14

分布式分布式緩存

2009-10-09 17:17:11

安裝VB dcom分布

2013-01-07 10:29:31

大數(shù)據(jù)

2017-12-12 14:51:15

分布式緩存設(shè)計

2023-05-29 14:07:00

Zuul網(wǎng)關(guān)系統(tǒng)

2022-04-07 17:13:09

緩存算法服務(wù)端

2023-05-12 08:23:03

分布式系統(tǒng)網(wǎng)絡(luò)

2012-10-09 16:43:47

FastDFS分布式文件系統(tǒng)

2022-06-16 07:31:15

MySQL服務(wù)器服務(wù)

2017-10-27 08:40:44

分布式存儲剪枝系統(tǒng)

2023-10-26 18:10:43

分布式并行技術(shù)系統(tǒng)

2022-06-21 08:27:22

Seata分布式事務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號