mqtt使用springboot实现硬件通信

step1: https://github.com/wrs13634194612/hello-mqtt.git

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.6.4</version>

        <relativePath/> <!– lookup parent from repository –>

    </parent>

    <groupId>com.example</groupId>

    <artifactId>demo2</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <name>demo2</name>

    <description>demo2</description>

    <properties>

        <java.version>11</java.version>

    </properties>

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-configuration-processor</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-integration</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-stream</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-mqtt</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

        </dependency>

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

        </plugins>

    </build>

</project>

 

step2:

spring:

  mqtt:

    server-url: tcp://112.74.48.180:1883

    client-id: rain-smart-server-${random.value}

    username: admin

    password: 1234

    default-topic: test

 

step3:

package com.example.demo2;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class Demo2Application {

    public static void main(String[] args) {

        SpringApplication.run(Demo2Application.class, args);

    }

}

 

step4:

package com.example.demo2.util;

import com.fasterxml.jackson.annotation.JsonInclude;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.core.type.TypeReference;

import com.fasterxml.jackson.databind.JavaType;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.log4j.Log4j2;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.Map;

/**

 * @author zengqm

 * @date 2021/9/8 17:25

 */

@Log4j2

public class JsonUtils {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static ObjectMapper getInstance() {

        return OBJECT_MAPPER;

    }

    /**

     * 将 java 对象转为 json

     *

     * @param obj java 对象

     * @return json 字符串

     */

    public static String obj2json(Object obj) {

        try {

            return OBJECT_MAPPER.writeValueAsString(obj);

        } catch (JsonProcessingException e) {

            log.error("obj to json string failed", e);

            return "";

        }

    }

    /**

     * 将 java 对象转为 json,忽略 null 值

     *

     * @param obj java 对象

     * @return json 字符串

     */

    public static String obj2jsonIgnoreNull(Object obj) {

        ObjectMapper objectMapper = new ObjectMapper();

        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

        try {

            return objectMapper.writeValueAsString(obj);

        } catch (JsonProcessingException e) {

            log.error("obj to json string failed", e);

            return "";

        }

    }

    /**

     * 将 json 字符串转换为 java 对象

     *

     * @param jsonString json 字符串

     * @param clazz      指定 java 对象的字节码对象

     * @param <T>        泛型

     * @return java 对象

     */

    public static <T> T json2obj(String jsonString, Class<T> clazz) {

        try {

            return OBJECT_MAPPER.readValue(jsonString, clazz);

        } catch (JsonProcessingException e) {

            log.error("json string to obj failed", e);

            return null;

        }

    }

    /**

     * 将 json 字符串转换为存储指定 java 对象的 List 集合

     *

     * @param jsonString json 字符串

     * @param clazz      指定 java 对象的字节码对象

     * @param <T>        泛型

     * @return 存储指定 java 对象的 List 集合

     */

    public static <T> List<T> json2list(String jsonString, Class<T> clazz) {

        try {

            JavaType javaType = getCollectionType(ArrayList.class, clazz);

            return OBJECT_MAPPER.readValue(jsonString, javaType);

        } catch (JsonProcessingException e) {

            log.error("json string to list failed", e);

            return Collections.emptyList();

        }

    }

    /**

     * 将 json 字符串转换为 Map

     *

     * @param jsonString json 字符串

     * @return Map 对象

     */

    public static Map<String, Object> json2map(String jsonString) {

        try {

            return OBJECT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {

            });

        } catch (JsonProcessingException e) {

            log.error("json string to map failed", e);

            return Collections.emptyMap();

        }

    }

    /**

     * 获取泛型的 Collection Type

     *

     * @param collectionClass 泛型的 Collection

     * @param elementClass    元素类

     * @return Java 类型

     */

    public static JavaType getCollectionType(Class<?> collectionClass, Class<?>… elementClass) {

        return OBJECT_MAPPER.getTypeFactory().constructParametricType(collectionClass, elementClass);

    }

}

 

step5:

package com.example.demo2.entity;

import lombok.Getter;

import lombok.Setter;

import java.io.Serializable;

/**

 * @author zengqm01

 * @date 2022/3/3 14:26

 */

@Setter

@Getter

public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;

    private Integer age;

}

 

step6:

package com.example.demo2.controller;

import com.example.demo2.config.Mqtt;

import com.example.demo2.config.MqttGateway;

import com.example.demo2.entity.User;

import lombok.AllArgsConstructor;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@AllArgsConstructor

