自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

支持億級(jí)連接并開源的分布式MQTT消息服務(wù)器分享

開發(fā) 架構(gòu)
EMQX 是一款開源的大規(guī)模分布式 MQTT 消息服務(wù)器,功能豐富,專為物聯(lián)網(wǎng)和實(shí)時(shí)通信應(yīng)用而設(shè)計(jì)。EMQX 5.0 單集群支持 MQTT 并發(fā)連接數(shù)高達(dá) 1 億條,單服務(wù)器的傳輸與處理吞吐量可達(dá)每秒百萬(wàn)級(jí) MQTT 消息,同時(shí)保證毫秒級(jí)的低時(shí)延。

今天給各位分享一款開源的分布式MQTT消息服務(wù)器EMQX,此消息服務(wù)器幾乎是物聯(lián)網(wǎng)系統(tǒng)的標(biāo)配同時(shí)也適合做即時(shí)通知和推送服務(wù)場(chǎng)景,在作者之前參與的項(xiàng)目中主要用于做物聯(lián)網(wǎng)系統(tǒng)邊緣設(shè)備信息采集、以及交易所行情數(shù)據(jù)推送使用,下面是EMQX 相關(guān)介紹。

什么是 EMQX

EMQX 是一款開源的大規(guī)模分布式 MQTT 消息服務(wù)器,功能豐富,專為物聯(lián)網(wǎng)和實(shí)時(shí)通信應(yīng)用而設(shè)計(jì)。EMQX 5.0 單集群支持 MQTT 并發(fā)連接數(shù)高達(dá) 1 億條,單服務(wù)器的傳輸與處理吞吐量可達(dá)每秒百萬(wàn)級(jí) MQTT 消息,同時(shí)保證毫秒級(jí)的低時(shí)延。

EMQX 支持多種協(xié)議,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保證各種網(wǎng)絡(luò)環(huán)境和硬件設(shè)備的可訪問(wèn)性。EMQX 還提供了全面的 SSL/TLS 功能支持,比如雙向認(rèn)證以及多種身份驗(yàn)證機(jī)制,為物聯(lián)網(wǎng)設(shè)備和應(yīng)用程序提供可靠和高效的通信基礎(chǔ)設(shè)施。

圖片圖片

內(nèi)置基于 SQL 的規(guī)則引擎,EMQX 可以實(shí)時(shí)提取、過(guò)濾、豐富和轉(zhuǎn)換物聯(lián)網(wǎng)數(shù)據(jù)。此外,EMQX 采用了無(wú)主分布式架構(gòu),以確保高可用性和水平擴(kuò)展性,并提供操作友好的用戶體驗(yàn)和出色的可觀測(cè)性。

EMQX 提供了開源版和商業(yè)版兩種方式,用戶可以基于自己需求進(jìn)行選擇。

官網(wǎng)地址:https://www.emqx.io

github 地址:https://github.com/emqx/emqx

為什么說(shuō)專為物聯(lián)網(wǎng)和實(shí)時(shí)通信設(shè)計(jì)?

物聯(lián)網(wǎng)方面

