我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringBoot整合WebSocket的客户端和服务端的实现代码

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringBoot整合WebSocket的客户端和服务端的实现代码

本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录。

此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层 产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差别接收这些消息,A的服务端:根据地址ip去订阅。用户通过订阅A的ws,同时记录下自己的信息,项目B推送的消息,项目A接收到之后通过当初订阅的逻辑和一些权限过滤条件对项目B产生的消息进行过滤再推送到用户客户端上。

一、项目中服务端的创建

首先引入maven仓库

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

websocket的服务端搭建

同时注意springboot要开启ws服务

启动类加上@EnableScheduling

简要解读demo

/webSocket/{id}:链接的id是业务上的一个id,这边之前做过类似拍卖的,相当于一个服务端或者业务上的一个标识,是客户端指明链接到哪一个拍卖间的标识

@ServerEndpoint:作为服务端的注解。

package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    
    private static Map<String, String> idMap = new ConcurrentHashMap<>();
    
    private Session session;
    
    private String id;
    
    private String userNo;
    
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) throws IOException {
        log.info("已连接到id:{}竞拍场,当前竞拍场人数:{}", id, getUserNosById(id).size());
        this.id = id;
        this.session = session;
        // 生成一个随机序列号来存储一个id下的所有用户
        this.userNo = UUID.fastUUID().toString();
        addOnlineCount();
        //根据随机序列号存储一个socket连接
        clients.put(userNo, this);
        idMap.put(userNo, id);
    }
    
    @OnClose
    public void onClose() throws IOException {
        clients.remove(userNo);
        idMap.remove(userNo);
        subOnlineCount();
    }
    
    @OnMessage
    public void onMessage(String message) throws IOException {
//        JSONObject jsonTo = JSONObject.parseObject(message);
//        String mes = (String) jsonTo.get("message");
//        if (!("All").equals(jsonTo.get("To"))) {
//            sendMessageTo(mes, jsonTo.get("To").toString());
//        } else {
//            sendMessageAll(message);
//        }
        log.info("onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    
    public static void sendMessageToById(String message, String id) {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        //根据id获取所有的userNo链接的用户
        List<String> userNos = getUserNosById(id);
        for (WebSocket item : clients.values()) {
            //遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。
            if (userNos.contains(item.userNo)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
    
    public static List<String> getUserNosById(String id) {
        ArrayList<String> userNos = new ArrayList<>();
        for (Map.Entry<String, String> entry : idMap.entrySet()) {
            if (entry.getValue().equals(id)) {
                userNos.add(entry.getKey());
            }
        }
        return userNos;
    }
}

 demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端

WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?")
    public void job1(){
        log.info("测试生成次数:{}",count);
        redisTemplate.opsForValue().set("测试"+count, ""+count++);
        if (count%2==0){
            WebSocket.sendMessageToById(""+count,2+"");
        }else {
            WebSocket.sendMessageToById(""+count,1+"");
        }

        log.info("websocket发送"+count);
    }

二、java充当客户端链接ws

上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。

ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务

1、ws客户端的配置

package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;

@Component
@Slf4j
public class WSClient {
    public static Session session;
    public static void startWS() {
        try {
            if (WSClient.session != null) {
                WSClient.session.close();
            }
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            //设置消息大小最大为10M
            container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
            container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
            // 客户端,开启服务端websocket。
            String uri = "ws://192.168.0.108:8082/webSocket/1";
            Session session = container.connectToServer(WSHandler.class, URI.create(uri));
            WSClient.session = session;
        } catch (Exception ex) {
            log.info(ex.getMessage());
        }
    }
}

2、配置信息需要在项目启动的时候去启用和链接ws服务

package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebsocketReciveApplication.class, args);
    }
    @PostConstruct
    public void init(){
        log.info("初始化应用程序");     // 初始化ws,链接服务端
        WSClient.startWS();
    }
}

3、接收服务端推送的消息进行权限过滤demo

@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。

package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService=redisTemplate;
    }
    @OnOpen
    public void onOpen(Session session) {
        WSClient.session = session;
    }
    @OnMessage
    public void processMessage(String message) {
        log.info("websocketRecive接收推送消息"+message);
        int permission = Integer.parseInt(message)%5;
        //查询所有订阅的客户端的ip。
        Set<String> keys = redisTemplateService.keys("ip:*");
        for (String key : keys) {
            // 根据登录后存储的客户端ip,获取权限地址
            String s = redisTemplateService.opsForValue().get(key);
            String[] split = s.split(",");
            for (String s1 : split) {
                //向含有推送过来的数据权限地址的客户端推送告警数据。
                if (s1.equals(permission+"")){
                    WebSocket.sendMessageToByIp(message,key.split(":")[1]);
                }
            }
        }
    }
    @OnError
    public void processError(Throwable t) {
        WSClient.session = null;
        try {
            Thread.sleep(5000);
            startWS();
        } catch (InterruptedException e) {
            log.error("---websocket processError InterruptedException---", e);
        }
        log.error("---websocket processError error---", t);
    }
    @OnClose
    public void processClose(Session session, CloseReason closeReason) {
        log.error(session.getId() + closeReason.toString());
    }
    public void send(String sessionId, String message) {
        try {
            log.info("send Msg:" + message);
            if (Objects.nonNull(WSClient.session)) {
                WSClient.session.getBasicRemote().sendText(message);
            } else {
                log.info("---websocket error----");
            }
        } catch (Exception e) {
            log.error("---websocket send error---", e);
        }
    }
}

