前言
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的轻量级通讯协议,构建于TCP/IP协议之上,
优点是低开销,低宽带占用,适用于物联网、小型设备等弱网环境。
Linux下安装Mqtt服务器
使用Docker安装
docker pull emqx/emqx
这是一个开源的MQTT协议实现,支持MQTT5.0版本。
docker run -d --name mqtt -p 1883:1883 emqx/emqx
创建容器实例,MQTT默认端口号1883
配置账号密码
MQTT协议支持多种认证方式,如固定账号密码,查询MySQL,查询Redis等。具体可以查看EMQX认证。
这里我们使用EMQX内置Mnesia数据库存储账号密码。进入容器交互
docker exec -it ba087715dd9b /bin/bash
修改/etc/plugins/emqx_auth_mnesia.conf配置文件,配置账号密码
auth.user.1.username = test1 auth.user.1.password = 123456
启用emqx_auth_mnesia插件
emqx_ctl plugins load emqx_auth_mnesia
关闭匿名访问,修改/etc/emqx.conf配置文件
allow_anonymous = false
重启容器
docker restart ba087715dd9b
桌面客户端连接
MQTTX-下载地址,效果图如下
Java客户端
添加maven依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version></dependency>
发布主题
import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class TestMqttPublish { public static void main(String[] args) throws MqttException { MqttClient mqttClient = createMqttClient(); MqttMessage message = new MqttMessage("this is a message".getBytes()); // 服务质量 message.setQos(2); // 发布消息 mqttClient.publish("first_topic", message); // 断开连接 mqttClient.disconnect(); mqttClient.close(); } private static MqttClient createMqttClient() throws MqttException { // 服务器地址 String broker = "tcp://xxx:1883"; String clientId = "emqx_test";//每个客户端必须唯一,可以用随机值 MemoryPersistence persistence = new MemoryPersistence(); MqttClient client = new MqttClient(broker, clientId, persistence); // 配置账号密码 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("test1"); connOpts.setPassword("123456".toCharArray()); connOpts.setCleanSession(true); // 建立连接 client.connect(connOpts); return client; } }
关于服务质量QOS,有3种取值
-
0:至多一次,消息可能会丢失或重复
-
1:至少一次,消息确保到达,但可能重复
-
2:只有一次,确保消息到达一次
订阅主题
import java.util.concurrent.TimeUnit;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class TestMqttSubscribe { public static void main(String[] args) throws MqttException, InterruptedException { MqttClient mqttClient = createMqttClient(); // 设置消息回调处理 mqttClient.setCallback(new MyHandler()); // 订阅消息 mqttClient.subscribe("first_topic"); TimeUnit.SECONDS.sleep(10); // 断开连接 mqttClient.disconnect(); mqttClient.close(); } private static MqttClient createMqttClient() throws MqttException { // 服务器地址 String broker = "tcp://xxx:1883"; String clientId = "emqx_test2";//每个客户端必须唯一,可以用随机值 MemoryPersistence persistence = new MemoryPersistence(); MqttClient client = new MqttClient(broker, clientId, persistence); // 配置账号密码 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("test1"); connOpts.setPassword("123456".toCharArray()); connOpts.setCleanSession(true); // 建立连接 client.connect(connOpts); return client; } static class MyHandler implements MqttCallback { /** * 连接异常断开 */ @Override public void connectionLost(Throwable cause) { cause.printStackTrace(); } /** * 消息到达 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); System.out.println(String.format("客户端接收到消息,主题为:%s,内容为:%s", topic, msg)); } /** * 消息传输完成 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("消息传输完成"); } } }
注意,消息订阅的客户端和消息发布的客户端的clientId必须不一样。
MQTT5新特性
MQTT5增加了共享订阅
的功能,相当于订阅端的负载均衡功能,在5.0之前,如果有多个客户端订阅了同一个主题,那么这多个客户端都会接收到此消息。这种情况下,只能由订阅者自行处理去重(防止多次消费)。
共享订阅要求我们的主题格式必须为$share/{group}/{filter}
-
$share: 固定前缀,表明这是一个共享订阅
-
{group} : 群组名,是一个不包含 "/", "+" 以及 "#" 的字符串。订阅会话通过使用相同的{group}表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个客户端。
例如,假设订阅者s1,s2,s3属于群组g1,订阅者s4,s5属于群组g2。那么当 EMQX 向这个主题发布消息msg1的时候,s1,s2,s3中只有一个会收到 msg1,s4,s5中只有一个会收到 msg1
[s1] msg1 / [emqx] ------> "$share/g1/topic" - [s2] got msg1 | \ | [s3] | msg1 ----> "$share/g2/topic" -- [s4] \ [s5] got msg1
-
{filter}: 即非共享订阅中的主题过滤器
订阅主题代码为
mqttClient.subscribe("$share/mqtt/first_topic"); //如果非共享主题为/server/first_topic,那么共享主题为$share/mqtt//server/first_topic
发布主题代码为
mqttClient.publish("first_topic", message);
如果想要使用更多MQTT5新特性,需要使用下面的maven依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version></dependency>
更多新特性介绍,可以查看MQTT 5.0。
Spring整合Mqtt
添加maven依赖
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.1.6.RELEASE</version></dependency>
代码实现
import java.util.UUID;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration@IntegrationComponentScanpublic class MqttClientConfig { /** * 连接工厂,配置账号密码等信息 */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName("test1"); mqttConnectOptions.setPassword("123456".toCharArray()); mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"}); mqttConnectOptions.setKeepAliveInterval(2); mqttConnectOptions.setAutomaticReconnect(true); factory.setConnectionOptions(mqttConnectOptions); return factory; } private String createClientId() { return UUID.randomUUID().toString(); } /** * 配置client,发布. */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( createClientId(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultQos(2); messageHandler.setDefaultRetained(false); //不保留消息 return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic. */ @Bean public MessageProducer inbound() { String[] topics = {"$share/mqtt/first_topic"}; MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(createClientId(), mqttClientFactory(), topics); adapter.setTaskScheduler(new ThreadPoolTaskScheduler()); adapter.setCompletionTimeout(3_000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 消息处理器 */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return (message -> { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String payload = message.getPayload().toString(); System.out.println("消息主题:" + topic); System.out.println("消息内容:" + payload); }); } }
底层也是使用的org.eclipse.paho.client.mqttv3依赖。接下来配置消息网关
import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component;@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic); }
后续直接依赖此网关对象就可以了,Spring底层使用GatewayProxyFactoryBean来实例化此Bean。SpringBoot项目中配置上述两个类就可以使用了。
参考
转自:https://www.cnblogs.com/strongmore/p/16297164.html