SpringBoot + ResponseBodyEmitter 實時異步流式推送,優(yōu)雅!
ChatGPT 的火爆,讓流式輸出技術(shù)迅速走進大眾視野。在那段時間里,許多熱愛鉆研技術(shù)的小伙伴紛紛開始學(xué)習(xí)和實踐 SSE 異步處理。
我當(dāng)時也寫過相關(guān)文章,今天,咱們換一種更為簡便的方式來實現(xiàn)流式輸出,那就是 ResponseBodyEmitter。
其實,ResponseBodyEmitter 并非新技術(shù),早在 Spring Framework 4.2 版本就已被引入。直到最近,我們在開發(fā)一個滾動日志輸出功能時,才深入了解到它的強大之處。
ResponseBodyEmitter 的作用
相較于 SSE 技術(shù),ResponseBodyEmitter 更加簡單易用。它主要用于處理異步的 HTTP 響應(yīng),其核心優(yōu)勢在于 允許逐步將數(shù)據(jù)發(fā)送到客戶端,而非一次性發(fā)送所有內(nèi)容。這一特性使得它在需要長時間處理或進行流式傳輸?shù)膱鼍爸斜憩F(xiàn)出色。需要注意的是,ResponseBodyEmitter 本質(zhì)上是一個接口。
使用場景
- 長輪詢:服務(wù)器在有數(shù)據(jù)時會立即響應(yīng)客戶端請求,若暫無數(shù)據(jù),則保持連接開放,等待數(shù)據(jù)到來。
- **服務(wù)器推送事件 (SSE)**:服務(wù)器能夠持續(xù)不斷地向客戶端推送各類事件,實現(xiàn)實時交互。
- 流式傳輸:可逐步發(fā)送大量數(shù)據(jù),像文件下載或者實時數(shù)據(jù)流傳輸?shù)葓鼍岸歼m用。
- 異步處理:在處理耗時任務(wù)時,能逐步返回處理結(jié)果,避免客戶端長時間等待,提升用戶體驗。
業(yè)務(wù)場景舉例
在實際業(yè)務(wù)中,ResponseBodyEmitter 有著廣泛的應(yīng)用,比如進度條的實時更新、實時聊天功能、股票價格的實時更新、系統(tǒng)日志的流式輸出以及 AI 的流式響應(yīng)等。
實時日志流實戰(zhàn)
接下來,我們通過一個簡單的實時日志流功能,來深入了解 ResponseBodyEmitter 的使用。假設(shè)我們有一個應(yīng)用程序,需要實時查看服務(wù)器的日志,以便快速定位和解決問題。
創(chuàng)建控制器
首先,我們在 Spring Boot 應(yīng)用中創(chuàng)建一個控制器,借助 ResponseBodyEmitter 實現(xiàn)實時日志流。
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
@RestController
@RequestMapping("/api/log")
publicclass LogController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseBodyEmitter streamLogs() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 開啟異步線程處理數(shù)據(jù)并發(fā)送
new Thread(() -> {
try {
while (true) {
String logEntry = getLatestLogEntry();
if (logEntry != null) {
emitter.send(logEntry);
}
// 每秒檢查一次日志更新
Thread.sleep(1000);
}
} catch (Exception e) {
// 出現(xiàn)異常時結(jié)束響應(yīng)并傳遞錯誤信息
emitter.completeWithError(e);
}
}).start();
return emitter;
}
private String getLatestLogEntry() {
// 模擬從日志文件中獲取最新日志條目
return"2025-02-12 12:00:00 - INFO: User logged in successfully.";
}
}
運行效果
當(dāng)我們啟動這個應(yīng)用程序,并訪問 /api/log/stream 路徑時,就能看到一個實時更新的日志流。服務(wù)器會每秒向客戶端推送一條新的日志條目,客戶端會將其顯示在頁面上,效果如下:
運行效果
ResponseBodyEmitter 的核心方法
- send(Object data):向客戶端發(fā)送數(shù)據(jù),該方法可以多次調(diào)用,實現(xiàn)數(shù)據(jù)的逐步發(fā)送。
- complete():用于結(jié)束響應(yīng)流,表示數(shù)據(jù)已經(jīng)全部發(fā)送完畢。
- onTimeout(Runnable callback):設(shè)置超時回調(diào)函數(shù),當(dāng)連接超時時,會執(zhí)行該回調(diào)。
- onCompletion(Runnable callback):設(shè)置完成回調(diào)函數(shù),當(dāng)數(shù)據(jù)發(fā)送完成后,會執(zhí)行該回調(diào)。
ResponseBodyEmitter 工作原理
異步數(shù)據(jù)生成與推送
在傳統(tǒng)的 HTTP 請求 - 響應(yīng)模式中,服務(wù)器通常需要等待整個響應(yīng)數(shù)據(jù)生成完成后,才會將其一次性發(fā)送給客戶端。關(guān)注公眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊!而 ResponseBodyEmitter 打破了這種模式,它允許服務(wù)端在任務(wù)執(zhí)行過程中異步地生成響應(yīng)數(shù)據(jù)。
當(dāng)有部分數(shù)據(jù)準備好時,就可以立即調(diào)用 send() 方法將這些數(shù)據(jù)推送給客戶端,而無需等待整個任務(wù)完成。這就好比一場接力賽,每完成一段賽程(生成一部分數(shù)據(jù)),就馬上將接力棒(數(shù)據(jù))傳遞給客戶端,大大提高了數(shù)據(jù)傳輸?shù)膶崟r性。
分塊傳輸機制
ResponseBodyEmitter 采用了 HTTP 的分塊編碼(Chunked Encoding)方式來傳輸數(shù)據(jù)。在傳統(tǒng)的 HTTP 響應(yīng)中,通常需要在響應(yīng)頭中明確指定 Content-Length,表示整個響應(yīng)數(shù)據(jù)的長度。但在分塊傳輸中,服務(wù)器不會提前設(shè)置 Content-Length,而是將數(shù)據(jù)分成多個獨立的塊,每個塊都有自己的長度標識。
客戶端在接收到數(shù)據(jù)塊后,可以立即對其進行處理,而不必等待整個響應(yīng)數(shù)據(jù)接收完畢。這種方式使得數(shù)據(jù)可以邊生成邊傳輸,減少了客戶端的等待時間,提高了用戶體驗。
連接生命周期管理
為了確保資源的合理使用,ResponseBodyEmitter 提供了對連接生命周期的有效管理。當(dāng)所有數(shù)據(jù)都發(fā)送完畢后,需要調(diào)用 complete() 方法來明確告知客戶端響應(yīng)結(jié)束,關(guān)閉連接。如果在數(shù)據(jù)傳輸過程中出現(xiàn)異常,可以調(diào)用 completeWithError() 方法,結(jié)束響應(yīng)并向客戶端傳遞錯誤信息。
這樣可以避免連接長時間保持開放,造成資源浪費。
注意事項
- 客戶端支持:雖然大多數(shù)瀏覽器和 HTTP 客戶端庫都支持分塊傳輸,但某些老舊的客戶端可能存在兼容性問題。
- 超時設(shè)置:為避免長連接長時間占用資源,可以為 ResponseBodyEmitter 設(shè)置超時時間,示例代碼如下:
emitter.onTimeout(() -> emitter.complete());
- 線程安全:ResponseBodyEmitter 的 send() 方法是線程安全的,但在使用時需要注意控制任務(wù)線程的生命周期,避免出現(xiàn)資源泄漏。
- 連接關(guān)閉:務(wù)必確保在任務(wù)結(jié)束時調(diào)用 complete() 或 completeWithError() 方法,否則可能導(dǎo)致連接無法正常關(guān)閉,造成資源浪費。
與 Streaming 和 SSE 的對比
- Streaming:直接通過 OutputStream 向客戶端寫入數(shù)據(jù),靈活性較高,但需要手動處理流的關(guān)閉,增加了開發(fā)的復(fù)雜度。
- Server-Sent Events (SSE):基于 text/event-stream 協(xié)議,適用于服務(wù)端事件推送場景,但要求客戶端支持 SSE 協(xié)議。
- ResponseBodyEmitter:通用性更強,適用于任何支持 HTTP 的客戶端,并且易于與 Spring 框架集成,是一種更為便捷的流式傳輸解決方案。
在處理類似 AI 這種響應(yīng)式的流式輸出場景時,相較于 SSE,ResponseBodyEmitter 作為 Spring 提供的輕量級流式傳輸解決方案,在 HTTP 協(xié)議兼容性方面表現(xiàn)更優(yōu)。
小結(jié)
ResponseBodyEmitter 是 Spring 框架提供的輕量級流式傳輸解決方案,它能夠顯著提升高并發(fā)和實時性場景下的用戶體驗。通過 ResponseBodyEmitter,我們可以輕松實現(xiàn)服務(wù)器向客戶端的實時數(shù)據(jù)推送。
無論是進度條的實時更新、實時聊天、股票價格的實時監(jiān)控還是系統(tǒng)日志的流式輸出,ResponseBodyEmitter 都能幫助我們構(gòu)建更加動態(tài)和互動的應(yīng)用程序。