我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Springboot Websocket Stomp 消息订阅推送

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Springboot Websocket Stomp 消息订阅推送

需求背景

闲话不扯,直奔主题。需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议。

websocket协议

想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考。

pom文件
直接引入spring-boot-starter-websocket即可。


    	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

声明websocket endpoint


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;


@Configuration
public class WebSocketConfig {

    
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

websocket实现类,其中通过注解监听了各种事件,实现了推送消息等相关逻辑


import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {

    private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);

    
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
            .initialCapacity(10)
            .maximumSize(300)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    
    @OnOpen
    public void onOpen(Session session, @PathParam("token")String token) {
        String sessionId = session.getId();
        onlineCount.incrementAndGet(); // 在线数加1
        this.sendMessage("sessionId:" + sessionId +",已经和server建立连接", session);
        SESSION_CACHE.put(sessionId,session);
        log.info("有新连接加入:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
    }

    
    @OnClose
    public void onClose(Session session,@PathParam("token")String token) {
        onlineCount.decrementAndGet(); // 在线数减1
        SESSION_CACHE.invalidate(session.getId());
        log.info("有一连接关闭:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
    }

    
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token")String token) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        this.sendMessage("服务端已收到推送消息:" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    
    private static void sendMessage(String message, Session toSession) {
        try {
            log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败:{}", e);
        }
    }

    public static AjaxResult sendMessage(String message, String sessionId){
        Session session = SESSION_CACHE.getIfPresent(sessionId);
        if(Objects.isNull(session)){
            return AjaxResult.error("token已失效");
        }
        sendMessage(message,session);
        return AjaxResult.success();
    }

    public static AjaxResult sendBroadcast(String message){
        long size = SESSION_CACHE.size();
        if(size <=0){
            return AjaxResult.error("当前没有在线客户端,无法推送消息");
        }
        ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
        Set<String> keys = sessionConcurrentMap.keySet();
        for (String key : keys) {
            Session session = SESSION_CACHE.getIfPresent(key);
            DataTypePushWebSocket.sendMessage(message,session);
        }

        return AjaxResult.success();

    }

}

至此websocket服务端代码已经完成。

stomp协议

前端代码.这个是在某个vue工程中写的js,各位大佬自己动手改改即可。其中Settings.wsPath是后端定义的ws地址例如ws://localhost:9003/ws


import Stomp from 'stompjs'
import Settings from '@/settings.js'

export default {
  // 是否启用日志 默认启用
  debug:true,
  // 客户端连接信息
  stompClient:{},
  // 初始化
  init(callBack){
    this.stompClient = Stomp.client(Settings.wsPath)
    this.stompClient.hasDebug = this.debug
    this.stompClient.connect({},suce =>{
      this.console("连接成功,信息如下 ↓")
      this.console(this.stompClient)
      if(callBack){
        callBack()
      }
    },err => {
      if(err) {
        this.console("连接失败,信息如下 ↓")
        this.console(err)
      }
    })
  },
  // 订阅
  sub(address,callBack){
    if(!this.stompClient.connected){
      this.console("没有连接,无法订阅")
      return
    }
    // 生成 id
    let timestamp= new Date().getTime() + address
    this.console("订阅成功 -> "+address)
    this.stompClient.subscribe(address,message => {
      this.console(address+" 订阅消息通知,信息如下 ↓")
      this.console(message)
      let data = message.body
      callBack(data)
    },{
      id: timestamp
    })
  },
  unSub(address){
    if(!this.stompClient.connected){
      this.console("没有连接,无法取消订阅 -> "+address)
      return
    }
    let id = ""
    for(let item in this.stompClient.subscriptions){
      if(item.endsWith(address)){
        id = item
        break
      }
    }
    this.stompClient.unsubscribe(id)
    this.console("取消订阅成功 -> id:"+ id + " address:"+address)
  },
  // 断开连接
  disconnect(callBack){
    if(!this.stompClient.connected){
      this.console("没有连接,无法断开连接")
      return
    }
    this.stompClient.disconnect(() =>{
      console.log("断开成功")
      if(callBack){
        callBack()
      }
    })
  },
  // 单位 秒
  reconnect(time){
    setInterval(() =>{
      if(!this.stompClient.connected){
        this.console("重新连接中...")
        this.init()
      }
    },time * 1000)
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  },
  // 向订阅发送消息
  send(address,msg) {
    this.stompClient.send(address,{},msg)
  }
}

后端stomp config,里面都有注释,写的很详细,并且我加入了和前端的心跳ping pong。


package com.cn.scott.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    private static long HEART_BEAT=10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //允许使用socketJs方式访问,访问点为webSocket,允许跨域
        //在网页上我们就可以通过这个链接
        //ws://127.0.0.1:port/ws来和服务器的WebSocket连接
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        //基于内存的STOMP消息代理来代替mq的消息代理
        //订阅Broker名称,/user代表点对点即发指定用户,/topic代表发布广播即群发
        //setHeartbeatValue 设置心跳及心跳时间
        registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
        //点对点使用的订阅前缀,不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user/");
    }
}

后端stomp协议接受、订阅等动作通知


