自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一年省七位數(shù),得物自建HFDS在 Flink Checkpoint 場景下的應(yīng)用實踐

存儲 數(shù)據(jù)管理
Checkpoint:簡單的說,在某一時刻,將 Flink 任務(wù)本地機器中存儲在狀態(tài)后端的狀態(tài)去同步到遠程文件存儲系統(tǒng)(比如 HDFS)的過程就叫 Checkpoint。

1、背景

隨著阿里云Flink實例的遷移下云以及新增需求接入,自建Flink平臺規(guī)模逐漸壯大,當(dāng)前總計已超4萬核運行在自建的K8S集群中,然而 Flink 任務(wù)數(shù)的增加,特別是大狀態(tài)任務(wù),每次Checkpoint 時會產(chǎn)生脈沖式帶寬占用,峰值流量超過100Gb/s,早期使用阿里云OSS作為Checkpoint數(shù)據(jù)存儲,單個Bucket 每 1P數(shù)據(jù)量只有免費帶寬10Gb/s,超出部分單獨計費,當(dāng)前規(guī)模每月需要增加1x w+/月。

為了控制這部分成本,得物開展了自建HDFS在Flink Checkpoint場景下的落地工作,實現(xiàn)年度成本節(jié)省xxx萬元。

此次分享自建HDFS在實時計算checkpoint場景的實踐經(jīng)驗,希望能為讀者提供一些參考。

2、Flink Checkpoint 介紹

2.1 Flink里的Checkpoint是什么?

Checkpoint:簡單的說,在某一時刻,將 Flink 任務(wù)本地機器中存儲在狀態(tài)后端的狀態(tài)去同步到遠程文件存儲系統(tǒng)(比如 HDFS)的過程就叫 Checkpoint。

狀態(tài)后端:做狀態(tài)數(shù)據(jù)持久化的工具就叫做狀態(tài)后端。比如你在 Flink 中見到的 RocksDB、FileSystem 的概念就是指狀態(tài)后端,再引申一下,也可以理解為:應(yīng)用中有一份狀態(tài)數(shù)據(jù),把這份狀態(tài)數(shù)據(jù)存儲到 MySQL 中,這個 MySQL 就能叫做狀態(tài)后端。

2.2 Checkpoint解決什么問題?

其實在實時計算中的狀態(tài)的功能主要體現(xiàn)在任務(wù)可以做到失敗重啟后沒有數(shù)據(jù)質(zhì)量、時效問題。

實時任務(wù)一般都是 7x24 小時 Long run 的,掛了之后,就會有以下兩個問題,首先給一個實際場景:一個消費上游 Kafka,使用 Set 去重計算 DAU 的實時任務(wù)。

  • 數(shù)據(jù)質(zhì)量問題:當(dāng)這個實時任務(wù)掛了之后恢復(fù),Set空了,這時候任務(wù)再繼續(xù)從上次失敗的 Offset 消費 Kafka 產(chǎn)出數(shù)據(jù),則產(chǎn)出的數(shù)據(jù)就是錯誤數(shù)據(jù)了
  • 數(shù)據(jù)時效問題:一個實時任務(wù),產(chǎn)出的指標(biāo)是有時效性(主要是時延)要求的。你可以從今天 0 點開始重新消費,但是你回溯數(shù)據(jù)也是需要時間的。舉例:中午 12 點掛了,實時任務(wù)重新回溯 12 個小時的數(shù)據(jù)能在 1 分鐘之內(nèi)完成嘛?大多數(shù)場景下是不能的!一般都要回溯幾個小時,這就是實時場景中的數(shù)據(jù)時效問題。而 Flink的Checkpoint就是把 Set 定期的存儲到遠程 HDFS 上,當(dāng)任務(wù)掛了,我們的任務(wù)還可以從 HDFS 上面把這個數(shù)據(jù)給讀回來,接著從最新的一個 Kafka Offset 繼續(xù)計算就可以,這樣即沒有數(shù)據(jù)質(zhì)量問題,也沒有數(shù)據(jù)時效性問題。

2.3 Checkpoint的運行流程?

  1. JM 定時調(diào)度 Checkpoint 的觸發(fā),接受到 JM 做 Checkpoint 的請求后,開始做本地 Checkpoint,暫停處理新流入的數(shù)據(jù),將新數(shù)據(jù)緩存起來。
  2. 將任務(wù)的本地狀態(tài)數(shù)據(jù),復(fù)制到一個遠程的持久化存儲(HDFS)空間上。
  3. 繼續(xù)處理新流入的數(shù)據(jù),包括剛才緩存起來的數(shù)據(jù)。

