作者 | Jialin Liu, Mengyuan Chao, Jian Li, Wei Peng, Sixiang Ma, Wei Xu, Run Yang, Xin Chen
RayRTC 是字節(jié)基礎(chǔ)架構(gòu)組與字節(jié) AML 組共同合作,在內(nèi)部 RTC(Realtime Text Classification)文本訓(xùn)練平臺(tái)上基于 Ray 進(jìn)行的下一代 Serverless ML 的探索。RTC 文本分類平臺(tái)是一個(gè)一站式的 NLP 服務(wù)平臺(tái),包括了數(shù)據(jù)預(yù)處理,標(biāo)注,模型訓(xùn)練,打分,評(píng)估,AutoML 以及模型推理等機(jī)器學(xué)習(xí)全流程。目前字節(jié)內(nèi)各大產(chǎn)品,包括抖音,TikTok,頭條,西瓜,番茄等都有使用該平臺(tái)提供的相關(guān)自然語言能力。RayRTC 通過算法與系統(tǒng)的協(xié)同設(shè)計(jì)及 Serverless 等技術(shù)為 RTC 提供了性能和資源利用率的極致優(yōu)化,并由此抽象出一套通用的 Serverless ML 框架,目前已在字節(jié)內(nèi)部機(jī)器學(xué)習(xí)平臺(tái)上部署上線。
RayRTC 的核心計(jì)算引擎是 Ray,最早是 UC Berkeley 的一個(gè)針對(duì)強(qiáng)化學(xué)習(xí)所設(shè)計(jì)的大規(guī)模分布式計(jì)算框架。Ray 的作者 Robert Nishihara 和 Philipp Moritz 在此基礎(chǔ)上成立了 Anyscale 這家公司。開源項(xiàng)目千千萬,能成功商業(yè)化并在硅谷乃至整個(gè) IT 屆產(chǎn)生顛覆性影響的鳳毛麟角。Anyscale 的創(chuàng)始人中包括 Ion Stoica,這位羅馬尼亞籍教授上一家公司是跟他的學(xué)生 Matei Zaharia 以 Spark 技術(shù)為基礎(chǔ)成立的 Databricks 。Spark 和 Ray 分別誕生于大數(shù)據(jù)和機(jī)器學(xué)習(xí)時(shí)代,前者已經(jīng)在工業(yè)界得到廣泛應(yīng)用,后者也逐漸引起越來越多的公司在不同業(yè)務(wù)場景進(jìn)行探索。字節(jié)美研計(jì)算團(tuán)隊(duì)自 2020 年末開始接觸 Ray,2021 年開始在不同場景小范圍試驗(yàn)。RTC 文本分類平臺(tái)是第一個(gè)大規(guī)模上線的 Ray 應(yīng)用場景,在 RayRTC 的設(shè)計(jì)過程中,有不少第一手的經(jīng)驗(yàn)值得分享。本文從 RayRTC 所遇到的實(shí)際問題出發(fā),對(duì) Ray 在字節(jié)的實(shí)踐進(jìn)行介紹。
第一次接觸 Ray 的讀者可能會(huì)問,除了明星創(chuàng)始人團(tuán)隊(duì),深度貼近當(dāng)前 ML 需求的產(chǎn)學(xué)研支持,Ray 這套框架到底有哪些吸引人的地方?
首先是以 Ray 為底座可以非常輕松構(gòu)建完整機(jī)器學(xué)習(xí)完整生態(tài),如下圖所示:
機(jī)器學(xué)習(xí)的研發(fā)人員往往不僅需要關(guān)注算法本身,在實(shí)際的生產(chǎn)環(huán)境中,各個(gè)環(huán)節(jié)所涉及的工程量和運(yùn)維量也不容小覷。不少研究表明,工程師們有 80-90%的時(shí)間和精力投入在了算法之外的數(shù)據(jù)處理,全流程打通等。Ray 社區(qū)在近幾年的演進(jìn)中,不斷吸收業(yè)界領(lǐng)先的理念,積極地與其他開源社區(qū)和各大廠商進(jìn)行合作交流。以 Ray 為計(jì)算引擎的上層生態(tài)的豐富度是別的開源生態(tài)中不常見的。比如大數(shù)據(jù)處理方面,有 Intel 設(shè)計(jì)的 RayDP,將 Spark 無縫集成到 Ray 中,通過 Ray 的 Actor 拉起 Spark 的 executor,利用 Ray 的分布式調(diào)度實(shí)現(xiàn)資源細(xì)粒度的調(diào)控。這樣做的好處在于以 Spark 為大數(shù)據(jù)引擎的機(jī)器學(xué)習(xí)應(yīng)用中,通過 Ray 可以將 Spark 產(chǎn)生的 dataframe 以 ML Dataset 的形式直接從內(nèi)存?zhèn)鹘o下游的機(jī)器學(xué)習(xí)框架,比如 PyTorch。而 Ray 的生態(tài)里的其他組件,比如超參訓(xùn)練(Ray Tune)和推理服務(wù)(Ray Serve),則進(jìn)一步補(bǔ)足了訓(xùn)練階段后續(xù)的一系列工程需求。研發(fā)人員可以拋開繁瑣的上線部署流程,實(shí)現(xiàn)一鍵分布式以及一鍵部署。
Ray 的另一個(gè)顯著優(yōu)勢是其簡單通用的 API ,只需在一段函數(shù)上加入ray.remote 的裝飾器,便可將一個(gè)單機(jī)程序變成分布式執(zhí)行單元,如下所示:
#declare a Ray task
@ray.remote
def fun(a):
return a + 1
#submit and execute a Ray task
fun.remote()
#declare a Ray actor
@ray.remote
class Actor():
def fun(slef, a):
return a+1
actor = Actor.remote()
#execute an actor method
actor.fun.remote()
Ray 中最基礎(chǔ)的概念包括 Task 和 Actor,分別對(duì)應(yīng)函數(shù)和類。函數(shù)一般是無狀態(tài)的,在 Ray 里被封裝成 Task,從而被 Ray 的分布式系統(tǒng)進(jìn)行調(diào)度;類一般是有狀態(tài)的,在 Ray 里被映射成一個(gè) Actor。Actor 的表達(dá)性更強(qiáng),能覆蓋大多數(shù)的應(yīng)用程序子模塊?;?Actor 和 Task,Ray 對(duì)用戶暴露了資源的概念,即每個(gè) actor 或 task 都可以指定運(yùn)行所需要的資源,這對(duì)異構(gòu)的支持從開發(fā)人員的角度變得非常便利。比如:
@ray.remote(num_cpus=1, num_gpus=0.2):
def infer(data):
return model(data)
當(dāng) task 在被提交執(zhí)行的時(shí)候,Ray 的調(diào)度器會(huì)去找到一個(gè)滿足指定資源需求的節(jié)點(diǎn)。在此同時(shí) Ray 會(huì)考慮數(shù)據(jù)的 locality。比如上述例子中的“data”,實(shí)際運(yùn)行中可能會(huì)分布在任意一個(gè)遠(yuǎn)端的節(jié)點(diǎn)的內(nèi)存里,如果 task 不在數(shù)據(jù)所在的節(jié)點(diǎn)上執(zhí)行,跨節(jié)點(diǎn)的數(shù)據(jù)傳輸就無法避免。而 Ray 可以讓這一類的優(yōu)化變得透明。框架開發(fā)人員也可以利用 Ray 的 API 集成更豐富調(diào)度策略,最終提供給用戶的是非常簡單的 API。Ray 對(duì) Actor 和 Task 還有很多高級(jí)的細(xì)粒度控制特性,比如支持 gang-scheduling 的 placement group 等,在此不一一贅述。
Ray 另外的優(yōu)勢在于:
高效的數(shù)據(jù)傳遞和存儲(chǔ):Ray 通過共享內(nèi)存實(shí)現(xiàn)了一個(gè)輕量級(jí)的 plasma 分布式 object store。數(shù)據(jù)通過 Apache Arrow 格式存儲(chǔ)。
分布式調(diào)度:Ray 的調(diào)度是 decentralized,每個(gè)節(jié)點(diǎn)上的 raylet 都可以進(jìn)行調(diào)度;raylet 通過向 gcs 發(fā)送 heart beat 獲取全局信息,在本地優(yōu)先調(diào)度不能滿足的情況下,快速讓位給周邊 raylet 進(jìn)行調(diào)度。
多語言的支持:目前已經(jīng)支持的語言包括:Python, Java, C++。后續(xù) go 的支持以及更通用的多語言架構(gòu)設(shè)計(jì)也在進(jìn)行中。
下圖是 RayRTC 的一個(gè)早期設(shè)計(jì)規(guī)劃圖和階段一核心部分(DP+Training)的 Actor 封裝流程圖。本文著重講解階段一,二的設(shè)計(jì)和實(shí)現(xiàn)。其中在階段一中所用到的核心組件包括 Ray Actor Pool 和 RaySGD 等。
“DP+Training” Actor 化流程圖:
其中主要包括 DataProcessing 和 Training 兩個(gè) Stage。每一部分的核心計(jì)算邏輯都用 Ray 的 API 封裝成為 Actor 或 Task。Actor 提交運(yùn)行后通過 Ray 的調(diào)度被放置到合適的節(jié)點(diǎn)上執(zhí)行。Ray 的集群資源可以通過改造后的 Autoscaler 在字節(jié)內(nèi)的 Yarn/K8S 集群上實(shí)現(xiàn)動(dòng)態(tài)擴(kuò)縮容。
DP 實(shí)現(xiàn)過程中,我們利用 Ray 的 ActorPool 解決了一個(gè)因?yàn)閯?chuàng)建 Actor 數(shù)目過多而導(dǎo)致的 OOM 問題。Actorpool 本身相當(dāng)于一個(gè)線程池,但 Ray 的 Actorpool 可以被開發(fā)者拓展為更高階的彈性線程池。在 RayRTC 中,給定一組數(shù)據(jù),我們需要解決的核心問題之一是使用多少 Ray 的 actor 是比較高效的。這里的高效指:資源使用高效,性能較優(yōu)且穩(wěn)定性較好(不能 oom)等。最簡單的設(shè)計(jì)方式是 1 對(duì) 1,即對(duì)于每一個(gè) HDFS 路徑, 都指定一個(gè)單獨(dú)的 DP Actor 來進(jìn)行處理。但當(dāng)數(shù)據(jù)量線性增長時(shí),由于缺少內(nèi)存管控而很容易出現(xiàn) OOM。最極端的方式是 n 對(duì) 1,即用一個(gè) actor,順序處理所有數(shù)據(jù),這樣做顯然無法發(fā)揮 Ray 的分布式能力。比較理想的方式是 n 對(duì) m,即 m 個(gè) actor 處理 n 組數(shù)據(jù)。作為對(duì)比,1 對(duì) 1 的情況如下:
ray_preprocessor_ret_refs = []
for hdfs_file_path in hdfs_file_path_list:
my_dp = ray.remote(DP).remote(hdfs_file_path)
ray_preprocessor_ret_refs.append(my_dp.__call__.remote())
n 對(duì) m 的情況:
num_cpus = 10
actors = [ray.remote(DP).remote() for _ in range(num_cpus)]
actor_pool = ray.util.ActorPool(actors)
for hdfs_file_path in hdfs_file_path_list:
actor_pool.submit(lambda actor, info: actor.__call__.remote(**info),
hdfs_file_path)
在生產(chǎn)實(shí)踐中,通過對(duì) m 取一個(gè)定值,比如 m=10,可以有效控制內(nèi)存使用并實(shí)現(xiàn) I/O 并行。如前所述,給定一個(gè)動(dòng)態(tài)的 workload,我們也可以對(duì) m 的進(jìn)行彈性支持,類似于 K8S 的 HPA 或 Spark 的 dynamic allocation。不同的是,在 Ray 里,開發(fā)者通過可編程的方式實(shí)現(xiàn)定制化的 dynamic allocation,比較簡單的實(shí)現(xiàn)任意粒度的自動(dòng)擴(kuò)縮。這一部分的代碼可以參考最新版本的 Ray dataset 中的類似實(shí)現(xiàn)(https://github.com/ray-project/ray/blob/master/python/ray/data/impl/compute.py)。
Training 部分的邏輯由于歷史原因,在字節(jié)的內(nèi)部場景有比較復(fù)雜的深度定制。對(duì)此,我們采用了 Ray 社區(qū)第一版的 Ray SGD(最新的版本中,這一模塊為 Ray Train)對(duì)已有訓(xùn)練模塊進(jìn)行封裝。RaySGD 是一個(gè)輕量級(jí)的分布式訓(xùn)練框架,支持 PyTorch 和 TensorFlow。底層直接集成了 PyTorch 的 DDP 和 Tensorflow 的 MirroredStrategy 來進(jìn)行數(shù)據(jù)并行。RaySGD 通過把訓(xùn)練 worker 用 actor 進(jìn)行封裝,不僅實(shí)現(xiàn)了更靈活的分布式統(tǒng)一調(diào)度,而且與整個(gè) Ray 生態(tài)打通。比如可以與 Ray Tune(超參)和 Ray Serve(推理)直接在 actor 這一粒度上進(jìn)行通信和數(shù)據(jù)傳輸。
數(shù)據(jù)并行的分布式訓(xùn)練相比模型并行和混合并行的模式都要相對(duì)簡單。但把一個(gè)復(fù)雜的單機(jī)版 NLP 訓(xùn)練框架通過 Ray 封裝為分布式框架,并做到對(duì)原代碼侵入性最小,需要處理好以下幾個(gè)問題:
- 單節(jié)點(diǎn)的訓(xùn)練邏輯,如何設(shè)置模型,如何在 CPU 和 GPU 之間傳遞數(shù)據(jù)
- 如何設(shè)置 dataloader 以及 sampler,實(shí)現(xiàn)分布式數(shù)據(jù)讀取
- 如何控制一個(gè) epoch 里的 batch 循環(huán)
- 分布式訓(xùn)練邏輯,如何設(shè)置 worker 數(shù)量
- 如何使用 Ray 拉起 worker,并能在 worker 間通信
對(duì)于前 3 個(gè)問題,RayRTC 實(shí)現(xiàn)了 RayRTCTrainoperator,繼承自 ray.util.sgd.torch 中的 TrainingOperator,把單節(jié)點(diǎn)上的訓(xùn)練邏輯全部抽象到一個(gè)類。
class RayRTCTrainOperator(TrainingOperator):
def setup(self, config):
# Setup data
self.train_loader = DataLoader(self.train_data,...)
self.valid_loader = DataLoader(self.valid_data,...)
# Register data loader
self.register_data(
train_loader=self.train_loader,
validation_loader=self.valid_loader)
...
# Register model, optimizer
self.model, self.optimizer = \
self.register(models=model, optimizers=optimizer,...)
在 RayRTCTrainOperator 這個(gè)類中,首先設(shè)置好訓(xùn)練所需要的模型和數(shù)據(jù),并將 optimizer,scheduler 等參數(shù)傳入。這些數(shù)據(jù)會(huì)隨著 RayRTCTrainOperator 這個(gè)類被 Ray 封裝為 actor,從而分布到不同的節(jié)點(diǎn)上,從而使得每個(gè)節(jié)點(diǎn)上都有一份完全一樣的模型的拷貝和參數(shù)的初始狀態(tài)。
數(shù)據(jù)格式的不同:
除了模型和數(shù)據(jù)的 setup,具體的訓(xùn)練邏輯需要根據(jù) RTC 的場景進(jìn)行定制。比如,每一個(gè) epoch 的訓(xùn)練,以及一個(gè) epoch 中每一個(gè) batch 的訓(xùn)練。由于 RaySGD 對(duì)于 input 有一定的格式假設(shè),導(dǎo)致在 RayRTCTrainOperator 中,需要重定義 train_epoch 和 train_batch 這兩個(gè)函數(shù)以便正確處理數(shù)據(jù)和 metrics。舉例而言,在 RaySGD 中,batch input 需要符合以下格式:
*features, target = batch
(https://github.com/ray-project/ray/blob/ray-1.3.0/python/ray/util/sgd/torch/training_operator.py#L536)
而實(shí)際的場景中,用戶往往對(duì)數(shù)據(jù)格式有自己的定義。比如 RTC 中,batch 被定義為 Dict:
TensorDict = Dict[str, Union[torch.Tensor, Dict[str, torch.Tensor]]]
使用 RaySGD 中默認(rèn)的 train_batch 函數(shù),會(huì)在數(shù)據(jù) unpack 時(shí)候發(fā)生錯(cuò)誤。在 RayRTC 中,重寫的 train_batch 把處理后 batch 以正確的格式傳給 forward 函數(shù)。
訓(xùn)練指標(biāo)的自定義問題:
在 train_epoch 中,同樣有需要特殊處理的地方。RaySGD 默認(rèn)支持的 metrics 只包括 loss 等。RTC 中,用戶主要關(guān)心的指標(biāo)包括 accuracy, precision, recall 以及 f1 measure 等。這些指標(biāo)如何在 RaySGD 中加入是 RayRTC 實(shí)現(xiàn)過程中遇到的一個(gè)不小的挑戰(zhàn)。一方面由于 RTC 本身已經(jīng)實(shí)現(xiàn)了豐富的 metrics 計(jì)算模塊,一方面 RaySGD 對(duì)訓(xùn)練過程中 metrics 的處理有固定的假設(shè)和且封裝在比較底層。RayRTC 最終采取的方法是把 RTC 中的 metrics 計(jì)算模塊復(fù)用到 RaySGD 的 train_epoch 中。另外遇到的一個(gè)問題是 RTC 的 metrics 計(jì)算需要把 model 作為參數(shù)傳入,而 RaySGD 中的 model 已經(jīng)被 DDP 封裝,直接傳入會(huì)導(dǎo)致出錯(cuò)。最后,train_epoch 需要加入如下改動(dòng):
if hasattr(model, 'module'):
metrics = rtc.get_metrics(model.module, ... reset=True)
else:
metrics = rtc.get_metrics(model, ... reset=True)
改動(dòng)之后同時(shí)兼容了分布式和單機(jī)(沒有被 DDP 封裝)的情況。
RayRTCTrainOperator 可以理解為單機(jī)的訓(xùn)練模塊,到了分布式環(huán)境下,可以通過 TorchTrainer 這個(gè)類。如下所示:
trainer = TorchTrainer(
training_operator_cls=RayRTCTrainOperator,
num_workers=self.num_workers,
use_fp16=self.use_fp16,
use_gpu=self.use_gpu,
...
num_cpus_per_worker=self.cpu_worker
)
Trainer 的主要功能是設(shè)置 training worker 的數(shù)量,混合精度,以及 worker 的 cpu 和 gpu。應(yīng)用程序通過 trainer 可以非常簡單地控制整個(gè)分布式訓(xùn)練的邏輯:
for epoch in epochs:
metrics['train'] = trainer.train()
metrics['validate'] = trainer.validate()
return metrics
Trainer 的底層邏輯中包括了拉起 worker group(https://github.com/ray-project/ray/blob/8ce01ea2cc7eddd40c2415904fa94198c0fe1e44/python/ray/util/sgd/torch/worker_group.py#L195),每一個(gè)worker用actor表達(dá),從而形成一個(gè)actor group。RaySGD 也會(huì)處理 communication group 的 setup,以及 actor 的失敗重啟。經(jīng)過這些封裝,用戶只需要關(guān)注跟訓(xùn)練最直接相關(guān)的邏輯,而不需要花過多時(shí)間在底層通訊,調(diào)度等分布式邏輯,極大提高了編程效率。
Checkpoint 的問題:
在改造基本完成后,我們用抖音的數(shù)據(jù)進(jìn)行測試,發(fā)現(xiàn)模型在多卡時(shí),沒有任何調(diào)參的情況下,性能已經(jīng)可以與單機(jī)持平,符合上線要求。但第一次上線測試后,發(fā)現(xiàn) RayRTC 訓(xùn)練出來的模型連基線模型都打不過,準(zhǔn)確率甚至低到 30%。在把所有控制變量固定仍然沒有沒有找到原因后,第一反應(yīng)是 RayRTC 訓(xùn)練出來的模型可能并沒有真正保存下來,以致線上打分用到的實(shí)際是 pre-trained 的 bert 模型。事實(shí)證明確實(shí)如此,而導(dǎo)致這個(gè)原因是因?yàn)?RaySGD 中的 training worker 是在遠(yuǎn)端運(yùn)行,driver 端所初始的數(shù)據(jù)結(jié)構(gòu)隨著訓(xùn)練進(jìn)行會(huì)與之逐漸不同步。checkpointing 之前需要取得更新后的模型參數(shù),代碼如下所示:
for epoch in epochs:
metrics['train'] = trainer.train()
metrics['validate'] = trainer.validate()
self.model = trainer.get_model()
self.save_checkpoint()
return metrics
與之前比較,增加了第 4 行,通過 trainer 獲得更新后的 model,并通過 checkpoint 將模型持久化。
改造侵入性問題:
Anyscale 在一篇博客[https://www.anyscale.com/blog/ray-distributed-library-patterns]中總結(jié)了使用 Ray 的幾種 pattern。其中大致可以分為三類,RayRTC 屬于第三類。
- 用 Ray 做調(diào)度,比如 RayDP
- 用 Ray 做調(diào)度和通信,比如螞蟻的在線資源分配
- 用 Ray 做調(diào)度,通信,數(shù)據(jù)內(nèi)存存儲(chǔ)
從第一類到第三類,用 Ray 的層次加深,但并不意味著改造成本線性增加。具體的應(yīng)用需要具體分析。單純從代碼改動(dòng)量上分析,RayRTC 第一階段改了大概 2000 行代碼,占原應(yīng)用總代碼量的 1%不到。
同時(shí),RayRTC 把訓(xùn)練模塊單獨(dú)抽象出來,與原有代碼保持松耦合關(guān)系。用戶使用的時(shí)候,只需要載入相關(guān) RayRTC 的模塊,即可啟動(dòng) Ray 進(jìn)行分布式訓(xùn)練。
實(shí)驗(yàn)效果:
RayRTC 第一階段在 1 到 8 卡(NVIDIA V100)上進(jìn)行 scaling 測試,如下圖所示:
訓(xùn)練速度上,RayRTC 的性能隨卡數(shù)呈現(xiàn)線性增加。訓(xùn)練準(zhǔn)確度上,RayRTC 沒有因?yàn)?global batch size 的增加而顯著降低。8 卡訓(xùn)練中,單個(gè) epoch 時(shí)間降到了 6 分鐘以內(nèi)。以往研發(fā)人員往往需要等待幾個(gè)小時(shí)才能拿到訓(xùn)練結(jié)果,導(dǎo)致大家都習(xí)慣在下班前大量提交作業(yè),第二天再來查看效果。整體集群 quota 資源利用率在白天不高,在晚上排隊(duì)高峰。經(jīng)過 RayRTC 提速后,研發(fā)人員會(huì)越來越多的進(jìn)行接近交互式的開發(fā)迭代。
RayRTC pipeline
RayRTC 在字節(jié)內(nèi)部運(yùn)行在 Arnold 機(jī)器學(xué)習(xí)平臺(tái)。用戶在提交一個(gè) RayRTC 任務(wù)時(shí),對(duì)應(yīng)在 Arnold 平臺(tái)上拉起一個(gè) Trial。一個(gè) Trial 里,用戶配置一個(gè)或多個(gè) container 以及每個(gè) container 所需的 CPU/GPU/Mem 資源。在一個(gè) RayRTC 任務(wù)的整個(gè)生命周期中,對(duì)應(yīng) Trial 的資源是一直占用的。下圖展示了某 RTC 任務(wù)運(yùn)行期間的 GPU 資源使用情況。
如圖所示,在 Data Processing(DP)階段,GPU 資源完全處于 idle 狀態(tài)。造成這個(gè)現(xiàn)象的主要原因是當(dāng)前的 RayRTC 階段一方案雖然在 DP 和 Training 階段都充分利用 Ray 的并行能力進(jìn)行加速,但是這兩個(gè) stage 之間本質(zhì)還是串行執(zhí)行:Training 階段必須等到 DP 結(jié)束了才開始。對(duì)于 DP 時(shí)間長的 RayRTC 任務(wù),這將帶來很大的 GPU 資源浪費(fèi)。為了提高 GPU 資源使用率,我們結(jié)合 Ray Datasets 提供的 pipeline 功能, 提出并實(shí)現(xiàn)了 RayRTC 的流水并行方案 RayRTC pipeline。
Ray Datasets 是在 Ray1.6+版本引入的在 Ray 的 libraries 和應(yīng)用之間加載和交換數(shù)據(jù)標(biāo)準(zhǔn)化方法,其本身提供了一定的基本分布式數(shù)據(jù)處理能力,如 map, filter, repartition 等。如下圖所示,數(shù)據(jù)經(jīng)過 ETL 后,進(jìn)入 ML Training 系統(tǒng)前,可以先通過 Ray Datasets 的 API 進(jìn)行 last mile 的預(yù)處理。換言之,RayRTC 中的 DP 部分,完全可以用 Ray Datasets APIs 這種 Ray 標(biāo)準(zhǔn)化的方式重構(gòu),并與后面的 RaySGD(現(xiàn) Ray Train)打通。
除了提供 last mile 預(yù)處理標(biāo)準(zhǔn)化 APIs, Ray Dataset s 還提供了一組非常重要的 pipeline 接口,使得 DP 部分和 Training 部分的流水并行執(zhí)行成為可能。所謂流水并行執(zhí)行,如下圖所示,Training 執(zhí)行并不會(huì)等到 DP 全部結(jié)束后才開始,而是一旦 DP 完成了一小部分就會(huì)把處理后的數(shù)據(jù)直接傳入 Training 部分。流水處理有效減少 GPU idle 時(shí)間并縮短整個(gè)端到端 RTC 訓(xùn)練時(shí)間。
基于 Ray Datasets 的 RayRTC pipeline 實(shí)現(xiàn)
RayRTC pipeline 版本一:把 DP 部分當(dāng)做黑盒
考慮到 RTC 中 DP 的復(fù)雜邏輯,在 RayRTC pipeline 版本一中,我們把 DP 當(dāng)作黑盒處理。改造需求如下:
- DP(含 IO, trasforms, 數(shù)據(jù)集 split 等邏輯)與 Training 需要以 window 粒度流水并行,其中 DP 的 input 是文件路徑 fp_i,output 是訓(xùn)練和驗(yàn)證數(shù)據(jù)集{'T':Ti, 'V':Vi}。
- DP 中的 split 邏輯要保證多 epoch 訓(xùn)練中每個(gè) epoch 拿到的訓(xùn)練/驗(yàn)證數(shù)據(jù)集都相同,否則會(huì)導(dǎo)致數(shù)據(jù)泄露。多 epoch 訓(xùn)練中,只有第一個(gè) epoch 拿到的訓(xùn)練/驗(yàn)證數(shù)據(jù)集真正經(jīng)歷 DP,其余 epoch 都復(fù)用之前已經(jīng)處理分割好的數(shù)據(jù)集。
為滿足以上需求,我們利用 Ray Datasets 的 API 實(shí)現(xiàn)如下:
dsp= ray.data.from_items([fp1, fp2, …., fpn],parallelism=n)
.window(blocks_per_window=2).map(dp).repeat().split(2)
但是,以上改造無法滿足“每個(gè)訓(xùn)練 worker 拿到相同數(shù)目的 training instances”這個(gè)需求,因?yàn)樵摳脑熘械?split 的粒度其實(shí)還是“文件”而非“training instances”,而每個(gè)文件中包含的 training instances 數(shù)很可能不一樣。為了滿足這個(gè)需求,我們更新實(shí)現(xiàn)如下:
dsp_train= ray.data.from_items([fp1, fp2, …., fpn],parallelism=n)
.window(blocks_per_window=2).map(dp).flat_map(takeT).repeat()
.split(2, equal=True)
dsp_valid= ray.data.from_items([fp1, fp2, …., fpn],parallelism=n)
.window(blocks_per_window=2).map(dp).flat_map(takeV).repeat()
.split(2, equal=True)
其中:
def takeT(row):
train_data = row['T'].iter_rows()
for data in train_data:
yield data.as_pydict()
def takeV(row):
train_data = row['V'].iter_rows()
for data in train_data:
yield data.as_pydict()
但是更新后的實(shí)現(xiàn)帶來了新問題:dsp_train 和 dsp_valid 實(shí)際對(duì)應(yīng)兩次不同的 DP split 邏輯,從而導(dǎo)致了數(shù)據(jù)泄露。我們需要類似如下實(shí)現(xiàn)來解決:
dsp_train,dsp_valid = ray.data.from_items([fp1, fp2, …., fpn],parallelism=n)
.window(blocks_per_window=2).map(dp).unzip_and_flat_map('T', 'V')
.repeat().split(2, equal=True)
其中, unzip_and_flat_map 既有類似 unzip 功能,把原數(shù)據(jù)集分割成兩個(gè)數(shù)據(jù)集,原來數(shù)據(jù)集的 Row={'T':Ti, 'V':Vi} 變成兩個(gè)新數(shù)據(jù)集的 Row1=Ti,Row2=Vi;又有 flat_map 功能,把數(shù)據(jù)集的 Row1=Ti 真正展開成 Row=Training Instance。考慮到這個(gè) API 實(shí)現(xiàn)復(fù)雜且不具通用性,我們放棄了該版本改造,轉(zhuǎn)向了 RayRTC pipeline 的版本二實(shí)現(xiàn),把 DP 中的數(shù)據(jù)集分割邏輯抽取出來并提前,從開始就構(gòu)造獨(dú)立的訓(xùn)練/驗(yàn)證 pipeline,其余剩下的 DP 邏輯保留。
RayRTC pipeline 版本二:把 DP 中的數(shù)據(jù)集 Split 邏輯抽取出來并提前
在 RayRTC pipeline 版本二實(shí)現(xiàn)中,我們將數(shù)據(jù)集 scaling 和 split 邏輯抽取出來往前移,先構(gòu)造訓(xùn)練和驗(yàn)證數(shù)據(jù)集。然后,分別從這兩個(gè)數(shù)據(jù)集構(gòu)造相應(yīng)的訓(xùn)練/驗(yàn)證 pipelines。具體實(shí)現(xiàn)如下:
train_dataset, valid_dataset = self.get_datasets()
train_dataset_pipeline = train_dataset.window(blocks_per_window=2)
.flat_map(dp).repeat()
.random_shuffle_each_window().split(2, equal=True) # 2 is #trainWorkers
valid_dataset_pipeline = valid_dataset.window(blocks_per_window=2)
.flat_map(dp).repeat().split(2, equal=True) # 2 is #trainWorkers
其中:
def get_datasets(self):
# read dataset from hdfs
new_dataset = ray.data.read_api.read_json(partition_info_list)
# scale dataset up
scaled_dataset = new_dataset.flat_map(scale)
# shuffle dataset
shuffled_dataset = scaled_dataset.random_shuffle()
# split dataset into training and validation datasets
train_valid_ratio = 0.9
return shuffled_dataset.split_at_indices([int(shuffled_dataset.count() * train_valid_ratio)])
接著,train_dataset_pipeline 和 valid_dataset_pipeline 被傳入 trainer:在每個(gè) training worker 的 setup() 中,根據(jù)自己的 rank 得到相應(yīng)的子 pipeline。
self.train_dataset_pipeline = self.train_pipeline[self.world_rank]
self.train_dataset_pipeline_epoch = self.train_dataset_pipeline.iter_epochs()
self.valid_dataset_pipeline = self.valid_pipeline[self.world_rank]
self.valid_dataset_pipeline_epoch = self.valid_dataset_pipeline.iter_epochs()
在 training worker 的 train_epoch() 中,從子 training pipeline 中獲取 training instances 訓(xùn)練。
def train_epoch():
dataset_for_this_epoch = next(self.train_dataset_pipeline_epoch)
train_dataset = self.data_parser.parse(dataset_for_this_epoch)
train_loader = DataLoader(train_dataset)
for batch_idx, batch in enumerate(train_loader):
metrics = self.train_batch(batch, batch_info)
在 training worker 的 validate() 中, 從子 validation pipeline 中獲取 validation instances 驗(yàn)證。
def validate():
dataset_for_this_epoch = next(self.valid_dataset_pipeline_epoch)
valid_dataset = self.data_parser.parse(dataset_for_this_epoch)
valid_loader = DataLoader(valid_dataset)
for batch_idx, batch in enumerate(valid_loader):
metrics = self.validate_batch(batch, batch_info)
實(shí)驗(yàn)效果:
為驗(yàn)證 RayRTC-pipeline 效果,我們隨機(jī)選擇中等規(guī)模 RTC training job (約 168 萬條 instance),使用同等計(jì)算資源(2CPUs, 2GPUs)簡單做了如下對(duì)比實(shí)驗(yàn)。結(jié)果顯示,使用 pipeline 后,GPU idle 時(shí)間從原來的 245s 減少到了 102s,約 2.5 倍降低。端到端時(shí)間也比原來減少了 158s。除此之外,相比于階段一實(shí)現(xiàn),我們不但在初始階段對(duì)整個(gè)數(shù)據(jù)集進(jìn)行 random_shuffle,在每個(gè) window 的訓(xùn)練數(shù)據(jù)從 pipeline 出來時(shí),也通過 random shuffle 對(duì) window 中的訓(xùn)練數(shù)據(jù)再次進(jìn)行 shuffle。結(jié)果顯示,充分的全局和局部 shuffle 有效提高模型精度。
Version | Accuracy | Precision | Recall | f1-measure | GPU idle time | E2E time |
RayRTC-phase1 | 0.804 | 0.637 | 0.571 | 0.602 | 245s | 2296s |
RayRTC-pipeline | 0.821 | 0.715 | 0.556 | 0.625 | 102s | 2138s |
Improve | +0.017 | +0.078 | -0.015 | +0.023 | -143s | -158s |
總結(jié)
RayRTC 以 Ray 為分布式計(jì)算學(xué)習(xí)引擎,對(duì)字節(jié) RTC NLP 框架的全面改造升級(jí)不僅實(shí)現(xiàn)了性能的極致優(yōu)化(5 小時(shí)到 30 分鐘),同時(shí)通過流水并行極大降低了 GPU 資源的 idle 時(shí)間(60% reduction)。RayRTC 以松耦合的形式對(duì)現(xiàn)有業(yè)務(wù)的侵入極?。?lt;1% loc),同時(shí)為后續(xù)可插拔 low-level 優(yōu)化和 serverless autoscaling 提供了 API 支持??梢灶A(yù)見,后續(xù) RayRTC 在更大規(guī)模上進(jìn)行超參以及與推理打通,將會(huì)形成更高效的端到端 Serverless NLP Pipeline。