package com.cn.scott.ws;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class StompSocketHandler {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>用户:"+id +",已订阅");
        SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
        sendToUser(param);
    }


    
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("收到客户端:" +id+",的消息");
        SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
        sendToUser(param);
    }
    
     @GetMapping("/refresh/{userId}")
    public void refresh(@PathVariable Long userId, String msg) {
        StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服务端向用户【%s】发送消息【%s】", userId,msg));
        sendToUser(param);
    }

    
    public void sendToUser(SubscribeMsg screenChangeMsg){
        //这里可以控制权限等。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }

    
    public void sendBroadCast(String topic,String msg){
        simpMessagingTemplate.convertAndSend(topic,msg);
    }


    
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }
}

连接展示

建立连接成功,这里可以看出是基于websocket协议

在这里插入图片描述

连接信息

在这里插入图片描述

ping pong

在这里插入图片描述

调用接口向订阅用户1发送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客户端控制台查看已经收到了消息。这个时候不同用户通过自己的userId可以区分订阅的主题,可以做到通过userId精准的往客户端推送消息。

在这里插入图片描述

还记得我们在后端配置的时候还指定了广播的订阅主题/topic,这时我们前端通过js只要订阅了这个主题,那么后端在像这个主题推送消息时,所有订阅的客户端都能收到,感兴趣的小伙伴可以自己试试,api我都写好了。

在这里插入图片描述

至此,实战完毕,喜欢的小伙伴麻烦关注加点赞。

springboot + stomp后端源码地址:https://gitee.com/ErGouGeSiBaKe/stomp-server

到此这篇关于Springboot Websocket Stomp 消息订阅推送的文章就介绍到这了,更多相关Springboot Websocket Stomp 消息订阅推送内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Springboot Websocket Stomp 消息订阅推送

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBoot+WebSocket实现消息推送功能

WebSocket协议是基于TCP的一种新的网络协议。本文将通过SpringBoot集成WebSocket实现消息推送功能,感兴趣的可以了解一下
2022-11-13

springboot怎么集成websocket实现消息推送

要在Spring Boot中集成WebSocket实现消息推送,可以按照以下步骤进行操作:添加依赖:在`pom.xml`文件中添加以下依赖:org.springframework.bootspring-boot-starter-websoc
2023-10-26

Linux推送服务的消息推送如何支持WebSocket的二进制消息

在Linux上实现WebSocket二进制消息的推送服务,可以使用一些开源的WebSocket服务器实现,如Node.js的WebSocket模块或Java的Java-WebSocket库。以下是一个简单的Node.js WebSocket
Linux推送服务的消息推送如何支持WebSocket的二进制消息
2024-08-22

uni-app结合.NET 7实现微信小程序订阅消息推送

uni-app结合.NET7实现微信小程序订阅消息推送,可跨平台开发应用并推送订阅消息。创建.NET7API项目并安装Microsoft.Azure.NotificationHubs包。获取NotificationHub连接字符串和密钥,配置NotificationHub模板。在API项目中编写控制器用于发送订阅消息。在uni-app项目中安装@dcloudio/uni-notification插件,启用订阅消息权限。集成subscribeMessageAPI订阅消息,通过onReceiveSubscri
uni-app结合.NET 7实现微信小程序订阅消息推送
2024-04-02

uni-app结合.NET 7实现微信小程序订阅消息推送

本文主要介绍了uni-app结合.NET 7实现微信小程序订阅消息推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-02-05

uniapp小程序订阅消息推送+Thinkphp5后端代码教程示例

记录一下通过uniapp开发小程序消息推送的实例,配合后端tp推送,之前写的项目是微信小程序而且后端是原生php,这次通过项目记录一下 目录 回顾access_token获取规则以及思路 第一步:设计前端触发订阅事件第二步:设
2023-08-16

Java应用层协议WebSocket实现消息推送

后端向前端推送消息就需要长连接,首先想到的就是websocket,下面这篇文章主要给大家介绍了关于java后端+前端使用WebSocket实现消息推送的详细流程,需要的朋友可以参考下
2023-02-22

Swoole和Workerman的消息推送和订阅在PHP与MySQL中的应用场景

一、Swoole的应用场景聊天室应用Swoole提供了WebSocket服务器的支持,可以轻松实现实时的聊天室应用。下面是一个简单的聊天室示例代码:// 创建WebSocket服务器$server = new SwooleWebSocke
2023-10-21

Linux推送服务的消息推送与WebSocket的握手过程优化

在Linux系统中,实现消息推送服务通常会使用WebSocket协议来进行实时通信。WebSocket协议是一种在Web浏览器和服务器之间进行全双工通信的协议,可以实现低延迟和高效的消息传递。在消息推送服务中,WebSocket的握手过程
Linux推送服务的消息推送与WebSocket的握手过程优化
2024-08-22

SpringBoot整合WebSocket实现后端向前端主动推送消息方式

这篇文章主要介绍了SpringBoot整合WebSocket实现后端向前端主动推送消息方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-11-13

微信小程序——服务通知,发送订阅消息

一、什么是消息推送 二、整体效果 三、实现步骤 1 小程序开通订阅消息 2 postApi测试效果 三、uniapp配置 这里有个需要特别注意的点,我们要给用户发送消息,就必须引导用户授权,如下 因为用户不点击允许,你是没有办法
2023-08-16

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录