java集成mqtt,与硬件设备通信esp8266硬件

java集成

maven依赖

<dependency>

            <groupId>org.eclipse.paho</groupId>

            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

            <version>1.2.0</version>

        </dependency>

 

配置参数

package com.ruoyi.demo;

/**

 * @author 张璞

 * @date 2022/4/2 16:51

 */

public class MqConfig {

    public static int qos = 2; //只有一次

    public static String broker = "tcp://xxx.110.32.xxx:1883";

    public static String userName = "xxxxx";

    public static String passWord = "xxxxx";

}

 

发布消息

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**

 * @author 张璞

 * @date 2022/3/30 17:51

 */

public class PubMsg {

    /**

     * 功能描述:连接MQ

     * @author zhangpu

     * @date 2022/4/2

     * @param clientId

     * @param userName

     * @param password

     */

    private static MqttClient connect(String clientId, String userName,String password) throws MqttException {

        MemoryPersistence persistence = new MemoryPersistence();

        MqttConnectOptions connOpts = new MqttConnectOptions();

        connOpts.setCleanSession(true);

        connOpts.setUserName(userName);

        connOpts.setPassword(password.toCharArray());

        connOpts.setConnectionTimeout(10);// 设置超时时间

        connOpts.setKeepAliveInterval(20); // 设置会话心跳时间

        // broker,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存

        MqttClient mqttClient = new MqttClient(MqConfig.broker, clientId, persistence);

        mqttClient.setCallback(new PushCallback("test"));//设置回调

        mqttClient.connect(connOpts);

        return mqttClient;

    }

    /**

     * 功能描述:发送消息

     * @author zhangpu

     * @date 2022/4/2

     * @param msg

     * @param clientId

     * @param topic

     */

    private static void publish(String msg, String clientId, String topic) throws MqttException {

        MqttClient mqttClient = connect(clientId, MqConfig.userName, MqConfig.passWord);

        if (mqttClient != null)

        {

            MqttMessage message = new MqttMessage(msg.getBytes());

            message.setQos(MqConfig.qos);

            //重新连接MQTT服务时,不需要接收该主题最新消息,设置retained为false

            //重新连接MQTT服务时,需要接收该主题最新消息,设置retained为true;

            message.setRetained(false);

            mqttClient.publish(topic, message);

        }

        if (mqttClient != null)

        {

            mqttClient.disconnect();//释放连接

        }

    }

    public static void main(String[] args) throws MqttException {

        publish("message content", "client-id-0", "test-topic");

    }

}

 

发布回调

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

/**

 * @author 张璞

 * @date 2022/3/30 17:52

 */

public class PushCallback implements MqttCallback {

    private String threadId;

    public PushCallback(String threadId) {

        this.threadId = threadId;

    }

    public void connectionLost(Throwable cause) {

        // 连接丢失后,一般在这里面进行重连

        System.out.println("连接断开,可以做重连");

    }

    public void deliveryComplete(IMqttDeliveryToken token) {

        System.out.println("deliveryComplete———" + token.isComplete());

    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {

        String msg = new String(message.getPayload());

        System.out.println("——-messageArrived——-"+threadId + " " + msg);

    }

}

 

订阅

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**

 * @author 张璞

 * @date 2022/3/30 17:53

 */

public class SubMsg {

    /**

     * 功能描述:连接MQ

     * @author zhangpu

     * @date 2022/4/2

     * @param clientId 客户端唯一标识

     */

    private static MqttClient connect(String clientId) throws MqttException {

        MemoryPersistence persistence = new MemoryPersistence();

        MqttConnectOptions connOpts = new MqttConnectOptions();

        connOpts.setCleanSession(false);

        connOpts.setConnectionTimeout(10);

        connOpts.setKeepAliveInterval(20);

        connOpts.setUserName(MqConfig.userName);

        connOpts.setPassword(MqConfig.passWord.toCharArray());

        MqttClient mqttClient = new MqttClient(MqConfig.broker, clientId, persistence);

        mqttClient.connect(connOpts);

        return mqttClient;

    }

    /**

     * 功能描述:订阅消息

     * @author zhangpu

     * @date 2022/4/2

     * @param clientId

     * @param topic

     */

    private static void subMsg(String clientId, String topic) throws MqttException{

        MqttClient mqttClient = connect(clientId);

        if(mqttClient != null)

        {

            int[] Qos  = {MqConfig.qos};

            String[] topics = {topic};

            mqttClient.subscribe(topics, Qos);

        }

        mqttClient.subscribe(topic,MqConfig.qos,new SubMessageListener());

    }

    public static void main(String[] args) throws MqttException{

        subMsg("testSub", "test-topic");

    }

}

 

订阅监听

package com.ruoyi.demo;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttMessage;

/**

 * @author 张璞

 * @date 2022/4/2 15:46

 */

public class SubMessageListener implements IMqttMessageListener {

    @Override

    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {

        System.out.println("接收消息主题 : " + topic);

        System.out.println("接收消息Qos : " + mqttMessage.getQos());

        System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));

    }

}

 

c集成

/**

 * @author 张璞

 * @date 2022/3/30 17:51

 */