4、ws客户端推送消息,推送消息和上面服务端类似。

这边是根据ip

package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    private Session session;
    
    private String ip;
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService = redisTemplate;
    }
    @OnOpen
    public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
        log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1);
        this.ip = ip;
        this.session = session;
        // 接入一个websocket则生成一个随机序列号
        addOnlineCount();
        //根据随机序列号存储一个socket连接
        clients.put(ip, this);
    }
    @OnClose
    public void onClose() throws IOException {
        clients.remove(ip);
        onlineCount--;
        subOnlineCount();
    }
    
    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("客户端发送消onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    
    public static void sendMessageToByIp(String message, String ip) {
        for (WebSocket item : clients.values()) {
            //遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。
            if (item.ip.equals(ip)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
}

概述:

至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git

到此这篇关于SpringBoot整合WebSocket的客户端和服务端的实现的文章就介绍到这了,更多相关SpringBoot整合WebSocket内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringBoot整合WebSocket的客户端和服务端的实现代码

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBoot整合WebSocket实现后端向前端发送消息的实例代码

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,下面这篇文章主要给大家介绍了关于SpringBoot整合WebSocket实现后端向前端发送消息的相关资料,需要的朋友可以参考下
2023-03-06

Linux UDP服务端和客户端程序的实现

1. 源码 UDP服务端: #include #include #incl
2022-06-03

SpringBoot+CAS整合服务端和客户端实现SSO单点登录与登出快速入门上手

文章目录 一、教学讲解视频二、前言三、准备工作四、CAS Server服务端搭建五、CAS Client客户端搭建六、结尾 一、教学讲解视频 教学讲解视频地址:视频地址 二、前言 因为CAS支持HTTP请求访问,而我们是快速入门
2023-08-18

Openssl实现双向认证教程(附服务端客户端代码)

一、背景说明 1.1 面临问题 最近一份产品检测报告建议使用基于pki的认证方式,由于产品已实现https,商量之下认为其意思是使用双向认证以处理中间人形式攻击。 《信息安全工程》中接触过双向认证,但有两个问题。 第一个是当时最终的课程设计
2022-06-04

nodejs socket实现的服务端和客户端功能示例

本文实例讲述了nodejs socket实现的服务端和客户端功能。分享给大家供大家参考,具体如下: 使用node.js的net模块能很快的开发出基于TCP的服务端和客户端。直接贴代码。 server.js/*** Created with
2022-06-04

C# winfroms使用socket客户端服务端的示例代码

本文介绍了如何在C#WinForms中使用Socket实现客户端-服务器通信。示例代码演示了客户端如何连接到服务器、发送和接收数据,以及服务器如何接收连接、发送和接收数据。本文适用于希望在C#WinForms应用程序中实现网络通信的开发人员。
C# winfroms使用socket客户端服务端的示例代码
2024-04-02

​java实现客户端调用的代码怎么写

下面是一个简单的Java客户端调用示例代码:import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;impo
2023-10-23

android客户端从服务器端获取json数据并解析的实现代码

首先客户端从服务器端获取json数据 1、利用HttpUrlConnection代码如下:/** * 从指定的URL中获取数组 * @param urlPath * @return * @throws
2022-06-06

Socket结合线程池怎么实现客户端和服务端通信demo

本篇内容主要讲解“Socket结合线程池怎么实现客户端和服务端通信demo”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Socket结合线程池怎么实现客户端和服务端通信demo”吧!1、要求可以
2023-06-29

Spring Boot集成Sorl搜索客户端的实现代码

Apache Solr是一个搜索引擎。Spring Boot为solr客户端库及Spring Data Solr提供的基于solr客户端库的抽象提供了基本的配置。Spring Boot提供了一个用于聚集依赖的spring-boot-star
2023-05-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录