我的编程空间,编程开发者的网络收藏夹
学习永远不晚

消息中间件ActiveMQ的简单入门介绍与使用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

消息中间件ActiveMQ的简单入门介绍与使用

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转文末查看

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖


<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

(2)生产者类



public class MyProducer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 10 ; i++){
            // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}

运行结果:

已发送的消息:第1个文本消息

已发送的消息:第2个文本消息

已发送的消息:第3个文本消息

已发送的消息:第4个文本消息

已发送的消息:第5个文本消息

已发送的消息:第6个文本消息

已发送的消息:第7个文本消息

已发送的消息:第8个文本消息

已发送的消息:第9个文本消息

已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

(3)消费者类



public class MyConsumer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

测试结果:

消费的消息:第1个文本消息

消费的消息:第2个文本消息

消费的消息:第3个文本消息

消费的消息:第4个文本消息

消费的消息:第5个文本消息

消费的消息:第6个文本消息

消费的消息:第7个文本消息

消费的消息:第8个文本消息

消费的消息:第9个文本消息

消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

// Consumer1控制台

消费的消息:第1个文本消息

消费的消息:第3个文本消息

消费的消息:第5个文本消息

消费的消息:第7个文本消息

消费的消息:第9个文本消息

// Consumer2控制台

消费的消息:第2个文本消息

消费的消息:第4个文本消息

消费的消息:第6个文本消息

消费的消息:第8个文本消息

消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试



public class MyProducerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 10 ; i++){
            // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}


public class MyConsumerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

浅谈MQTT

1、什么是MQTT

MQTT的全称是“ Message Queuing Telemetry Transport”,即消息队列遥测传输,是一种基于订阅/发布模式的应用层协议,而http是一种基于restful风格的一种应用层协议。

MQTT协议是一种轻量级协议,作为一种低开销、低带宽占用的即时通讯协议,常被应用于物联网项目。同样基于订阅/发布模式的中间件有ActiveMQ,Kafka等消息中间件,归根结底实现的都是消息的传输。

2、如何理解MQTT

MQTT的是一种应用层协议,每一种协议都有其适用场景,而MQTT常被应用于消息推送,消息采集。例如温度检测仪器定时上传温度、检测矿洞氧气浓度等。

MQTT是基于TCP/IP的一种应用层协议,TCP/IP本身已实现了在不可靠的网络环境提供可靠的网络传输的功能,而MQTT协议也有其保障消息可靠传输的策略。

MQTT推送的消息有三种消息质量

1.至多一次,即消息只推送一次,至于消息有没有推送成功

2.至少一次,需要确认消息到达,可能会导致收到重复数据(注:MQTT定义的重发机制与tcp的重复机制是不同的,tcp的重复机制是在限定时间内如果没有收到对应序号的响应报文,则会重新推送该序列号对应的报文,而MQTT的重发机制是在客户端重新建立连接时,

补发之前没有对应响应报文的数据包,当然客户端可以选择是否要接收这些之前没有传输成功的数据包。最开始使用netty实现MQTT服务器的时候就理解错了,以为MQTT的重复机制与tcp的重复机制一样)

3.只有一次,确认消息只到达一次,常用于对数据要求严格的场景,例如计费场景,订单场景

3、如何使用MQTT

MQTT的客户端和服务端目前已有成熟的开源产品,例如服务端有emqx,客户端有Eclipse Paho Mqtt(Java),都可以方面的引入相应的库快速的实现推送功能(具体可根据需求查看对应的API)。

本质上来将是客户端与服务端建立一个Socket,然后根据MQTT协议规定发送响应的报,例如建立socket后发送connet报文去建立连接,然后服务器会解析该连接报文,并保存该连接的相关信息。

我们可以把MQTT协议的规定当成是我们实现web项目中所实现的业务逻辑。

4、MQTT协议的相关的名词解析

1.订阅(Subscription)

订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

2.、会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

3.主题名(Topic Name)

连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

4.主题筛选器(Topic Filter)

一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

5.负载(Payload)

消息订阅者所具体接收的内容。

总结

到此这篇关于ActiveMQ的简单入门介绍与使用的文章就介绍到这了,更多相关ActiveMQ介绍与使用内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

消息中间件ActiveMQ的简单入门介绍与使用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何进行ActiveMQ的简单入门与使用

这期内容当中小编将会给大家带来有关如何进行ActiveMQ的简单入门与使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一、什么是消息中间件消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传
2023-06-21

node.js中debug模块的简单介绍与使用

前言 相信使用node.js的朋友们都知道,一般在nodejs需要进行调试的时候,可以使用console.log()方法来将调试信息输出到控制台,当发布到生产环境的时候,需要将这些调试信息都注释掉,为了方便切换而不需要改动程序代码,可以使用
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录