以下是幾個(gè)理由說(shuō)明為什么MQTT適合物聯(lián)網(wǎng):

  1. 輕量級(jí)和低帶寬消耗:MQTT協(xié)議設(shè)計(jì)簡(jiǎn)單輕量,消息頭部開銷小,傳輸數(shù)據(jù)量少,使其非常適合在低帶寬、不穩(wěn)定的網(wǎng)絡(luò)環(huán)境下使用。這對(duì)于許多物聯(lián)網(wǎng)設(shè)備來(lái)說(shuō)非常重要,因?yàn)樗鼈兺ǔ>哂匈Y源受限的特點(diǎn),如有限的處理能力、內(nèi)存和電池壽命。
  2. 可靠性和持久性:MQTT支持可靠的消息傳遞,并且具有消息持久性。設(shè)備可以發(fā)布消息并確保消息可靠地傳遞到服務(wù)器,即使在網(wǎng)絡(luò)連接中斷后,也可以在重新連接后接收未傳遞的消息。這對(duì)于物聯(lián)網(wǎng)應(yīng)用來(lái)說(shuō)非常重要,因?yàn)樵O(shè)備可能會(huì)經(jīng)歷網(wǎng)絡(luò)不穩(wěn)定、斷開和重新連接等情況。
  3. 異步通信和發(fā)布-訂閱模式:MQTT使用發(fā)布-訂閱模式,設(shè)備可以通過(guò)訂閱特定主題來(lái)接收感興趣的消息,而無(wú)需直接與其他設(shè)備進(jìn)行點(diǎn)對(duì)點(diǎn)通信。
  4. 支持廣播和多播:MQTT可以通過(guò)使用通配符和主題過(guò)濾器,實(shí)現(xiàn)消息的廣播和多播。這意味著一個(gè)設(shè)備可以發(fā)布一條消息,并且多個(gè)訂閱者可以接收到該消息,從而實(shí)現(xiàn)了一對(duì)多和多對(duì)多的通信模式。
  5. 支持安全性和認(rèn)證:MQTT協(xié)議提供了各種安全機(jī)制,包括傳輸層安全性(TLS/SSL)和身份驗(yàn)證機(jī)制,以確保數(shù)據(jù)的保密性和完整性。這對(duì)于物聯(lián)網(wǎng)應(yīng)用來(lái)說(shuō)至關(guān)重要,因?yàn)樵S多物聯(lián)網(wǎng)設(shè)備處理的是敏感數(shù)據(jù)。

實(shí)時(shí)通信設(shè)方面

  1. 即時(shí)通訊(Instant Messaging):EMQ X可以用作即時(shí)通訊系統(tǒng)的后端,支持實(shí)時(shí)的消息傳遞和即時(shí)聊天功能。它可以處理大量的并發(fā)連接和消息交換,保證實(shí)時(shí)性和可靠性。
  2. 在線游戲(Online Gaming):在線游戲通常需要實(shí)時(shí)的玩家互動(dòng)和消息傳遞。EMQ X可以作為游戲服務(wù)器的消息中間件,處理游戲玩家之間的實(shí)時(shí)通信和事件傳遞,支持實(shí)時(shí)游戲場(chǎng)景的需求。
  3. 即時(shí)通知和推送服務(wù):EMQ X可以用于構(gòu)建實(shí)時(shí)通知和推送服務(wù),例如本人之前基于EMQX做過(guò)交易所的行情數(shù)推送,實(shí)時(shí)新聞推送、社交網(wǎng)絡(luò)通知等。
  4. 實(shí)時(shí)監(jiān)控和數(shù)據(jù)分發(fā):EMQ X適用于實(shí)時(shí)監(jiān)控和數(shù)據(jù)分發(fā)應(yīng)用,例如物流監(jiān)控、設(shè)備狀態(tài)監(jiān)測(cè)、實(shí)時(shí)數(shù)據(jù)分析等。它可以接收和分發(fā)實(shí)時(shí)數(shù)據(jù)流,支持實(shí)時(shí)事件處理和數(shù)據(jù)流轉(zhuǎn)換。
  5. 即時(shí)位置共享:EMQ X可以用于構(gòu)建實(shí)時(shí)位置共享應(yīng)用,例如實(shí)時(shí)定位服務(wù)、共享出行等。它可以處理實(shí)時(shí)位置數(shù)據(jù)的接收和分發(fā),支持實(shí)時(shí)位置更新和共享。

分布式集群設(shè)計(jì)原理

MQX 本身支持分布式集群架構(gòu),能夠在保證高可用性、容錯(cuò)性和可擴(kuò)展性的同時(shí),處理大量的客戶端和消息。通過(guò)使用 EMQX 集群,您可以在一個(gè)或多個(gè)節(jié)點(diǎn)發(fā)生故障時(shí)仍然保持集群運(yùn)行,從而享受到容錯(cuò)和高可用性的好處。

