0x00 为什么使用MQTT

消息队列遥测传输MQTT(Message Queuing Telemetry Transport)是从消息队列MQ(MessageQueue,传统的Publish/Subscribe订阅模型)演变而来,但具有面向物联网应用的特点设计。

从设计目的上讲,MQTT面向物联网,充分理解这种由于成本带来的糟糕带宽/低下性能。断线重连是基操,保证消息的投递/重试是标准,客户端提前立遗嘱以便于掉线处理后事是专属设计。

从使用特点上讲,MQTT与微服务密不可分。MQTT服务器端(代理/Broker)运行之后成为一个单独的服务接入点,需要为之分配处理程序资源,MQTT接入的流量可解码打包直接丢给Kafka,在解耦了各应用各语言的同时还能保持良好的扩展性。

如果你有以上两个痛点需求,那么你需要的正是MQTT,而不是Netty或其他纯TCP框架。

0x01 MQTT概念与构成元素

https://blog.csdn.net/qq_28877125/article/details/78325003

从接入上看,MQTT由服务端、发布端和订阅端组成,符合传统的MQ结构。

0x02 MQTT使用

MQTT本身是一种通信协议,如果想要实际使用,需要选择实现的载体(即服务端/代理端/Broker),市面上有许多实现的实例,各有优缺点。

博主自测了两款实现:Mosquitto和EMQx。测试使用如下代码运行,分别扮演客户端和发布端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# sub.py 订阅端
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def on_disconnect(client, userdata, rc):
    print("Disconnect with result code: " + str(rc))

# client = mqtt.Client(client_id, transport='tcp')
client = mqtt.Client()
client.username_pw_set(username='admin', password='123456')
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(host='127.0.0.1', port=1883, keepalive=600)
client.subscribe('fifa', qos=1)
client.loop_forever()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# pub.py 发布端
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.username_pw_set(username='admin', password='123456')
client.on_connect = on_connect
client.on_message = on_message
client.connect(host='127.0.0.1', port=1883, keepalive=600)
client.publish('fifa', payload='amazing', qos=1)

Mosquitto是由C/C++实现的一种极简MQTT实现,主要面向嵌入式。


以上是mosquitto-1.6.9-install-windows-x64全貌,从文件结构上也能体会到mosquitto的简洁性。

1
2

# 启动mosquitto, -v选项能够看到Broker详细输出
mosquitto -c mosquitto.conf -v

实际测试发现mosquitto目前仍然处于开发阶段,qos收发策略测试都不能完全进行。原因在于配置文件817行cleansession false注释一旦取消再次启动就报错。如果项目很小,也比较简单可以考虑mosquitto,流量大需要做集群负载均衡等就不推荐了,本身是不支持的。

1
2
3
4
5
6
7
8
9

# Set the clean session variable for this bridge.
# When set to true, when the bridge disconnects for any reason, all
# messages and subscriptions will be cleaned up on the remote
# broker. Note that with cleansession set to true, there may be a
# significant amount of retained messages sent when the bridge
# reconnects after losing its connection.
# When set to false, the subscriptions and messages are kept on the
# remote broker, and delivered when the bridge reconnects.
#cleansession false

EMQx Broker是由EMQ公司推出的开源MQTT Broker,支持分布式集群,限流限速,共享订阅等诸多便捷功能。

https://www.emqx.io/cn/

在企业级应用中经常需要均衡承载各种流量/转发等,EMQx都有推出对应的连接插件,可以说胜任各种场景均没问题。

https://docs.emqx.io/broker/latest/cn/

1
2
3
4
5
6
7
8
9
10

D:\Temp\emqx-windows-v4.0.6\emqx\bin\emqx.cmd
usage: emqx (install|uninstall|start|stop|restart|console|ping|list|attach)

# 启动EMQx,由于使用ErLang开发,此处可能需要提前安装ErLang
emqx start

# 访问Dashboard网页控制面板
http://127.0.0.1:18083/
admin
public

再次用之前的python发布端/订阅端测试,基本表现与mosquitto一致,但无疑支持的内容更多。

0x03 结语

至此已建立MQTT基本使用模型,往后的插件协作与经验分享有机会再补充。

Mosquitto和EMQx的配置可参照其他文章。

转自:https://www.codenong.com/cs105727506/