WebSocket是一种在单个TCP连接上进行全双工通信的持久化协议。
全双工协议就是客户端可以给我们服务器发数据 服务器也可以主动给客户端发数据。
http协议是一种无状态,非持久化的单全双工应用层协议。
主要用于一问一答的方式交付信息,即客户端发送请求,服务器返回响应。这种模式适合于获取数据或者提交数据的场景。
所以http协议中,服务器无法主动给客户端发送数据,导致出现服务器数据状态发生改变,客户端无法感知。
针对上面的问题,http 勉强可以通过 定时轮询 和 长轮询 解决问题。
定时轮询:客户端不断地定时请求服务器, 询问数据状态变更的情况。
定时轮询的弊端:存在延时,浪费服务器资源和带宽,存在大量无效请求。
长轮询:拉长请求时间,客户端发送请求后,服务器在没有新数据时不会立即响应,而是等到有新数据时才返回响应。这种方法可以减少无效的请求,
长轮询的弊端:仍然需要频繁地建立和断开连接,且服务器需要维护未完成的请求,这可能会占用大量的服务器资源。
承上启下 所有最后我们websocket应运而生,它就是为了解决这个问题而设计的。
WebSocket协议可以实现全双工通信,即客户端和服务器可以在任何时候 相互 主动发送数据。此外,一旦WebSocket连接建立,客户端和服务器之间的连接将保持活动状态,直到被任何一方关闭。
org.springframework.boot spring-boot-starter-websocket
package com.ruoyi.framework.websocket; import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.framework.config.WebSocketConfig; import com.ruoyi.framework.web.service.TokenService; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.LoggerFactory; /** * @author qujingye * @Classname WebSocketServer * @Description 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean * @Date 2023/12/19 16:11 */ @Component @ServerEndpoint(value = "/websocket/message", configurator = WebSocketConfig.class) public class WebSocketServer { private static TokenService tokenService; @Autowired private void setOriginMessageSender(TokenService tokenService) { WebSocketServer.tokenService = tokenService; } /** * WebSocketServer 日志控制器 */ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class); private final static ConcurrentHashMap> sessionPool = new ConcurrentHashMap<>(); private final static AtomicLong atomicLong = new AtomicLong(0L); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) throws Exception { Long userId = parseUserId(session); System.out.println(userId); LOGGER.info("[WebSocket] 有新的连接, 当前用户id: {}", userId); if (userId == null) { return; } CopyOnWriteArrayList sessions = sessionPool.get(userId); //不存在其他人登陆 if (null == sessions) { sessions = new CopyOnWriteArrayList<>(); } sessions.add(session); sessionPool.put(userId, sessions); atomicLong.getAndIncrement(); LOGGER.info("[WebSocket] 有新的连接, 当前连接数: {}", atomicLong.get()); } /** * 连接关闭时处理 */ @OnClose public void onClose(Session session) { Long userId = parseUserId(session); if (userId == null) { return; } CopyOnWriteArrayList sessions = sessionPool.remove(userId); CopyOnWriteArrayList newSessions = new CopyOnWriteArrayList<>(); for (Session s : sessions) { if (!s.getId().equals(session.getId())) { newSessions.add(s); } } sessionPool.put(userId, newSessions); atomicLong.getAndDecrement(); LOGGER.info("[WebSocket] 连接断开, 当前连接数: {}", atomicLong.get()); } /** * 抛出异常时处理 */ @OnError public void onError(Session session, Throwable exception) throws Exception { LOGGER.error("用户错误:,原因:" + exception.getMessage()); } /** * 服务器接收到客户端消息时调用的方法 */ @OnMessage public void onMessage(String message, Session session) { //把收到的消息发回去 session.getAsyncRemote().sendText(message); LOGGER.info("message: {}", message); } /** * 给该用户id的全部发送消息 */ public void sendMessage(Long userId, String message) { CopyOnWriteArrayList sessions = sessionPool.get(userId); if (null == sessions || sessions.size() == 0) { return; } sessions.forEach(s -> s.getAsyncRemote().sendText(message)); } /** * 获取用户id */ private Long parseUserId(Session session) { String token = (String) session.getUserProperties().get(WebSocketConfig.WEBSOCKET_PROTOCOL); if (StringUtils.isNotEmpty(token)) { LoginUser loginUser = tokenService.getLoginUserByToken(token); if (loginUser != null) { return loginUser.getUserId(); } } return null; } }
注入ServerEndpointExporter来自动注册端点
package com.ruoyi.framework.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import java.util.List; import java.util.Map; /** * @author qujingye * @Classname WebSocketConfig * @Description 继承服务器断点配置类 * @Date 2023/12/19 16:08 */ @Configuration public class WebSocketConfig extends ServerEndpointConfig.Configurator { /** * WebSocket的协议头 */ public final static String WEBSOCKET_PROTOCOL = "Sec-Websocket-Protocol"; /** * 注入ServerEndpointExporter,这个Bean会自动注册使用了@ServerEndpoint注解声明的WebSocket Endpoint。 */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } /** * 建立握手时,连接前的操作 */ @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { // 这个用户属性userProperties 可以通过 session.getUserProperties()获取 final MapuserProperties = sec.getUserProperties(); Map > headers = request.getHeaders(); List protocol = headers.get(WEBSOCKET_PROTOCOL); // 存放自己想要的header信息 if (protocol != null) { userProperties.put(WEBSOCKET_PROTOCOL, protocol.get(0)); } } /** * 创建端点实例,也就是被@ServerEndpoint所标注的对象 */ @Override public T getEndpointInstance(Class clazz) throws InstantiationException { return super.getEndpointInstance(clazz); } }
package com.ruoyi.framework.security.filter; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.framework.config.WebSocketConfig; import org.springframework.web.filter.OncePerRequestFilter; import javax.servlet.*; import javax.servlet.annotation.WebFilter; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; /** * @author qujingye * @Classname WebsocketFilter * @Description * @Date 2023/12/19 16:08 */ //@Component @WebFilter(filterName = "WebsocketFilter", urlPatterns = "/websocket/*") public class WebsocketFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { String token = request.getHeader(WebSocketConfig.WEBSOCKET_PROTOCOL); if (StringUtils.isNotEmpty(token)) { response.setHeader(WebSocketConfig.WEBSOCKET_PROTOCOL, token); } filterChain.doFilter(request, response); } }
package com.ruoyi; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.web.servlet.ServletComponentScan; /** * 启动程序 * * @author ruoyi */ @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) @ServletComponentScan public class RuoYiApplication { public static void main(String[] args) { // System.setProperty("spring.devtools.restart.enabled", "false"); SpringApplication.run(RuoYiApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 若依启动成功 ლ(´ڡ`ლ)゙ \n" + " .-------. ____ __ \n" + " | _ _ \ \ \ / / \n" + " | ( ' ) | \ _. / ' \n" + " |(_ o _) / _( )_ .' \n" + " | (_,_).' __ ___(_ o _)' \n" + " | |\ \ | || |(_,_)' \n" + " | | \ `' /| `-' / \n" + " | | \ / \ / \n" + " ''-' `'-' `-..-' "); } }
集成websocket测试
连接 断开 重置
发送消息
返回内容