以下是一個(gè)四個(gè)節(jié)點(diǎn)組成的EMQ集群,每個(gè)節(jié)點(diǎn)都運(yùn)行一個(gè) EMQX 實(shí)例,并且與集群中的其他節(jié)點(diǎn)通信,共享客戶端連接、訂閱、發(fā)布消息等信息。這允許集群在節(jié)點(diǎn)之間自動(dòng)分配負(fù)載并在節(jié)點(diǎn)出現(xiàn)故障時(shí)提供高可用性

圖片圖片

在集群架構(gòu)下,我們可以隨著業(yè)務(wù)的增長(zhǎng)向集群添加新節(jié)點(diǎn),從而提供可擴(kuò)展性。這樣可以處理越來(lái)越多的客戶端和消息,而不必?fù)?dān)心單個(gè)代理的限制。

消息轉(zhuǎn)發(fā)設(shè)計(jì)

EMQX 分布式集群的基本功能是轉(zhuǎn)發(fā)和發(fā)布消息到訂閱者,如下圖所示。

圖片圖片

為了實(shí)現(xiàn)這一目標(biāo),EMQX 在 嵌入式數(shù)據(jù)庫(kù) Mria 中維護(hù)著與之相關(guān)的幾個(gè)數(shù)據(jù)表:

  • 訂閱表
  • 路由表
  • 主題樹

訂閱表:主題-訂閱者

EMQX 會(huì)維護(hù)一個(gè)訂閱表,用于存儲(chǔ)主題->訂閱者之間的映射關(guān)系,從而確保能將傳入消息正確路由到對(duì)應(yīng)的客戶端。該數(shù)據(jù)只存在于訂閱者所在的 EMQX 節(jié)點(diǎn)上,類似的結(jié)構(gòu)如下:

bash

node1:

    topic1 -> client1, client2
    topic2 -> client3

node2:

    topic1 -> client4

路由表:Topic-Node

路由表記錄了 主題->節(jié)點(diǎn) 之間的映射,它存儲(chǔ)每個(gè)節(jié)點(diǎn)上客戶端訂閱的主題列表,并用于將消息路由到對(duì)應(yīng)的節(jié)點(diǎn)。該數(shù)據(jù)會(huì)在同一集群中的所有節(jié)點(diǎn)復(fù)制一份,類似結(jié)構(gòu)如下:

bash

topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4

主題樹:主題匹配通配符

主題樹是一種分層數(shù)據(jù)結(jié)構(gòu),它存儲(chǔ)有關(guān)主題層次結(jié)構(gòu)的信息,并用于消息與訂閱客戶端的匹配。

主題樹會(huì)在同一集群中的所有節(jié)點(diǎn)復(fù)制一份,下面是一個(gè) 主題-訂閱關(guān)系 的例子:

Client

Node

Subscribed topic

client1

node1

t/+/x, t/+/y

client2

node2

t/#

client3

node3

t/+/x, t/a

當(dāng)所有的訂閱完成后,EMQX 會(huì)維護(hù)以下主題樹和路由表。

圖片圖片

消息分發(fā)流程

當(dāng)一個(gè) MQTT 客戶端發(fā)布消息時(shí),它所在的節(jié)點(diǎn)會(huì)查找路由表,并根據(jù)消息主題將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的節(jié)點(diǎn)(可能是多個(gè)節(jié)點(diǎn))。

然后,接收到消息的節(jié)點(diǎn)會(huì)查找本地訂閱表,并將消息發(fā)送至對(duì)應(yīng)的訂閱者。

例如,當(dāng)客戶端 1 發(fā)布一條消息到主題 t/a 時(shí),消息在節(jié)點(diǎn)之間的路由和分發(fā)如下:

  1. 客戶端 1 向節(jié)點(diǎn) 1 發(fā)布一條主題為 t/a 的消息;
  2. 節(jié)點(diǎn) 1 查詢主題樹,了解到 t/a 與現(xiàn)有主題 t/a 和 t/# 相匹配。
  3. 節(jié)點(diǎn) 1 查詢路由表,并得知:

節(jié)點(diǎn) 2 上有客戶端訂閱了 t/# 主題;

