分布式框架下的數(shù)據(jù)處理與模型推理實踐
概念
Ray 是一個開源的高性能分布式計算框架,旨在簡化大規(guī)模分布式應用的開發(fā)和運行。它提供了靈活的任務(wù)調(diào)度、資源管理以及并行計算能力,使開發(fā)者能夠輕松實現(xiàn)從單機到多節(jié)點的計算擴展。Ray 支持多種場景,包括分布式機器學習訓練、強化學習、超參數(shù)優(yōu)化、大規(guī)模數(shù)據(jù)處理和實時模型部署等。Ray 的核心概念是通過統(tǒng)一的 API,使開發(fā)者專注于邏輯開發(fā),而不必關(guān)心底層的分布式實現(xiàn)細節(jié)。
通過模塊化設(shè)計,Ray 集成了多個功能強大的庫,如Ray Data(數(shù)據(jù)處理)、 Ray Train(分布式訓練)、Ray Tune(超參數(shù)優(yōu)化)、Ray Serve(模型部署)、RLlib(強化學習)等,為開發(fā)者提供了一站式的分布式計算解決方案。無論是構(gòu)建 AI 應用還是解決復雜并行計算問題,Ray 都是一種高效且易用的選擇。Ray 的模塊化設(shè)計將復雜功能分解為獨立子系統(tǒng),常用模塊包括:
- (1)Ray Core:基礎(chǔ)的分布式任務(wù)調(diào)度和資源管理。
- (2)Ray Data:高效的分布式數(shù)據(jù)處理模塊。
- (3)Ray Train:支持主流框架的分布式機器學習訓練工具。
- (4)Ray Tune:超參數(shù)優(yōu)化庫,支持大規(guī)模調(diào)優(yōu)任務(wù)。
- (5)Ray Serve:用于實時部署 AI 模型的高性能工具。
- (6)RLlib:分布式強化學習庫,適用于復雜環(huán)境中的智能決策任務(wù)。
數(shù)據(jù)處理與模型推理
Ray Data 的分布式數(shù)據(jù)處理
在大模型訓練過程中的數(shù)據(jù)處理階段,往往有著海量結(jié)構(gòu)化與非結(jié)構(gòu)化數(shù)據(jù),這些數(shù)據(jù)難以快速、有效的處理,造成數(shù)據(jù)處理效率低下、數(shù)據(jù)質(zhì)量不高等問題。智算數(shù)據(jù)平臺基于Ray分布式框架的計算能力,提供了大量常用的分布式數(shù)據(jù)處理算子,并且實現(xiàn)文本、圖片、音頻、視頻等多種類型數(shù)據(jù)的流水線處理,為模型訓練提供高質(zhì)量預訓練數(shù)據(jù)。除了基本的數(shù)據(jù)處理算子外,數(shù)據(jù)平臺還將通過引入多模態(tài)大模型,補齊了處理多模態(tài)數(shù)據(jù)能力。
在大模型訓練中,數(shù)據(jù)預處理的效率直接影響訓練速度和模型性能。Ray Data 是一個強大的分布式數(shù)據(jù)處理模塊,該模塊支持多種文件類型的讀寫,如json、parquet等格式的文本、圖片、視頻等。Ray Data存儲數(shù)據(jù)的格式是基于Apache Arrow,一種高效的列式數(shù)據(jù)格式,相比Python數(shù)據(jù)處理常用的pandas,在讀取和寫入大型文件時會更加高效。集成了map,filter等分布式計算算子,提供了高效的數(shù)據(jù)處理,適用于海量數(shù)據(jù)的處理任務(wù)。此外,Ray Data還支持批處理功能,加速數(shù)據(jù)處理。在和Actor配合使用的場景下,可以減少請求次數(shù),提高了處理效率。
功能亮點
支持多數(shù)據(jù)格式:可以處理文本、圖片、音頻、視頻等多模態(tài)數(shù)據(jù)。
分布式操作:通過內(nèi)置的算子(如 map、filter、groupby 等),實現(xiàn)數(shù)據(jù)分片和高效并行計算。
與主流工具集成:兼容 Pandas、Spark,方便數(shù)據(jù)遷移和開發(fā)。
示例:分布式數(shù)據(jù)預處理代碼
import ray
from ray.data import read_parquet
ray.init() # 初始化 Ray
# 加載數(shù)據(jù)集
dataset = read_parquet("s3://bucket/data.parquet")
# 數(shù)據(jù)清洗和特征處理
processed_data = (
dataset.map(lambda x: {"text_length": len(x["text"])})
.filter(lambda x: x["text_length"] > 50)
)
# 分布式存儲
processed_data.write_parquet("s3://bucket/processed_data.parquet")
Ray Data 的設(shè)計理念是利用分布式集群的算力,在保留開發(fā)者熟悉的數(shù)據(jù)操作方式的同時,提升數(shù)據(jù)處理的速度和擴展性。
GPU 分配與分布式推理
在模型推理場景下,特別是針對大模型的批量推理,資源分配和高效利用至關(guān)重要。Ray 提供了靈活的 @ray.remote 裝飾器,支持任務(wù)的分布式調(diào)度以及 GPU 資源的高效管理。
示例:使用llava模型進行分布式推理代碼
from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration
from PIL import Image
import torch
import ray
import os
# 初始化 Ray
ray.init()
@ray.remote(num_gpus=1) # 請求 1 個 GPU
def run_model_on_gpu(picture):
processor = LlavaNextProcessor.from_pretrained("llava-hf/llama3-llava-next-8b-hf")
model = LlavaNextForConditionalGeneration.from_pretrained("llava-hf/llama3-llava-next-8b-hf", torch_dtype=torch.float16, device_map="auto")
# prepare image and text prompt, using the appropriate prompt template
image = Image.open(picture)
# Define a chat histiry and use `apply_chat_template` to get correctly formatted prompt
# Each value in "content" has to be a list of dicts with types ("text", "image")
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": "描述這張圖片的內(nèi)容"},
{"type": "image"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
inputs = processor(images=image, text=prompt, return_tensors="pt").to(model.device)
# autoregressively complete prompt
output = model.generate(**inputs, max_new_tokens=1000)
return processor.decode(output[0], skip_special_tokens=True)
pic_list = ["/root/llava/img1.png","/root/llava/img2.png"]
results = ray.get([run_model_on_gpu.remote(pic) for pic in pic_list])
print(results)
# 關(guān)閉 Ray
ray.shutdown()
在運行時,我們使用nvidia-smi命令,可以查看到有兩個ray子進程在并發(fā)處理推理任務(wù)。
最后輸出結(jié)果如下,通過以下結(jié)果與原圖的對比,我們可以直觀地看到使用 Ray 進行分布式推理的流程和效果。
輸出結(jié)果
具體而言,Ray 能夠高效地將推理任務(wù)拆分到多個 GPU 上進行并行處理,每個 GPU 僅需處理分配的部分數(shù)據(jù),這大幅提高了推理效率和整體吞吐量。
例如,在多張圖片的分布式推理任務(wù)中,每一張圖片被分發(fā)到不同的 GPU 節(jié)點上,分別執(zhí)行推理操作。最終,結(jié)果被匯總生成完整的推理輸出。在這一過程中:
推理結(jié)果:圖片描述的準確性保持一致,基于分布式的推理不會影響結(jié)果的質(zhì)量。
性能提升:推理時間縮短了一半以上,分布式計算的效率得到驗證。
可擴展性:只要增加更多的 GPU 資源,Ray 的調(diào)度器即可自動分配更多的計算任務(wù),實現(xiàn):超越單機多卡的算力水平
當然,以上只是Ray分布式的一些基礎(chǔ)應用,為了說明Ray在分布式推理中的可行性。我們也可以使用如Ray Data模塊的map_batch去做離線的批量分布式推理,提高數(shù)據(jù)的處理效率;再比如通過Ray Server模塊去部署大模型服務(wù),做到支持實時推理,支持多用戶并發(fā)請求等更多功能。
隨著 NPU(如華為 Ascend 系列)等新型加速硬件的崛起,Ray 也擴展了對異構(gòu)設(shè)備的支持。在集群中,Ray 也可以識別并調(diào)度到帶有 NPU 的節(jié)點運行任務(wù),利用國產(chǎn)芯片的高效性能支持大規(guī)模 AI 應用。
總結(jié)
Ray 作為新一代分布式計算框架,通過模塊化設(shè)計和簡單易用的接口,極大地降低了開發(fā)分布式應用的門檻。在數(shù)據(jù)處理方面,Ray Data 提供了強大的分布式數(shù)據(jù)處理能力,可以高效地處理海量多模態(tài)數(shù)據(jù)。在模型推理方面,Ray Core的Remote函數(shù)和資源調(diào)度功能使開發(fā)者能夠充分利用集群中的 GPU,快速構(gòu)建分布式推理服務(wù)。
隨著 AI 的快速發(fā)展,Ray 已成為開發(fā)者構(gòu)建大規(guī)模分布式系統(tǒng)的核心工具。在未來的應用中,無論是處理復雜的數(shù)據(jù)管道還是優(yōu)化模型推理性能,Ray 都將發(fā)揮越來越重要的作用。
本文轉(zhuǎn)載自 ??AI遇見云??,作者: 景泓斐
