從 Pulsar Client 的原理到它的監(jiān)控面板
背景
前段時間業(yè)務團隊偶爾會碰到一些 Pulsar 使用的問題,比如消息阻塞不消費了、生產(chǎn)者消息發(fā)送緩慢等各種問題。
雖然我們有個監(jiān)控頁面可以根據(jù) topic 維度查看他的發(fā)送狀態(tài),比如速率、流量、消費狀態(tài)等信息。
但也有幾個問題:
- 無法在應用維度查看他所依賴的所有 topic 的各種狀態(tài)。
- 監(jiān)控的信息還不夠,比如發(fā)送/消費延遲、發(fā)送/消費失敗等數(shù)據(jù)。
總之就是缺少一個全局的監(jiān)控視角,通過這些指標可以很方便的分析出當時的運行情況。
基于這個需求經(jīng)過一段時間的折騰,現(xiàn)在已經(jīng)上線使用幾個月,目前比較穩(wěn)定,效果圖如下:
現(xiàn)在就可以在每個應用的監(jiān)控面板里看到自己使用了哪些 topic,分別的生產(chǎn)消費情況如何。
核心流程
要實現(xiàn)這些功能就得在應用的 metrics 中加入相關的監(jiān)控信息,但官方的 Java client 是沒有暴露出這些指標的。
但 pulsar-client-go 是自帶了這些指標的
由于 SDK 不支持所以只能自己想辦法實現(xiàn)了,為此其實有兩種實現(xiàn)方案:
- 魔改 Java client,在需要監(jiān)控的地方手動埋點指標。
- 由于我們使用了 SkyWalking,所以可以編寫插件,以 agent 的方式獲取數(shù)據(jù)、埋點指標。
不過第一種方案有以下一些問題:
- 需要自己維護一個代碼分支,還需要定期和官方保持一致,難免會出現(xiàn)代碼沖突。
- 需要推動業(yè)務方進行依賴升級,線上有著幾百個應用,推動起來時間太慢。
第二種方案的好處就不言而喻了:
- 升級無感知,只需要在我們的基礎鏡像中加上插件即可。
- Java client 的版本也更容易統(tǒng)一。
Client 原理
但其實不管是哪種方案我們都得熟悉 Java Client 的實現(xiàn)原理,才能知道哪些數(shù)據(jù)是我們需要重點關注的,可以幫助我們更好的定位問題。
本文重點不在于此,具體代碼就不仔細分析了。
從上圖可以看出,如果我們想要監(jiān)控消費是否存在阻塞的情況,這幾個內(nèi)部隊列是需要重點監(jiān)控的,一旦他們出現(xiàn)堆積,那就會出現(xiàn)消費阻塞。
其實這些數(shù)據(jù)都可以通過。
org.apache.pulsar.client.api.ProducerStats
org.apache.pulsar.client.api.ConsumerStats
這兩個接口獲取到生產(chǎn)者和消費者的大部分指標,只是這里還有一個小插曲。
那就是在獲取消費者隊列大小的時候,獲取到的數(shù)據(jù)一直為空。
最終經(jīng)過源碼排查,原來是我們大量使用的 messageListener 在獲取隊列大小時有 bug,導致獲取到的數(shù)據(jù)一直都為 0.
相關的 issue 和 PR 可以在這兩個鏈接查看,問題原因和修復過程都有具體描述:https://github.com/apache/pulsar/issues/20076 https://github.com/apache/pulsar/pull/20245
但這個修復得在新版本才能使用,就導致我們現(xiàn)在的監(jiān)控頁面一直顯示為空。
開發(fā) SkyWalking 插件
然后就是開發(fā)一個 SkyWalking 的插件了,其實直接使用 SW 開發(fā)插件是上手 Java-Agent 比較快的方式。
SW 的 SDK 封裝了許多 agent 原生接口,使得開發(fā)起來非常容易;當然缺點也有,就是得集成整個 SW 的 agent。
這里我簡單介紹下這個插件的運行流程:
- 在創(chuàng)建和刪除 consumer 的時候維護 consumerPool
- 啟動一個定時任務,定期從這些 consumer 中獲取指標數(shù)據(jù)。
當消費多分區(qū) topic 時,為了能唯一標志一個 consumer,所以給每個消費者都加了一個 hashcode 的 label。
因為我們所有的 Java 技術棧都是使用的 Prometheus 的包來生成 metrics ,所以該插件也是使用該包生成的數(shù)據(jù)。
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.12.0</version>
<scope>provided</scope>
</dependency>
為了兼容一些特殊 Java 應用沒有該包時會啟動報錯,所以在初始化插件的時候需要檢測當前 classpath
下是否存在該依賴。
這些功能 SW 已經(jīng)封裝好了,對我們來說也是開箱即用。
其實 SW 插件自己也是支持 metrics 的,由于我們只是使用了它的 trace 功能,所以這里就沒有使用它的 API。
關于開發(fā)一個 SW 插件的流程也比較簡單,可以參考官方文檔或者是一些現(xiàn)成的插件源碼。https://skywalking.apache.org/docs/skywalking-java/next/en/setup/service-agent/java-agent/java-plugin-development-guide/
總結
有了這個監(jiān)控面板后,對于 Pulsar 客戶端內(nèi)部的一些運行情況就不再是黑盒了,還可以基于此做一些報警,比如消費堆積、發(fā)送延遲過大等。
當然僅僅只有這個面板依然是不夠的,后續(xù)我們又開發(fā)了可以通過 messageId
查詢它的整個生命周期,包括:
- 生產(chǎn)者、消費者信息
- 消息生產(chǎn)時間
- 推送時間
- ack 時間等
同時借助與 Pulsar-SQL 的能力,還能以列表的形式展示當前 topic 的消息列表。
當然在實現(xiàn)這兩個功能的同時也踩了不少坑,提了幾個 PR ,后面在抽時間做具體的分享。