圖片

3、自建HDFS引入

3.1 為什么用HDFS?

Flink 做為一個成熟的流計算引擎,對外宣稱可以實現(xiàn) Exactly Once。為了實現(xiàn)業(yè)務(wù)上的 Exactly Once,F(xiàn)link 肯定不能丟數(shù)據(jù),也就是狀態(tài)數(shù)據(jù)必須保障高可靠性,而HDFS作為是一個分布式文件系統(tǒng),具備高容錯率、高吞吐量等特性,是業(yè)界使用最廣泛的開源分布式文件系統(tǒng),針對大狀態(tài)的Checkpoint任務(wù)非常契合,帶寬易擴展且成本低廉。

HDFS主要有如下幾項特點:

  • 和本地文件系統(tǒng)一樣的目錄樹視圖
  • Append Only 的寫入(不支持隨機寫)
  • 順序和隨機讀
  • 超大數(shù)據(jù)規(guī)模
  • 易擴展,容錯率高

3.2 得物自建HDFS架構(gòu)

架構(gòu)層面是典型的主從結(jié)構(gòu),架構(gòu)見下圖,核心思想是將文件按照固定大小進行分片存儲,

  • 主節(jié)點:稱為 NameNode,主要存放諸如目錄樹、文件分片信息、分片存放位置等元數(shù)據(jù)信息
  • 從節(jié)點:稱為 DataNode,主要用來存分片數(shù)據(jù)

比如用戶發(fā)出了一個1GB的文件寫請求給HDFS客戶端,HDFS客戶端會根據(jù)配置(默認是128MB),對這個文件進行切分,HDFS客戶端會切分成8個Block,然后詢問NameNode應(yīng)該將這些切分好的Block往哪幾臺DataNode上寫,此后client端和NameNode分配的多個DataNode構(gòu)成pipeline管道,開始以packet為單位向Datanode寫數(shù)據(jù)。

圖片

4、自建HDFS落地實踐

4.1 集群規(guī)劃

早期使用OSS的主要瓶頸在于帶寬,為了匹配將大狀態(tài)的任務(wù)從OSS遷移到Hdfs帶寬需求,支撐寫入流量100Gib+/s,對比OSS的帶寬成本,結(jié)合到成本與帶寬瓶頸考慮,內(nèi)部大數(shù)據(jù)d2s.5xlarge機型做了一次性能壓測,單節(jié)點吞吐能達到12Gib/s,按100Gib/s預(yù)估,算上Buffer,3副本集群需要xx臺機器,滿足現(xiàn)在的帶寬及寫入吞吐需求,最終選擇d2s.5xlarge類型Ecs機器,對應(yīng)實例詳情如下:

圖片

圖片

4.2 穩(wěn)定性保障建設(shè)

4.2.1 Hdfs組件指標(biāo)采集

為了確保HDFS集群的穩(wěn)定和可靠性,支撐線上實時Flink任務(wù)Checkpoint,監(jiān)控告警建設(shè)是必不可少的,我們通過統(tǒng)一的采集程序Hadoop Exporter將集群里各組件的JMX信息換為維度模型,將下述為扁平化的事實指標(biāo)Jmx數(shù)據(jù),轉(zhuǎn)換為維度結(jié)構(gòu),比如針對NameNode、DataNode,可以直接將指標(biāo)使用預(yù)定義維度,例如:cluster、instance等維度,并存儲到Prometheus能夠識別的指標(biāo)數(shù)據(jù),存儲為一個二維字典結(jié)構(gòu),例如: _hadoop_namenode_metrics[指標(biāo)分類(通常是MBean的名稱)][指標(biāo)名稱]

4.2.2 指標(biāo)采集架構(gòu)

結(jié)合當(dāng)前集群的規(guī)模,我們通過集中是Pull的方式采集架構(gòu),只需要啟動時指定集群Namenode及Jn的Jmx的url信息,就能采集集群的所有組件的指標(biāo)信息,這樣當(dāng)有集群擴展或變更時,會自動采集上報到apm里,方便運維,具體采集架構(gòu)如下圖:

圖片

4.2.3 監(jiān)控與告警

監(jiān)控:基于已采集匯報上的指標(biāo)數(shù)據(jù),目前配置了Namenode、Datanode組件核心指標(biāo)監(jiān)控大盤,包括HDFS節(jié)點健康狀態(tài)、HDFS服務(wù)健康狀態(tài)、數(shù)據(jù)塊健康狀態(tài)、節(jié)點的寫入吞吐量等指標(biāo)。

