springBoot使用webSocket的几种方式以及在高并发出现的问题及解决

您所在的位置:网站首页 高并发JAVA线程池配多少合适 springBoot使用webSocket的几种方式以及在高并发出现的问题及解决

springBoot使用webSocket的几种方式以及在高并发出现的问题及解决

2024-06-30 11:39:43| 来源: 网络整理| 查看: 265

一、第一种方式-原生注解(tomcat内嵌) 1.1、引入依赖 org.springframework.boot spring-boot-starter-websocket 1.2、配置文件 package cn.jt.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月06日 */ @Configuration public class WebSocketConfig { /** * 初始化Bean,它会自动注册使用了 @ServerEndpoint 注解声明的 WebSocket endpoint * * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 1.3、构建安全的WebSocket抽象层

1、该类可以作为一个基础的安全抽象层,后续项目中如果需要做认证的操作,都可以继承该抽象类

ClientUserInfoService 大家可以看作一个 UserService 就是一张用户表的service类

这里认证采用的是 jwt的方式,大家可以换成自己的

2、大家这里注意,我们使用的是 javax.websocket.Session; 这个是tomcat下的 在这里插入图片描述

package cn.jt.websocket; import cn.jt.client.entity.ClientUserInfo; import cn.jt.client.service.ClientUserInfoService; import cn.jt.jwt.JwtUtils; import cn.jt.utils.SpringContextUtils; import lombok.extern.slf4j.Slf4j; import javax.websocket.Session; import java.io.IOException; import java.util.Date; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月06日 */ @Slf4j public abstract class SecureWebSocket { private static final ClientUserInfoService clientUserInfoService; static { clientUserInfoService = SpringContextUtils.getBean(ClientUserInfoService.class); } protected Session session; protected String token; protected Long tokenExpiresAt; protected ClientUserInfo clientUserInfo; /** * 验证token是否有效(包含有效期) * * @param token token * @param isInit 是否对token和userInfo进行初始化赋值 * @return boolean */ protected boolean isTokenValid(String token, boolean isInit) { ClientUserInfo clientUserInfo; try { clientUserInfo = JwtUtils.getClientUserInfo(token); } catch (Exception e) { log.error("ws 认证失败", e); return false; } if (isInit) { this.clientUserInfo = clientUserInfo; this.tokenExpiresAt = JwtUtils.getDecodedJWT(token).getExpiresAt().getTime(); this.token = token; } return true; } /** * 认证失败,断开连接 * * @param session session */ protected void sendAuthFailed(Session session) { try { session.getBasicRemote().sendText("认证失败"); session.close(); } catch (IOException e) { e.printStackTrace(); } } } 1.4、构建基础的WebSocket

1、代码很简单,大家一看就知道逻辑了,这里就解释一下各个注解的含义

@ServerEndpoint:将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端@OnOpen:当WebSocket建立连接成功后会触发这个注解修饰的方法。@OnClose:当WebSocket建立的连接断开后会触发这个注解修饰的方法。@OnMessage:当客户端发送消息到服务端时,会触发这个注解修改的方法。@OnError:当WebSocket建立连接时出现异常会触发这个注解修饰的方法。

2、大家这里注意,我们使用的是 javax.websocket.Session; 这个是tomcat下的 在这里插入图片描述

package cn.jt.websocket; import com.alibaba.fastjson.JSON; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.concurrent.*; /** * @author GXM * @version 1.0.0 * @Description * @createTime 2023年07月06日 */ @Slf4j @ServerEndpoint("/globalWs/{token}") @Component public class GlobalWebsocket extends SecureWebSocket { /** * key: userKye * value: GlobalWebsocket 这里你直接存储 session 也是可以的 */ private static final Map CLIENTS = new ConcurrentHashMap(); /** * // 如果允许 一个账号 多人登录的话 就 加上 "-" + tokenTime,因为每次登录的token过期时间都是不一样的 * clientUserInfo.getId() + "-" + clientUserInfo.getAccount() ; */ private String userKye; @OnOpen public void onOpen(Session session, @PathParam("token") String token) { if (!isTokenValid(token, true)) { sendAuthFailed(session); return; } this.session = session; this.userKye = clientUserInfo.getId() + "-" + clientUserInfo.getAccount() + "-" + super.tokenExpiresAt; CLIENTS.put(userKye, this); log.info("当前在线用户:{}", CLIENTS.keySet()); try { session.getBasicRemote().sendText("连接成功!"); } catch (IOException e) { e.printStackTrace(); } } @OnMessage public String onMessage(Session session, String message) { // 先判断当前token 是否已经到期了 if (!isTokenValid(token, false)) { sendAuthFailed(session); return null; } try { session.getBasicRemote().sendText("received"); } catch (IOException e) { e.printStackTrace(); } return null; } @OnError public void onError(Session session, Throwable throwable) { // log.error("ws session 发生错误,session key is {}",throwable); log.error("ws session 发生错误:{}", throwable.getMessage()); } @OnClose public void onClose(Session session) { CLIENTS.remove(userKye); log.info("ws 用户 userKey {} 已下线,当前在线用户:{}", userKye, CLIENTS.keySet()); } /** * 发送消息 * * @param messageVo */ public void sendMessage(MessageVo messageVo) { try { this.session.getBasicRemote().sendText(JSON.toJSONString(messageVo)); } catch (IOException e) { log.error("发送消息异常", e); } } /** * 向user精确用户发送消息 * * @param userKey 由 account + "-" + refreshToken的签发时间组成,例:"admin-1635830649000" * @param messageVo 消息内容 */ public static void sendToUser(String userKey, MessageVo messageVo) { GlobalWebsocket globalWebsocket = CLIENTS.get(userKey); if (null != globalWebsocket) { globalWebsocket.sendMessage(messageVo); return; } log.error("发送消息到指定用户,但是用户不存在,userKey is {},message is {}", userKey, JSON.toJSONString(messageVo)); } /** * 全体组播消息 * * @param */ public static void broadcast(MessageVo messageVo) { CLIENTS.values().forEach(c -> { Session curSession = c.session; if (curSession.isOpen()) { try { curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo)); } catch (IOException e) { log.error("发送ws数据错误:{}", e.getMessage()); } } } ); } } 1.5、SpringBoot 开启 WebSocket @EnableWebSocket

在这里插入图片描述

1.6、高并发时候的问题

1、这里要说明一下在高并发下的问题,如果你同时向在线的 3 个webSocket 在线客户端发送消息,即广播所有在线用户(目前是3个),每个用户每秒10条,那就是说,你每秒要发送 30 条数据,我们调用上述的广播函数 broadcast(),有时候会出现

java.lang.IllegalStateException: 远程 endpoint 处于 [xxxxxx] 状态,如: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for calle

这是因为在高并发的情况下,出现了session抢占的问题,导致session,的状态不一致,所以,这里可以去尝试加锁操作,如下

public static final ExecutorService WEBSOCKET_POOL_EXECUTOR = new ThreadPoolExecutor( 20, 20, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder() .setNameFormat("GlobalWebsocket-executor-" + "%d") .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable)).build(), new ThreadPoolExecutor.AbortPolicy()); /** * 全体组播消息 * * @param */ public static void broadcast(MessageVo messageVo) { CLIENTS.values().forEach(c -> { Session curSession = c.session; if (curSession.isOpen()) { // 建议单个session 一个线程,避免 一个session会话网络不好,会出现超时异常,当前线程会因此中断。 // 导致后面的session没有进行发送操作。使用单个线程,单个session情况下避免session之间的相互影响。 WEBSOCKET_POOL_EXECUTOR.execute(() -> { synchronized (curSession) { // 双重锁检查,外边的 isOpen 第一遍过滤,里面枷加锁之后,第二遍过滤 if (curSession.isOpen()) { try { curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo)); } catch (IOException e) { log.error("发送ws数据错误:{}", e.getMessage()); } } } }); } } ); }

其中增加了,双重锁检查,以及线程池操作,当然加上锁之后,性能是肯定会有所下降的

建议单个session 一个线程,避免 一个session会话网络不好,会出现超时异常,当前线程会因此中断

2、按照上述的代码,我这边测试12个webSocket 链接,每秒每个客户端都发送10条数据,相当于每秒发送120条数据,目前看来,速度还是不错的,但是当客户端重连后,偶尔会出现错误信息 远程主机已经关闭了一个链接,类似于这种错误,这条错误日志是在广播代码的如下位置打印的,这是因为当准备发送消息的时候,远程客户端还是关闭了。

try { curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo)); } catch (IOException e) { log.error("发送ws数据错误:{}", e.getMessage()); } 二、第二种方式-Spring封装 2.1、引入依赖 org.springframework.boot spring-boot-starter-websocket 2.2、自己的webSocket处理service

1、WebSocketService 处理器类如下

类似于 UserService 等等,主要是抽出一部分的业务逻辑

package cn.jt.websocket.spring; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月19日 */ public interface WebSocketService { /** * 会话开始回调 * * @param session 会话 */ void handleOpen(WebSocketSession session); /** * 会话结束回调 * * @param session 会话 */ void handleClose(WebSocketSession session); /** * 处理消息 * * @param session 会话 * @param message 接收的消息 */ void handleMessage(WebSocketSession session, String message); /** * 发送消息 * * @param session 当前会话 * @param message 要发送的消息 * @throws IOException 发送io异常 */ void sendMessage(WebSocketSession session, String message) throws IOException; /** * 发送消息 * * @param userId 用户id * @param message 要发送的消息 * @throws IOException 发送io异常 */ void sendMessage(Integer userId, TextMessage message) throws IOException; /** * 发送消息 * * @param userId 用户id * @param message 要发送的消息 * @throws IOException 发送io异常 */ void sendMessage(Integer userId, String message) throws IOException; /** * 发送消息 * * @param session 当前会话 * @param message 要发送的消息 * @throws IOException 发送io异常 */ void sendMessage(WebSocketSession session, TextMessage message) throws IOException; /** * 广播 * * @param message 字符串消息 * @throws IOException 异常 */ void broadCast(String message) throws IOException; /** * 广播 * * @param message 文本消息 * @throws IOException 异常 */ void broadCast(TextMessage message) throws IOException; /** * 处理会话异常 * * @param session 会话 * @param error 异常 */ void handleError(WebSocketSession session, Throwable error); }

2、WebSocketServiceImpl 实现类如下

类似于 UserServiceImpl 等等,主要是抽出一部分的业务逻辑

package cn.jt.websocket.spring; import cn.jt.client.entity.ClientUserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月19日 */ @Slf4j public class WebSocketServiceImpl implements WebSocketService { private final Map clients = new ConcurrentHashMap(); @Override public void handleOpen(WebSocketSession session) { // 这个时候就需要在建立 webSocket 时存储的 用户信息了 Map attributes = session.getAttributes(); ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo"); clients.put(clientUserInfo.getId(), session); log.info("a new connection opened,current online count:{}", clients.size()); } @Override public void handleClose(WebSocketSession session) { // 这个时候就需要在建立 webSocket 时存储的 用户信息了 Map attributes = session.getAttributes(); ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo"); clients.remove(clientUserInfo.getId()); log.info("a new connection closed,current online count:{}", clients.size()); } @Override public void handleMessage(WebSocketSession session, String message) { // 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息 log.info("received a message:{}", message); } @Override public void sendMessage(WebSocketSession session, String message) throws IOException { this.sendMessage(session, new TextMessage(message)); } @Override public void sendMessage(Integer userId, TextMessage message) throws IOException { WebSocketSession webSocketSession = clients.get(userId); if (webSocketSession.isOpen()) { webSocketSession.sendMessage(message); } } @Override public void sendMessage(Integer userId, String message) throws IOException { this.sendMessage(userId, new TextMessage(message)); } @Override public void sendMessage(WebSocketSession session, TextMessage message) throws IOException { session.sendMessage(message); } @Override public void broadCast(String message) throws IOException { clients.values().forEach(session -> { if (session.isOpen()) { try { session.sendMessage(new TextMessage(message)); } catch (IOException e) { throw new RuntimeException(e); } } }); } @Override public void broadCast(TextMessage message) throws IOException { clients.values().forEach(session -> { if (session.isOpen()) { try { session.sendMessage(message); } catch (IOException e) { throw new RuntimeException(e); } } }); } @Override public void handleError(WebSocketSession session, Throwable error) { log.error("websocket error:{},session id:{}", error.getMessage(), session.getId()); log.error("", error); } } 2.3、实现spring框架的WebSocket处理器

1、注意这里的 webSocketSession 就是 spring 包下的了,不再是 tomcat包下的了

在这里插入图片描述

这里其实就和我们之前使用原生注解(tomcat)的那个一样了,都是几个特定的函数

我们在特定的方法下,调用我们自己的 service去单独处理,解耦合

package cn.jt.websocket.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.socket.*; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月19日 */ public class DefaultWebSocketHandler implements WebSocketHandler { @Autowired private WebSocketService webSocketService; /** * 建立连接 * * @param session Session */ @Override public void afterConnectionEstablished(WebSocketSession session) { webSocketService.handleOpen(session); } /** * 接收消息 * * @param session Session * @param message 消息 */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; webSocketService.handleMessage(session, textMessage.getPayload()); } } /** * 发生错误 * * @param session Session * @param exception 异常 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { webSocketService.handleError(session, exception); } /** * 关闭连接 * * @param session Session * @param closeStatus 关闭状态 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { webSocketService.handleClose(session); } /** * 是否支持发送部分消息 * * @return false */ @Override public boolean supportsPartialMessages() { return false; } } 2.4、自定义拦截器

这里,我们可以设置拦截器,在做请求参数,或者权限认证的时候,不用在建立链接的函数afterConnectionEstablished里面去处理

可以理解为 springMvc 每次请求前的拦截器

package cn.jt.websocket.spring; import cn.jt.client.entity.ClientUserInfo; import cn.jt.jwt.JwtUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月19日 */ @Slf4j public class WebSocketInterceptor implements HandshakeInterceptor { /** * 建立请求之前,可以用来做权限判断 * * @param request the current request * @param response the current response * @param wsHandler the target WebSocket handler * @param attributes the attributes from the HTTP handshake to associate with the WebSocket * session; the provided attributes are copied, the original map is not used. * @return * @throws Exception */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; // 模拟用户(通常利用JWT令牌解析用户信息) String token = servletServerHttpRequest.getServletRequest().getParameter("token"); try { ClientUserInfo clientUserInfo = JwtUtils.getClientUserInfo(token); // 设置当前这个session的属性,后续我们在发送消息时,可以通过 session.getAttributes().get("clientUserInfo")可以取出 clientUserInfo参数 attributes.put("clientUserInfo", clientUserInfo); } catch (Exception e) { log.error("webSocket 认证失败 ", e); return false; } return true; } return false; } /** * 建立请求之后 * * @param request the current request * @param response the current response * @param wsHandler the target WebSocket handler * @param exception an exception raised during the handshake, or {@code null} if none */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } } 2.5、WebSocket配置

将自定义处理器、拦截器以及WebSocket操作类依次注入到IOC容器中。

@EnableWebSocket:开启WebSocket功能addHandler:添加处理器addInterceptors:添加拦截器setAllowedOrigins:设置允许跨域(允许所有请求来源) package cn.jt.websocket.spring; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年07月19日 */ @Configuration public class WebSocketConfiguration implements WebSocketConfigurer { @Bean public DefaultWebSocketHandler defaultWebSocketHandler() { return new DefaultWebSocketHandler(); } @Bean public WebSocketService webSocket() { return new WebSocketServiceImpl(); } @Bean public WebSocketInterceptor webSocketInterceptor() { return new WebSocketInterceptor(); } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // 链接方式如下 ws://127.0.0.1:9085/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb // 如果你设置了springboot的 contentPath 那就需要在:9085端口后面 加上contentPath 的值,在拼接上 globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb registry.addHandler(defaultWebSocketHandler(), "/globalWs/message") .addInterceptors(webSocketInterceptor()) .setAllowedOrigins("*"); } } 2.6、SpringBoot 开启 WebSocket @EnableWebSocket

在这里插入图片描述

2.7、链接

1、其中 thermal-api 是我的项目名称

在这里插入图片描述

2、链接路径如下

ws://127.0.0.1:9085/thermal-api/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb 2.8、高并发时候的问题

1、如果在广播的时候,客户端很多,发送的消息也是很多,还是会出现和之前 第一种方式-原生注解(tomcat内嵌)相同的问题,出现类似如下报错

The remote endpoint was in state [xxxx] which is an invalid state for calle

2、错误分析可以看 踩坑笔记 Spring websocket并发发送消息异常,写的很清楚。

2.8.1、解决方案一

1、和之前一样,加锁

@Override public void broadCast(String message) throws IOException { clients.values().forEach(session -> { if (session.isOpen()) { synchronized (session){ try { session.sendMessage(new TextMessage(message)); } catch (IOException e) { throw new RuntimeException(e); } } } }); } 2.8.2、解决方案二

1、使用 spring 的,Spring 的解决方案是把原来的 WebSocketSession 封了一层,即 org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator

3、代码稍微改一下,如下

@Override public void handleOpen(WebSocketSession session) { // 这个时候就需要在建立 webSocket 时存储的 用户信息了 Map attributes = session.getAttributes(); ClientUserInfo clientUserInfo = (ClientUserInfo) attributes.get("clientUserInfo"); clients.put(clientUserInfo.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000)); log.info("a new connection opened,current online count:{}", clients.size()); } 第三种方式-TIO

1、请上网了解,用的比较少,不做过多说明

第四种方式-STOMP

1、请上网了解,用的比较少,不做过多说明



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