Java应用层协议WebSocket如何实现消息推送
这篇“Java应用层协议WebSocket如何实现消息推送”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Java应用层协议WebSocket如何实现消息推送”文章吧。
前言
大部分的web开发者,开发的业务都是基于Http协议的:前端请求后端接口,携带参数,后端执行业务代码,再返回结果给前端。作者参与开发的项目,有一个报警推送的功能,服务端实时推送报警信息给浏览器端;还有像抖音里面,如果有人关注、回复你的评论时,抖音就会推送相关消息给你了,你就会收到一条消息。
有些同学会说了,基于Http协议也能实现啊:前端定时访问后端(每隔3s或者几秒),后端返回消息数据,前端拿到后弹出消息。这种方式太low了,而且每个浏览器都这样,使用系统的人一多,服务器的压力就太大了些。那到底用什么技术手段实现呢?我们的主角就登场了。
WebSocket是在单个TCP连接上进行全双工通信的应用层协议(Http协议也是应用层),浏览器端和服务端都可主动发送数据给另一端。这样是不是比Http协议更适合消息推送这种场景。
浏览器端
建一个SpringBoot项目,Html放在class="lazy" data-src\main\resources\static下:
<!DOCTYPE html><html lang="zh" xmlns:th="http://www.thymeleaf.org"><head><!-- 解决中文乱码--> <meta charset="UTF-8"/> <title></title> <script type="text/javascript" class="lazy" data-src="./js/jquery.min.js"></script></head><body> <input id="input1" type="text" /><br/> <input type="button" value="浏览器发送服务端" onclick="btnClick()" /> <input type="button" value="服务端发送浏览器" onclick="btnClick1()" /> <input type="button" value="重新打开连接" onclick="btnClick2()" /> <br/> <textarea id="textArea" ></textarea><script> var ws; webSocketInit(); function webSocketInit() { ws =new WebSocket('ws://localhost:8080/bootdemo/webSocket/10086'); // 获取连接状态 console.log('ws连接状态[初始]:' + ws.readyState); //监听是否连接成功 ws.onopen = function () { console.log('ws连接状态[成功]:' + ws.readyState); }; // 接听服务器发回的信息并处理展示 ws.onmessage = function (obj) { console.log('接收到来自服务器的消息:'); var txt = $("#textArea").val(); $("#textArea").val(txt + "\n" + obj.data); $("#textArea").scrollTop($("#textArea")[0].scrollHeight); //完成通信后关闭WebSocket连接 // ws.close(); }; // 监听连接关闭事件 ws.onclose = function () { // 监听整个过程中websocket的状态 console.log('ws连接状态[关闭]:' + ws.readyState); }; // 监听并处理error事件 ws.onerror = function (error) { console.log(error); }; } function btnClick() { console.log("浏览器端发送消息:"); //连接成功则发送一个数据 ws.send($("#input1").val()); } function btnClick1() { $.ajax({ url: 'http://localhost:8080/bootdemo/pushWebSocket/publish?' + 'userId=10086&message=' + $("#input1").val(), type: 'GET', success: function (data) { // console.log(data); } }); } function btnClick2() { webSocketInit(); }</script></body></html>
服务器端
先引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency>
bean上添加@ServerEndpoint,作为WebSocket的服务端。
import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.HashMap;import java.util.Map;import java.util.concurrent.CopyOnWriteArraySet;@Component@Slf4j@ServerEndpoint("/webSocket/{userId}")public class WebSocketServer { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>(); // 用来存在线连接数 private static final Map<String, Session> sessionPool = new HashMap<String, Session>(); @OnOpen public void onOpen(Session session, @PathParam(value = "userId") String userId) { try { this.session = session; webSockets.add(this); sessionPool.put(userId, session); } catch (Exception e) { } } @OnMessage public void onMessage(String message) { log.info("websocket消息: 收到客户端消息:" + message); } public void sendOneMessage(String userId, String message) { Session session = sessionPool.get(userId); if (session != null && session.isOpen()) { try { log.info("服务端推送消息:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } }}
进行注册:
@Configurationpublic class WebSocketConfigOne { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
推送消息的控制器:
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.util.HashMap;import java.util.Map;@Controller@RequestMapping("/pushWebSocket")public class WebSocketController { @Autowired private WebSocketServer webSocketServer; @GetMapping("/publish") @ResponseBody public Map publish(String userId, String message) { webSocketServer.sendOneMessage(userId, message); HashMap<String, Object> map = new HashMap<>(); map.put("code", 200); return map; }}
还有我的配置文件application.properties:
# web port
server.port=8080
server.servlet.context-path=/bootdemo
运行启动类后,访问html(localhost:8080/bootdemo/index.html)如下:
有的同学一思索,点击图中的第2个按钮"服务端发送浏览器",你这好像也是前端先请求,再推送的消息;我们的WebSocketController#publish方法,在真实的场景下,可以在后端的定时任务中、消息中间件的消费者端调用,不用前端先发送请求。
当然SpringBoot有专门构建WebSocket服务端的方式。
核心配置类:
import lombok.extern.slf4j.Slf4j;import org.springframework.context.annotation.Configuration;import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.http.server.ServletServerHttpRequest;import org.springframework.web.servlet.HandlerMapping;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.config.annotation.WebSocketConfigurer;import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;import org.springframework.web.socket.server.HandshakeInterceptor;import javax.servlet.http.HttpServletRequest;import java.util.Map;@Configuration@EnableWebSocket@Slf4jpublic class WebSocketConfig1 implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(new MyWebSocketHandler(), "/webSocket/{userId}")//设置连接路径和处理 .setAllowedOrigins("*") .addInterceptors(new MyWebSocketInterceptor());//设置拦截器 } class MyWebSocketInterceptor implements HandshakeInterceptor { //前置拦截一般用来注册用户信息,绑定 WebSocketSession @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { log.info("前置拦截~~"); if (!(request instanceof ServletServerHttpRequest)) { return true; } HttpServletRequest servletRequest = ((ServletServerHttpRequest)request).getServletRequest(); Map map = (Map)servletRequest.getAttribute(HandlerMapping. URI_TEMPLATE_VARIABLES_ATTRIBUTE); String userId = (String)map.get("userId"); attributes.put("userId", userId); return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { log.info("后置拦截~~"); } }}
核心处理器:
import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import org.springframework.web.socket.*;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;@Slf4j@Componentpublic class MyWebSocketHandler implements WebSocketHandler { private static final Map<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userId = (String) session.getAttributes().get("userId"); SESSIONS.put(userId, session); } @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { String msg = message.getPayload().toString(); log.info("收到客户端消息:" + msg); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.info("连接出错"); if (session.isOpen()) { session.close(); } } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { log.info("连接关闭:status:" + closeStatus); } @Override public boolean supportsPartialMessages() { return false; } public void sendMessage(String userId, String message) { WebSocketSession webSocketSession = SESSIONS.get(userId); if (webSocketSession == null || !webSocketSession.isOpen()) { return; } try { webSocketSession.sendMessage(new TextMessage(message)); } catch (Exception ex) { log.error("推送消息异常:" + ex); } }}
控制器也改造下:
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.util.HashMap;import java.util.Map;@Controller@RequestMapping("/pushWebSocket")public class WebSocketController { @Autowired private MyWebSocketHandler handler; @GetMapping("/publish") @ResponseBody public Map publish(String userId, String message) { handler.sendMessage(userId, message); HashMap<String, Object> map = new HashMap<>(); map.put("code", 200); return map; }}
前端部分不用做修改,和之前一样的代码。
以上就是关于“Java应用层协议WebSocket如何实现消息推送”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341