節(jié)點(diǎn) 3 上有客戶端訂閱了 t/a 主題;因此節(jié)點(diǎn) 1 會(huì)將消息同時(shí)轉(zhuǎn)發(fā)給節(jié)點(diǎn) 2 和節(jié)點(diǎn) 3。

  1. 節(jié)點(diǎn) 2 收到轉(zhuǎn)發(fā)的t/a消息后,通過(guò)查詢本地訂閱表,將消息分發(fā)給訂閱了 t/# 的客戶端。
  2. 節(jié)點(diǎn) 3 收到轉(zhuǎn)發(fā)的 t/a 消息后,通過(guò)查詢本地訂閱表,將消息分發(fā)給訂閱了 t/a 的客戶端。
  3. 消息發(fā)布完成。

連接數(shù)測(cè)試

5.0支持并發(fā)連接數(shù)高達(dá) 1 億條測(cè)試報(bào)告:https://www.emqx.com/zh/blog/reaching-100m-mqtt-connections-with-emqx-5-0

快速體驗(yàn)

安裝

容器化部署是體驗(yàn) EMQX 的最快方式,因此本節(jié)將以容器化部署為例,在命令行工具中輸入如下命令,下載并運(yùn)行最新版 EMQX。

docker pull emqx/emqx:5.5.1

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.5.

圖片圖片

通過(guò)瀏覽器訪問(wèn) http://localhost:18083/(localhost 可替換為您的實(shí)際 IP 地址)以訪問(wèn) EMQX Dashboard 管理控制臺(tái),進(jìn)行設(shè)備連接與相關(guān)指標(biāo)監(jiān)控管理,默認(rèn)用戶名及密碼:admin/public。

圖片圖片

登錄成功之后如下圖

圖片圖片

示例編寫

圖片圖片

下面我們使用Java 語(yǔ)言,寫一個(gè)示例,發(fā)送消息至主題mytopic ,訂閱端分布為Java后端程序和JS訂閱

Maven依賴項(xiàng)