@RequestMapping("/mqtt")

public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendStr")

    public String sendToMqtt(@RequestBody Mqtt<String> mqtt){

        mqttGateway.sendToMqtt(mqtt.getTopic(),mqtt.getQos(),mqtt.getPayloadJsonStr());

        return "Success";

    }

    @PostMapping("/sendEntity")

    public String sendEntityToMqtt(@RequestBody Mqtt<User> mqtt){

        mqttGateway.sendToMqtt(mqtt.getTopic(),mqtt.getQos(),mqtt.getPayloadJsonStr());

        return "Success";

    }

}

 

step7:

package com.example.demo2.config;

import com.example.demo2.util.JsonUtils;

import lombok.Getter;

import lombok.Setter;

import java.io.Serializable;

@Setter

@Getter

public class Mqtt<T> implements Serializable {

    private static final Long serialVersionUID = 1L;

    private String topic;

    private int qos;

    private T payload;

    public String getPayloadJsonStr(){

        if (payload instanceof String){

            return (String)payload;

        }

        return JsonUtils.obj2json(payload);

    }

}

 

step8:

package com.example.demo2.config;

        import lombok.Getter;

        import lombok.Setter;

        import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

        import org.springframework.boot.context.properties.ConfigurationProperties;

        import org.springframework.context.annotation.Bean;

        import org.springframework.context.annotation.Configuration;

        import org.springframework.integration.annotation.IntegrationComponentScan;

        import org.springframework.integration.annotation.ServiceActivator;

        import org.springframework.integration.channel.DirectChannel;

        import org.springframework.integration.core.MessageProducer;

        import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

        import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

        import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

        import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

        import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

        import org.springframework.messaging.Message;

        import org.springframework.messaging.MessageChannel;

        import org.springframework.messaging.MessageHandler;

        import org.springframework.messaging.MessagingException;

/**

 * @author zengqm01

 * @date 2022/3/3 12:17

 */

@Setter

@Getter

@Configuration

@IntegrationComponentScan

@ConfigurationProperties(prefix = "spring.mqtt")

public class MqttConfiguration {

    private String serverUrl;

    private String clientId;

    private String username;

    private String password;

    private String defaultTopic;

    private int timeout = 100;

    private int keepAlive = 100;

    @Bean

    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory mqttClientFactory = new DefaultMqttPahoClientFactory();

        MqttConnectOptions options = new MqttConnectOptions();

        options.setServerURIs(new String[] {serverUrl});

        options.setUserName(username);

        options.setPassword(password.toCharArray());

        options.setConnectionTimeout(timeout);

        options.setKeepAliveInterval(keepAlive);

        options.setAutomaticReconnect(true);

        mqttClientFactory.setConnectionOptions(options);

        return mqttClientFactory;

    }

    @Bean

    @ServiceActivator(inputChannel = "mqttOutboundChannel")

    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {

        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory);

        messageHandler.setAsync(true);

        messageHandler.setDefaultTopic(defaultTopic);

        return messageHandler;

    }

    @Bean

    public MessageChannel mqttOutboundChannel() {

        return new DirectChannel();

    }

    @Bean

    public MessageProducer mqttInbound(MessageChannel mqttInboundChannel) {

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverUrl, "test", "testtopic/#");

        adapter.setCompletionTimeout(5000);

        adapter.setConverter(new DefaultPahoMessageConverter());

        adapter.setQos(1);

        adapter.setOutputChannel(mqttInboundChannel);

        return adapter;

    }

    @Bean

    @ServiceActivator(inputChannel = "mqttInboundChannel")

    public MessageHandler handler() {

        return message -> System.out.println(message.getPayload());

    }

    @Bean

    public MessageChannel mqttInboundChannel() {

        return new DirectChannel();

    }

}

 

step9:

package com.example.demo2.config;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.integration.mqtt.support.MqttHeaders;

import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")

public interface MqttGateway {

     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);

}

 

step10:

使用postman测试,请求方式post,地址 127.0.0.1:8080/mqtt/sendStr

参数为body,选择raw方式,默认json格式,请求内容为json字符串

 {

         "qos": 1,

         "payload": "WY+SWITCH=1",

         "topic": "/zcz002/zcz002103910/C1/"

}

 

0表示关闭,1表示打开,发送指令0和1,可以控制开关的打开和关闭

————————————————

版权声明:本文为CSDN博主「勘察加熊人」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/cf8833/article/details/123261019