圖片

圖片

告警:當(dāng)前監(jiān)控數(shù)據(jù)已完成接入公司天眼監(jiān)控平臺,我們將影響hdfs服務(wù)可用性的指標(biāo)統(tǒng)一配置了告警模版,比如集群總的寫入帶寬、Callqueue隊列、DN存活數(shù)量、集群節(jié)點基礎(chǔ)io值班等,可以動態(tài)覆蓋多集群,實現(xiàn)定制化告警,更加靈活及方便感知問題,減少故障止損時長,滿足線上HDFS穩(wěn)定性保障SLA目標(biāo)。

4.2.4 集群快速變更能力

隨著Hdfs集群規(guī)模的增加,在日常運維過程中,如何做到快速擴、縮容、節(jié)點重啟及配置變更能力,

保障集群具備快速止損的能力,我們封裝了一整套HDFS的各組件變更能力,包括節(jié)點自動上報到cmdb對應(yīng)應(yīng)用、集群數(shù)據(jù)節(jié)點maintenance模式快速無影響重啟、日常變配等,并集成到ansible playbook,做到集群擴容在分鐘級完成。

圖片

4.3 遷移到HDFS攻克難關(guān)

4.3.1 DN 心跳匯報于刪除共用一把寫鎖問題

現(xiàn)象:自建Flink平臺大部分大狀態(tài)任務(wù)遷移后,自建HDFS集群節(jié)點整體的水位各個ecs的網(wǎng)絡(luò)帶寬峰值,出現(xiàn)偶發(fā)部分任務(wù)因checkpiont 寫入失敗問題,報錯信息如下:

問題定位過程:
  • 根據(jù)客戶端日志的堆棧信息,查看Namenode的日志找到對應(yīng)的文件、塊,發(fā)現(xiàn)了錯誤日志,文件塊在寫入成功后不能及時上報,塊的狀態(tài)一直處于not COMPLETE。

圖片

這里介紹下Hdfs文件寫入流程介紹:

客戶端向datanode寫入塊結(jié)束后,datanode通過IBR(增量塊匯報)向namenode匯報新寫入的塊

namenode收到匯報后更新文件的塊副本數(shù),當(dāng)文件塊副本數(shù)>=1時,文件寫入狀態(tài)為COMPLETE

客戶端寫入結(jié)束后不斷向namenode詢問文件寫入狀態(tài)是否COMPLETE,失敗5(默認)次后報錯寫入失敗。

  • 根據(jù)上述寫入流程,懷疑問題出現(xiàn)在IBR階段,查看Namenode監(jiān)控指標(biāo),Namenode處理塊匯報平均時長<10ms,所以猜測問題出在Datanode端,觀察發(fā)現(xiàn),Datanode偶發(fā)心跳匯報間隔>30s(正常3s一次),Datanode IBR和心跳都是BPServiceActor線程處理,很可能是心跳阻塞了IBR。

圖片

  • 我們根據(jù)猜測的方向,繼續(xù)定位什么原因?qū)е滦奶枞薎BR匯報,于是在每臺節(jié)點上,部署了腳本(見下圖),根據(jù)Datanode的Jmx指標(biāo)監(jiān)聽本節(jié)點心跳間隔,大于10s時就打印Datanode的Jstack。

Datanode 每個節(jié)點上的metric信息里包含心跳匯報間隔的數(shù)據(jù)。

圖片

  • 分析多個Jstack代碼(具體內(nèi)容見下),可以發(fā)現(xiàn)BPServiceActor線程被CommandProcessingThread線程阻塞,而CommandProcessingThread線程在調(diào)用invalidate()方法,而invalidate()是在調(diào)用刪除操作。
"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)
        at java.lang.Thread.run(Thread.java:748)


   Locked ownable synchronizers:
        - None
        
"Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]
   java.lang.Thread.State: RUNNABLE
        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
        at java.io.File.isDirectory(File.java:858)
        at java.io.File.toURI(File.java:741)
        at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)


   Locked ownable synchronizers:
        - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)
        - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

結(jié)合堆棧信息定位到代碼,確實發(fā)現(xiàn)processCommandFromActor方法在執(zhí)行刪除(調(diào)用invalidate()方法)操作時與心跳匯報updateActorStatesFromHeartbeat方法共用同一把寫鎖。