創(chuàng)建工程并添加Maven依賴項(xiàng),這里依賴的paho是 mqtt 的一個(gè)工具類

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.5</version>
</dependency>
創(chuàng)建發(fā)送消息代碼
package cn.g2link.seg.base.mqtt.test;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MqttPublishExample {
    public static void main(String[] args) {
        //emq 的 tcp監(jiān)聽端口
        String broker = "tcp://localhost:1883";
        String clientId = "mqtt_client1";
        //發(fā)送的主題
        String topic = "mytopic";
        //消息體
        String message = "Hello, MQTT!";

        try {
            MqttClient mqttClient = new MqttClient(broker, clientId);
            mqttClient.connect();

            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttTopic.publish(mqttMessage);

            mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
創(chuàng)建訂閱消息代碼
Java后端訂閱
package cn.g2link.seg.base.mqtt.test;

import org.eclipse.paho.client.mqttv3.*;

public class MqttSubscribeExample {
    public static void main(String[] args) {
      //emq 的 tcp監(jiān)聽端口
        String broker = "tcp://localhost:1883";
        String clientId = "mqtt_subsribe_client1";
      //監(jiān)聽的主題
        String topic = "mytopic";

        try {
            MqttClient mqttClient = new MqttClient(broker, clientId);
            mqttClient.connect();
            System.out.println("connect success" );
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost!");
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String payload = new String(message.getPayload());
                    System.out.println("Received message: " + payload);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Not used in this example
                }
            });

            mqttClient.subscribe(topic);
            System.out.println(String.format("topic:%s subscribe success ", topic));
            // Keep the program running to receive messages
            while (true) {
                // Do nothing
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
瀏覽器端訂閱

我們通過(guò)mqtt.min.js,來(lái)連接EMQX暴露的 webscoket 為8083端口,同時(shí)訂閱mytopic主題

<html>

<head>
    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
    <script>
        // 將在全局初始化一個(gè) mqtt 變量
        console.log(mqtt)

        // 創(chuàng)建一個(gè) MQTT 客戶端實(shí)例
        var client = mqtt.connect('mqtt://localhost:8083/mqtt', {
            clientId: 'web-mqtt-client' // 替換為您的客戶端ID
        });

        // 連接成功時(shí)的回調(diào)函數(shù)
        client.on('connect', function () {
            console.log('已連接到 MQTT 服務(wù)器');

            // 訂閱主題
            client.subscribe('mytopic'); // 替換為您要訂閱的主題
        });

        // 接收到消息時(shí)的回調(diào)函數(shù)
        client.on('message', function (topic, message) {
            console.log('收到消息:', message.toString());
            // 在這里處理收到的消息,可以根據(jù)需要進(jìn)行相應(yīng)的邏輯操作
        });

        // 連接斷開時(shí)的回調(diào)函數(shù)
        client.on('close', function () {
            console.log('與 MQTT 服務(wù)器的連接已斷開');
        });

        // 連接錯(cuò)誤時(shí)的回調(diào)函數(shù)
        client.on('error', function (error) {
            console.log('連接發(fā)生錯(cuò)誤:', error);
        });
    </script>
</head>

<body>

</body>

</html>
監(jiān)控消息(可選)

在主題監(jiān)控頁(yè)面添加mytopic,這一步主要為了觀察發(fā)送和消費(fèi)的次數(shù)

圖片圖片

示例驗(yàn)證

訂閱端啟動(dòng)

點(diǎn)擊MqttSubscribeExample的 main 方法啟動(dòng)訂閱

圖片圖片

圖片圖片

啟動(dòng)成功以后,會(huì)在EQMX 控制臺(tái),顯示客戶端連接信息,如下圖顯示了兩個(gè)訂閱端

圖片圖片

消息發(fā)送

啟動(dòng)MqttPublishExample的 main 方法,進(jìn)行消息發(fā)送,發(fā)送后訂閱端會(huì)收到以下消息

Java 后端

圖片圖片

瀏覽器端

圖片圖片

主題監(jiān)控

查看EQMX 控制臺(tái)的主題監(jiān)控,會(huì)看到當(dāng)前topic 流入和流出條數(shù)

圖片圖片

總結(jié)

以上只是簡(jiǎn)單介紹了什么是 EMQX 以及它的應(yīng)用場(chǎng)景介紹,要想更多了解EMQX細(xì)節(jié),可以訪問(wèn)官方進(jìn)行了解。

責(zé)任編輯:武曉燕 來(lái)源: 架構(gòu)成長(zhǎng)指南
相關(guān)推薦

2023-11-10 00:02:17

EMQX物聯(lián)網(wǎng)

2018-11-07 09:23:21

服務(wù)器分布式機(jī)器學(xué)習(xí)

2019-03-27 08:43:17

Nginx負(fù)載均衡服務(wù)器

2014-03-12 10:42:10

equeue分布式消息隊(duì)列

2019-08-12 16:07:32

Web系統(tǒng)集群

2012-07-06 09:27:02

云計(jì)算分布式服務(wù)器負(fù)載均衡

2019-01-28 11:03:03

NginxFastDFS服務(wù)器

2018-05-31 09:27:38

服務(wù)器架構(gòu)原理

2017-03-13 14:02:10

分布式聊天服務(wù)器

2021-07-23 08:57:32

鴻蒙HarmonyOS應(yīng)用

2023-05-29 14:07:00

Zuul網(wǎng)關(guān)系統(tǒng)

2021-02-24 16:17:18

架構(gòu)運(yùn)維技術(shù)

2012-02-24 09:27:45

x86服務(wù)器

2015-05-12 13:03:54

開源分布式存儲(chǔ)HDFS

2020-03-09 08:24:06

TengineWeb代理服務(wù)器

2022-12-13 09:19:26

分布式消息隊(duì)列

2019-09-05 09:02:45

消息系統(tǒng)緩存高可用

2022-07-25 06:42:24

分布式鎖Redis

2019-12-27 10:00:34

開源技術(shù) 軟件

2017-07-27 14:32:05

大數(shù)據(jù)分布式消息Kafka
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)