mqtt使用springboot实现硬件通信
step1: https://github.com/wrs13634194612/hello-mqtt.git
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!– lookup parent from repository –>
</parent>
<groupId>com.example</groupId>
<artifactId>demo2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo2</name>
<description>demo2</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
step2:
spring:
mqtt:
server-url: tcp://112.74.48.180:1883
client-id: rain-smart-server-${random.value}
username: admin
password: 1234
default-topic: test
step3:
package com.example.demo2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Demo2Application {
public static void main(String[] args) {
SpringApplication.run(Demo2Application.class, args);
}
}
step4:
package com.example.demo2.util;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* @author zengqm
* @date 2021/9/8 17:25
*/
@Log4j2
public class JsonUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static ObjectMapper getInstance() {
return OBJECT_MAPPER;
}
/**
* 将 java 对象转为 json
*
* @param obj java 对象
* @return json 字符串
*/
public static String obj2json(Object obj) {
try {
return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("obj to json string failed", e);
return "";
}
}
/**
* 将 java 对象转为 json,忽略 null 值
*
* @param obj java 对象
* @return json 字符串
*/
public static String obj2jsonIgnoreNull(Object obj) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("obj to json string failed", e);
return "";
}
}
/**
* 将 json 字符串转换为 java 对象
*
* @param jsonString json 字符串
* @param clazz 指定 java 对象的字节码对象
* @param <T> 泛型
* @return java 对象
*/
public static <T> T json2obj(String jsonString, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(jsonString, clazz);
} catch (JsonProcessingException e) {
log.error("json string to obj failed", e);
return null;
}
}
/**
* 将 json 字符串转换为存储指定 java 对象的 List 集合
*
* @param jsonString json 字符串
* @param clazz 指定 java 对象的字节码对象
* @param <T> 泛型
* @return 存储指定 java 对象的 List 集合
*/
public static <T> List<T> json2list(String jsonString, Class<T> clazz) {
try {
JavaType javaType = getCollectionType(ArrayList.class, clazz);
return OBJECT_MAPPER.readValue(jsonString, javaType);
} catch (JsonProcessingException e) {
log.error("json string to list failed", e);
return Collections.emptyList();
}
}
/**
* 将 json 字符串转换为 Map
*
* @param jsonString json 字符串
* @return Map 对象
*/
public static Map<String, Object> json2map(String jsonString) {
try {
return OBJECT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {
});
} catch (JsonProcessingException e) {
log.error("json string to map failed", e);
return Collections.emptyMap();
}
}
/**
* 获取泛型的 Collection Type
*
* @param collectionClass 泛型的 Collection
* @param elementClass 元素类
* @return Java 类型
*/
public static JavaType getCollectionType(Class<?> collectionClass, Class<?>… elementClass) {
return OBJECT_MAPPER.getTypeFactory().constructParametricType(collectionClass, elementClass);
}
}
step5:
package com.example.demo2.entity;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @author zengqm01
* @date 2022/3/3 14:26
*/
@Setter
@Getter
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private Integer age;
}
step6:
package com.example.demo2.controller;
import com.example.demo2.config.Mqtt;
import com.example.demo2.config.MqttGateway;
import com.example.demo2.entity.User;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@AllArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {
private final MqttGateway mqttGateway;
@PostMapping("/sendStr")
public String sendToMqtt(@RequestBody Mqtt<String> mqtt){
mqttGateway.sendToMqtt(mqtt.getTopic(),mqtt.getQos(),mqtt.getPayloadJsonStr());
return "Success";
}
@PostMapping("/sendEntity")
public String sendEntityToMqtt(@RequestBody Mqtt<User> mqtt){
mqttGateway.sendToMqtt(mqtt.getTopic(),mqtt.getQos(),mqtt.getPayloadJsonStr());
return "Success";
}
}
step7:
package com.example.demo2.config;
import com.example.demo2.util.JsonUtils;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
@Setter
@Getter
public class Mqtt<T> implements Serializable {
private static final Long serialVersionUID = 1L;
private String topic;
private int qos;
private T payload;
public String getPayloadJsonStr(){
if (payload instanceof String){
return (String)payload;
}
return JsonUtils.obj2json(payload);
}
}
step8:
package com.example.demo2.config;
import lombok.Getter;
import lombok.Setter;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* @author zengqm01
* @date 2022/3/3 12:17
*/
@Setter
@Getter
@Configuration
@IntegrationComponentScan
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {
private String serverUrl;
private String clientId;
private String username;
private String password;
private String defaultTopic;
private int timeout = 100;
private int keepAlive = 100;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory mqttClientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] {serverUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepAlive);
options.setAutomaticReconnect(true);
mqttClientFactory.setConnectionOptions(options);
return mqttClientFactory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer mqttInbound(MessageChannel mqttInboundChannel) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverUrl, "test", "testtopic/#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInboundChannel);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
}
step9:
package com.example.demo2.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);
}
step10:
使用postman测试,请求方式post,地址 127.0.0.1:8080/mqtt/sendStr
参数为body,选择raw方式,默认json格式,请求内容为json字符串
{
"qos": 1,
"payload": "WY+SWITCH=1",
"topic": "/zcz002/zcz002103910/C1/"
}
0表示关闭,1表示打开,发送指令0和1,可以控制开关的打开和关闭
————————————————
版权声明:本文为CSDN博主「勘察加熊人」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/cf8833/article/details/123261019