我的编程空间,编程开发者的网络收藏夹
学习永远不晚

spring-integration连接MQTT全过程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

spring-integration连接MQTT全过程

MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。

首先需要引入spring-integration-mqt的包

这里只需要引入这一个包即可。

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.3.1.RELEASE</version>
</dependency>

MQTT的配置比较简单

和spring-integration集成一样,需要配置相对应的入站、出站就可以了

具体配置如下:

package org.noka.serialservice.config;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
 

@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
    private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
 
    private final MsgSendService msgSendService;//发布消息到消息中间件接口
 
    @Value("${mqtt.appid:mqtt_id}")
    private String appid;//客户端ID
 
    @Value("${mqtt.input.topic:mqtt_input_topic}")
    private String[] inputTopic;//订阅主题,可以是多个主题
 
    @Value("${mqtt.out.topic:mqtt_out_topic}")
    private String[] outTopic;//发布主题,可以是多个主题
 
    @Value("${mqtt.services:#{null}}")
    private String[] mqttServices;//服务器地址以及端口
 
    @Value("${mqtt.user:#{null}}")
    private String user;//用户名
 
    @Value("${mqtt.password:#{null}}")
    private String password;//密码
 
    @Value("${mqtt.KeepAliveInterval:300}")
    private Integer KeepAliveInterval;//心跳时间,默认为5分钟
 
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;//是否不保持session,默认为session保持
 
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联
 
    @Value("${mqtt.CompletionTimeout:30000}")
    private Long CompletionTimeout;//连接超时,默认为30秒
 
    @Value("${mqtt.Qos:1}")
    private Integer Qos;//通信质量,详见MQTT协议
 
 
    public MQTTConfig(MsgSendService msgSendService) {
        this.msgSendService = msgSendService;
    }
 
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
        MqttConnectOptions options = new MqttConnectOptions();//连接参数
        options.setServerURIs(mqttServices);//连接地址
        if(null!=user) {
            options.setUserName(user);//用户名
        }
        if(null!=password) {
            options.setPassword(password.toCharArray());//密码
        }
        options.setKeepAliveInterval(KeepAliveInterval);//心跳时间
        options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联
        options.setCleanSession(CleanSession);//保持session
        factory.setConnectionOptions(options);
        return factory;
    }
 
    
    @Bean
    public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立订阅连接
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes类型接收
        adapter.setCompletionTimeout(CompletionTimeout);//连接超时的时间
        adapter.setConverter(converter);
        adapter.setQos(Qos);//消息质量
        adapter.setOutputChannelName(ChannelName.INPUT_DATA);//输入管道名称
        return adapter;
    }
    
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
    public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
        //创建一个新的出站管道,由于MQTT的发布与订阅是两个独立的连接,因此客户端的ID(即APPID)不能与订阅时所使用的ID一样,否则在服务端会认为是同一个客户端,而造成连接失败
        MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes类型接收
        outGate.setAsync(true);
        outGate.setCompletionTimeout(CompletionTimeout);//设置连接超时时时
        outGate.setDefaultQos(Qos);//设置通信质量
        outGate.setConverter(converter);
        return outGate;
    }
 
    
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof MqttSubscribedEvent) {
            String msg = "OK";
            
            msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
        }
    }
}

其中ChanneName是一个常量类

来标识入站、出站管道的名称,以便在其它需要的地方使用,实现方法如下:


public class ChannelName {
    public final static String INPUT_DATA="input_data";//入站管道
    public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
    public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名称
}

此时所有配置完成,接下来需要做的就是处理接收到的数据和发布数据,以上配置完成以后,接收和发送数据都是通过数据管道来完成,配置的是数据管道名称。

数据发送网关只是一个接口

用于向指定的数据管道里面发送数据,实现如下:

package org.noka.serialservice.service;
 
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
 

@MessagingGateway
@Component
public interface MsgGateway {
    
    @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}

在需要的地方,可以向下面这样调用这个接口,向MQTT服务器发送消息

//topic为主题名称,out为消息内容
msgGateway.send(topic, out);

MQTT服务器有数据下发时

会自动调将数据放入配置的入站数据管道中,在需要接收数据的地方,向下面这样配置即可

    
    @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
    public void upCase(Message<byte[]> in) {
        logger.info("[net service data]========================================");
        logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据
        logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据
        serialService.send(in.getPayload());//将服务器下发的数据转发给串口
    }

最后是参数配置文件

#--------MQTT---------------------------
#设备ID,唯一标识
mqtt.appid=mqtt_id
#订阅主题,多个主题用逗号分隔
mqtt.input.topic=mqtt_input_topic
#发布主题
mqtt.out.topic=mqtt_out_topic,aac
#MQTT服务器地址,可以是多个地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt用户名,默认无
#mqtt.user=guest
#mqtt密码,默认无
#mqtt.password=guest
#心跳间隔时间,默认3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,默认false
#mqtt.CleanSession=false
#是否自动连接,默认true
#mqtt.AutomaticReconnect=true
#连接超时,默认30000
#mqtt.CompletionTimeout=30000
#传输质量,默认1
#mqtt.Qos=1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

spring-integration连接MQTT全过程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

spring-integration连接MQTT全过程

这篇文章主要介绍了spring-integration连接MQTT全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-03-11

spring integration怎么连接MQTT

本篇内容主要讲解“spring integration怎么连接MQTT”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spring integration怎么连接MQTT”吧!MQTT一种物联网数
2023-07-05

Spring连接Mysql数据库全过程

这篇文章主要介绍了Spring连接Mysql数据库全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-11-21

C#创建WebService接口并连接的全过程

工作时遇到需要请求客户的接口返回数据,要求使用WebService,借此机会记录一下,下面这篇文章主要给大家介绍了关于C#创建WebService接口并连接的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
2022-12-19

Spring整合ehCache全过程

这篇文章主要介绍了Spring整合ehCache全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-02-14

小程序连接MQTT进行通信(保证能用)

一、MQTT通信介绍 MQTT 是一种基于客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联
2023-08-17

Pycharm使用Database Navigator连接mysql数据库全过程

目录Pycharm Database Navigator连接mysql1.安装Database Navigator2.下载mysql驱动3.创建连接4.时区错误pycharm无法连接上mysql对mysql进行配置报错解决Pycharm D
2022-07-19

怎么通过接口安全退出Spring Boot

怎么通过接口安全退出Spring Boot?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1、在pom.xml中引入actuator, security依赖
2023-05-31

windows环境下python连接openGauss数据库的全过程

目录一、python 介绍二、Python下载及安装三、openGauss Connectors (Psycopg2) 介绍四、openGauss Connectors (Psycopg2)下载并初始化五、连接并访问openGauss数据库
2023-01-04

通过 plsql 连接远程 Oracle

方法一:通过 plsql 工具和 oracle client(不是即时客户端 instantclient) 的方式来连接 Oracle一、 安装 oracle client,(本教程已经下载并解压)二、 选 择 管 理 员 安 装 ,如图:三、剩下的就是一路下
通过 plsql 连接远程 Oracle
2016-10-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录