class BPOfferService {
private final Lock mWriteLock = mReadWriteLock.writeLock();
void writeLock() {
  mWriteLock.lock();
}


void writeUnlock() {
  mWriteLock.unlock();
}


void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
//... 心跳匯報
  } finally {
    writeUnlock();
  }
}
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  assert bpServices.contains(actor);
// ...省略
  writeLock();
  try {
//...執(zhí)行刪除邏輯
  } finally {
    writeUnlock();
  }
}
}
  • 確認問題:查看Namenode審計日志發(fā)現(xiàn),集群持續(xù)有大量文件刪除(Flink刪除過期Checkpoint meta文件)操作,修改Datanode端代碼,在調(diào)用processCommandFromActive方法超過一定10s后打印調(diào)用時長與CommandAction日志。查看datanode日志發(fā)現(xiàn)確實存在刪除操作大于30s的情況,由此確認問題就是出現(xiàn)在刪除操作耗時過長影響了Datanode的增量塊匯報。

圖片

由此確定問題:

刪除塊操作耗時過長,阻塞datanode心跳,導(dǎo)致IBR被阻塞,塊寫入成功后不能及時上報,客戶端重試一定次數(shù)后失敗拋異常,重試次數(shù)由dfs.client.block.write.locateFollowingBlock.retries決定,默認5次,第一次等待0.4s,之后每次等待時長翻倍,5次約為15s左右。

 問題解決方案

找到問題就是出現(xiàn)在BPServiceActor 線程做了太多的事,包含F(xiàn)BR、IBR、心跳匯報,而且心跳匯報和刪除共同持有一把寫鎖,那解決方案一個就把這兩把鎖進行拆分,一個就是將IBR邏輯單獨獨立出來,不受心跳匯報影響。

而社區(qū)3.4.0版本已經(jīng)將IBR從BPServiceActor 線程獨立出來了,所有我們最終將HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不會被invalidate()阻塞,問題得到根治!

圖片

5、總結(jié)與規(guī)劃

總結(jié):Oss的流量已從早期137Gib/s降低到30Gib/s左右(下圖一),自建Hdfs集群峰值流量達到120Gb/s(下圖二),且平穩(wěn)運行

圖片

圖片

整個項目已完成全部大狀態(tài)任務(wù)從Oss遷移到自建Hdfs,當(dāng)前Hdfs集群規(guī)模xx臺,成本x w/月,原OSS帶寬費用阿里報價1x w/月,相比節(jié)省xx w/月。

未來規(guī)劃:對于全量 checkpoint 來說,TM 將每個 Checkpoint 內(nèi)部的數(shù)據(jù)都寫到同一個文件,而對于 RocksDBStateBackend 的增量 Checkpoint 來說,則會將每個 sst 文件寫到一個分布式系統(tǒng)的文件內(nèi),當(dāng)作業(yè)量很大,且作業(yè)的并發(fā)很大時,則會對底層 HDFS 形成非常大的壓力,

1)大量的 RPC 請求會影響 RPC 的響應(yīng)時間。

2)大量文件對 NameNode 內(nèi)存造成很大壓力。

針對上面的問題我們未來考慮引入小文件合并方案降低 HDFS 的壓力,包括 RPC 壓力以及 NameNode 內(nèi)存的壓力。

責(zé)任編輯:武曉燕 來源: 得物技術(shù)
相關(guān)推薦

2025-02-20 09:17:50

2023-07-07 19:26:50

自建DTS平臺

2023-03-30 18:39:36

2023-10-09 18:35:37

得物Redis架構(gòu)

2025-03-13 06:48:22

2023-03-06 10:24:53

CIO驅(qū)動技術(shù)

2023-02-23 17:45:26

服務(wù)器DHHCTO

2021-01-22 10:27:44

物聯(lián)網(wǎng)互聯(lián)網(wǎng)IoT

2021-10-14 06:52:47

算法校驗碼結(jié)構(gòu)

2023-02-08 18:33:49

SRE探索業(yè)務(wù)

2023-11-27 18:38:57

得物商家測試

2024-09-11 19:36:24

2022-12-14 18:40:04

得物染色環(huán)境

2024-12-19 09:45:24

2022-12-07 08:31:45

ClickHouse并行計算數(shù)據(jù)

2023-08-30 09:00:05

2023-09-13 18:59:40

SRE視角藍綠發(fā)布

2022-01-14 07:56:38

Checkpoint機制Flink

2023-07-19 22:17:21

Android資源優(yōu)化

2023-08-09 20:43:32

點贊
收藏

51CTO技術(shù)棧公眾號