#include <ESP8266WiFi.h>   

#include <PubSubClient.h>       

#include <DNSServer.h>

#include <ESP8266WebServer.h>

#include <WiFiManager.h>  

#include <Ticker.h>       

const char* mqttServer = "xxx.xxx.xxx.xxx";//MQ服务地址

const char* mqttUserName = "xxx"; //户名

const char* mqttPassword = "xxx";// 密码

Ticker ticker;

WiFiClient wifiClient;

PubSubClient mqttClient(wifiClient);

int count;    // Ticker计数用变量

/**

 * @brief 程序启动入口

 * 

 */

void setup() {

    Serial.begin(9600);

    pinMode(LED_BUILTIN, OUTPUT);             // 设置板上LED引脚为输出模式

    digitalWrite(LED_BUILTIN, HIGH);          // 启动后关闭板上LED

    connectEquipment();                       //自动配网

    mqttClient.setServer(mqttServer, 1883);   // 设置MQTT服务器和端口号

    mqttClient.setCallback(receiveCallback);  // 设置MQTT订阅回调函数

    connectMQTTServer();                     // 连接MQTT服务器

    ticker.attach(1, tickerCount);          // Ticker定时对象

}

 

void loop() {

 if (!mqttClient.connected())  // 如果开发板未能成功连接服务器,则尝试连接服务器

  {

    connectMQTTServer();

  }

   mqttClient.loop();

  if (count >= 3)// 每隔3秒钟发布一次信息

  {

    pubMQTTmsg();

    count = 0;

  }

}

void tickerCount(){

  count++;

}

/**

 * @brief 建立网络配置

 * 

 */

void connectEquipment(){

    WiFiManager wifiManager;// 建立WiFiManager对象

    wifiManager.autoConnect("zhangpuArduino", "xxxxxxx");//wifi名:密码

    // WiFi连接成功后将通过串口监视器输出连接成功信息 

    Serial.println(""); 

    Serial.print("ESP8266 Connected to ");

    Serial.println(WiFi.SSID());              // WiFi名称

    Serial.print("IP address:\t");

    Serial.println(WiFi.localIP());           // IP

    Serial.print("MAC address:\t");   

    Serial.println(WiFi.macAddress());

  }

/**

 * @brief 连接mqtt

 * 

 */

void connectMQTTServer(){

  String clientId = "esp8266-" + WiFi.macAddress();

  if (mqttClient.connect(clientId.c_str(), mqttUserName, mqttPassword)) 

  { 

    Serial.println("MQTT Server Connected.");

    Serial.print("Server Address: ");

    Serial.println(mqttServer);

    Serial.print("ClientId: ");

    Serial.println(clientId);

    subscribeTopic(); // 订阅指定主题

  } else {

    Serial.print("MQTT Server Connect Failed. Client State:");

    Serial.println(mqttClient.state());

    delay(3000);

  }   

}

 

/**

 * @brief 发布消息

 * 

 */

void pubMQTTmsg(){

  static int value; // 客户端发布信息用数字

  String topicString = "Taichi-Maker-Pub-" + WiFi.macAddress();

  char publishTopic[topicString.length() + 1];  

  strcpy(publishTopic, topicString.c_str());

  String messageString = "Hello World " + String(value++); 

  char publishMsg[messageString.length() + 1];   

  strcpy(publishMsg, messageString.c_str());

  if(mqttClient.publish(publishTopic, publishMsg))

  {

    Serial.println("Message Publish Success."); 

    Serial.println("Publish Topic:");Serial.println(publishTopic);

    Serial.println("Publish message:");Serial.println(publishMsg);    

  } else {

    Serial.println("Message Publish Failed."); 

  }

}

/**

 * 

 * 订阅主题

 */

void subscribeTopic(){

  String topicString = "Taichi-Maker-Sub-" + WiFi.macAddress();//订阅的主题

  char subTopic[topicString.length() + 1];  

  strcpy(subTopic, topicString.c_str());

  if(mqttClient.subscribe(subTopic)){

    Serial.println("Subscrib Topic:");

    Serial.println(subTopic);

  } else {

    Serial.print("Subscribe Fail…");

  }  

}

/**

 * @brief 收到信息后的回调函数

 * @param topic 

 * @param payload 

 * @param length 

 */

void receiveCallback(char* topic, byte* payload, unsigned int length) {

  Serial.print("Message Received [");

  Serial.print(topic);

  Serial.print("] ");

  for (int i = 0; i < length; i++) {

    Serial.print((char)payload[i]);

  }

  Serial.println("");

  Serial.print("Message Length(Bytes) ");

  Serial.println(length);

  if ((char)payload[0] == '1') {     // 如果收到的信息以“1”为开始

    digitalWrite(BUILTIN_LED, LOW);  // 则点亮LED。

    Serial.println("LED ON");

  } else {                           

    digitalWrite(BUILTIN_LED, HIGH); // 否则熄灭LED。

    Serial.println("LED OFF");

  }

}

————————————————

版权声明:本文为CSDN博主「村东头老张」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/weixin_42456784/article/details/126647446