日常记录-SpringBoot整合netty-socketio
作者:mmseoamin日期:2023-12-13

SpringBoot整合netty-socketio

  • 一、准备工作
    • 1、maven依赖
    • 2、socketIO的yml配置
    • 3、socketIO的config代码
    • 4、SocketIOServer启动或关闭
    • 5、项目目录结构
    • 二、客户端和服务端建立连接
      • 1、服务端
        • 1.1 用户缓存信息ClientCache
        • 1.2 SocketIOServerHandler
        • 2、客户端
        • 3、简单的演示
        • 三、广播
          • 1、SocketIO基础概念图
          • 2、定义namespace
          • 3、创建namespace所属的Handler
            • 3.1 自定义Handler
            • 3.2 监听自定义Handler
            • 3.3演示
              • 3.3.1 正确演示
              • 3.3.1 错误演示
              • 四、常用方法
                • 1、加入房间
                • 2、离开房间
                • 3、获取用户所有房间
                • 4、发送消息给指定的房间
                • 5、广播消息给指定的Namespace下所有客户端
                • 6、点对点发送

                  这次整合借鉴了以下博主的智慧

                  websocket和socketio的区别

                  socket.io.js最简版单页HTML测试工具

                  Netty-SocketIO多路复用

                  springboot学习(四十三) springboot使用netty-socketio实现消息推送

                  SpringBoot集成SocketIO

                  一、准备工作

                  1、maven依赖

                  socketio的核心依赖就只有这个

                  
                  
                     com.corundumstudio.socketio
                     netty-socketio
                     1.7.19
                  
                  

                  2、socketIO的yml配置

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第1张

                  #自定义socketio配置,你可以直接硬编码,看个人喜好
                  socketio:
                    # socketio请求地址
                    host: 127.0.0.1
                    # socketio端口
                    port: 9999
                    # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
                    maxFramePayloadLength: 1048576
                    # 设置http交互最大内容长度
                    maxHttpContentLength: 1048576
                    # socket连接数大小(如只监听一个端口boss线程组为1即可)
                    bossCount: 1
                    # 连接数大小
                    workCount: 100
                    # 允许客户请求
                    allowCustomRequests: true
                    # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
                    upgradeTimeout: 1000000
                    # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
                    pingTimeout: 6000000
                    # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
                    pingInterval: 25000
                    # 命名空间,多个以逗号分隔,
                    namespaces: /test,/socketIO
                    #namespaces: /socketIO
                  

                  3、socketIO的config代码

                  package com.gzgs.socketio.common.config;
                  import com.corundumstudio.socketio.SocketConfig;
                  import com.corundumstudio.socketio.SocketIOServer;
                  import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
                  import org.springframework.beans.factory.annotation.Value;
                  import org.springframework.context.annotation.Bean;
                  import org.springframework.context.annotation.Configuration;
                  import java.util.Arrays;
                  import java.util.Optional;
                  @Configuration
                  public class SocketIOConfig {
                      @Value("${socketio.host}")
                      private String host;
                      @Value("${socketio.port}")
                      private Integer port;
                      @Value("${socketio.bossCount}")
                      private int bossCount;
                      @Value("${socketio.workCount}")
                      private int workCount;
                      @Value("${socketio.allowCustomRequests}")
                      private boolean allowCustomRequests;
                      @Value("${socketio.upgradeTimeout}")
                      private int upgradeTimeout;
                      @Value("${socketio.pingTimeout}")
                      private int pingTimeout;
                      @Value("${socketio.pingInterval}")
                      private int pingInterval;
                      @Value("${socketio.namespaces}")
                      private String[] namespaces;
                      @Bean
                      public SocketIOServer socketIOServer() {
                          SocketConfig socketConfig = new SocketConfig();
                          socketConfig.setTcpNoDelay(true);
                          socketConfig.setSoLinger(0);
                          com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
                          config.setSocketConfig(socketConfig);
                          config.setHostname(host);
                          config.setPort(port);
                          config.setBossThreads(bossCount);
                          config.setWorkerThreads(workCount);
                          config.setAllowCustomRequests(allowCustomRequests);
                          config.setUpgradeTimeout(upgradeTimeout);
                          config.setPingTimeout(pingTimeout);
                          config.setPingInterval(pingInterval);
                          //服务端
                          final SocketIOServer server = new SocketIOServer(config);
                          //添加命名空间(如果你不需要命名空间,下面的代码可以去掉)
                          Optional.ofNullable(namespaces).ifPresent(nss ->
                                  Arrays.stream(nss).forEach(server::addNamespace));
                          return server;
                      }
                      //这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent
                      @Bean
                      public SpringAnnotationScanner springAnnotationScanner() {
                          return new SpringAnnotationScanner(socketIOServer());
                      }
                  }
                  

                  4、SocketIOServer启动或关闭

                  我在启动类里面定义了启动或者关闭SocketIOServer

                  package com.gzgs.socketio;
                  import com.corundumstudio.socketio.SocketIOServer;
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.beans.factory.DisposableBean;
                  import org.springframework.beans.factory.annotation.Autowired;
                  import org.springframework.boot.CommandLineRunner;
                  import org.springframework.boot.SpringApplication;
                  import org.springframework.boot.autoconfigure.SpringBootApplication;
                  import org.springframework.stereotype.Component;
                  @SpringBootApplication
                  public class SocketioServerApplication {
                      public static void main(String[] args) {
                          SpringApplication.run(SocketioServerApplication.class, args);
                      }
                  }
                  @Component
                  @Slf4j
                  class SocketIOServerRunner implements CommandLineRunner, DisposableBean {
                      @Autowired
                      private SocketIOServer socketIOServer;
                      @Override
                      public void run(String... args) throws Exception {
                          socketIOServer.start();
                          log.info("SocketIOServer==============================启动成功");
                      }
                      @Override
                      public void destroy() throws Exception {
                          //如果用kill -9  这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
                          socketIOServer.stop();
                          log.info("SocketIOServer==============================关闭成功");
                      }
                  }
                  

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第2张

                  springboot整合socketIO的工作已经完成了

                  5、项目目录结构

                  参考下即可,核心是如何配置以及如何启动/关闭SocketIO

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第3张

                  二、客户端和服务端建立连接

                  1、服务端

                  1.1 用户缓存信息ClientCache

                  package com.gzgs.socketio.common.cache;
                  import com.corundumstudio.socketio.SocketIOClient;
                  import org.springframework.stereotype.Component;
                  import java.util.HashMap;
                  import java.util.UUID;
                  import java.util.concurrent.ConcurrentHashMap;
                  /**
                   * 这是存储用户的缓存信息
                   */
                  @Component
                  public class ClientCache {
                      //用于存储用户的socket缓存信息
                      private static ConcurrentHashMap> concurrentHashMap = new ConcurrentHashMap<>();
                      //保存用户信息
                      public void saveClient(String userId,UUID sessionId,SocketIOClient socketIOClient){
                          HashMap sessionIdClientCache = concurrentHashMap.get(userId);
                          if(sessionIdClientCache == null){
                              sessionIdClientCache = new HashMap<>();
                          }
                          sessionIdClientCache.put(sessionId,socketIOClient);
                          concurrentHashMap.put(userId,sessionIdClientCache);
                      }
                      //获取用户信息
                      public HashMap getUserClient(String userId){
                          return concurrentHashMap.get(userId);
                      }
                      //根据用户id和session删除用户某个session信息
                      public void deleteSessionClientByUserId(String userId,UUID sessionId){
                          concurrentHashMap.get(userId).remove(sessionId);
                      }
                      //删除用户缓存信息
                      public void deleteUserCacheByUserId(String userId){
                          concurrentHashMap.remove(userId);
                      }
                  }
                  

                  1.2 SocketIOServerHandler

                  用于监听客户端的建立连接请求和关闭连接请求

                  package com.gzgs.socketio.common.handler;
                  import com.corundumstudio.socketio.SocketIOClient;
                  import com.corundumstudio.socketio.SocketIOServer;
                  import com.corundumstudio.socketio.annotation.OnConnect;
                  import com.corundumstudio.socketio.annotation.OnDisconnect;
                  import com.gzgs.socketio.common.cache.ClientCache;
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.beans.factory.annotation.Autowired;
                  import org.springframework.stereotype.Component;
                  import java.util.UUID;
                  @Slf4j
                  @Component
                  public class SocketIOServerHandler {
                      @Autowired
                      private ClientCache clientCache;
                      /**
                       * 建立连接
                       * @param client 客户端的SocketIO
                       */
                      @OnConnect
                      public void onConnect(SocketIOClient client) {
                          //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345
                          
                           //下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏)
                          //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345
                          //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345
                          String userId = client.getHandshakeData().getSingleUrlParam("userId");
                          //同一个页面sessionid一样的
                          UUID sessionId = client.getSessionId();
                          //保存用户的信息在缓存里面
                          clientCache.saveClient(userId,sessionId,client);
                          log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功",userId,sessionId);
                      }
                      /**
                       * 关闭连接
                       * @param client 客户端的SocketIO
                       */
                      @OnDisconnect
                      public void onDisconnect(SocketIOClient client){
                          //因为我定义用户的参数为userId,你也可以定义其他名称
                          String userId = client.getHandshakeData().getSingleUrlParam("userId");
                          //sessionId,页面唯一标识
                          UUID sessionId = client.getSessionId();
                          //clientCache.deleteUserCacheByUserId(userId);
                          //只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话
                          clientCache.deleteSessionClientByUserId(userId,sessionId);
                          log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功",userId,sessionId);
                      }
                  
                  }
                  

                  2、客户端

                  直接复制建立html文件,在浏览器打开就可以使用了

                  
                  
                  
                      
                      SocketIO客户端测试环境
                      
                      
                      
                      
                  
                  
                  

                  客户端测试环境


                  SocketClient建立连接




                  SocketClient发送消息




                  SocketIO互动消息

                  html效果如下:

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第4张

                  3、简单的演示

                  自己点击建立连接和断开连接按钮测试玩下

                  ps:http://localhost:9999?userId=12345是没有命名空间的请求

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第5张

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第6张

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第7张

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第8张

                  三、广播

                  1、SocketIO基础概念图

                  SocketIO、namespace(命名空间)、room(房间)的关系如下:

                  日常记录-SpringBoot整合netty-socketio,请添加图片描述,第9张

                  SocketIO广播是以namespace或者room为维度的,具体如下:

                  如果不定义namespace,默认是/

                  如果定义了namespace,没有定义room,房间默认的名字和namespace一样。

                  2、定义namespace

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第10张

                  你也可以这样定义

                  server.addNamespace(“/test”);

                  server.addNamespace(“/socketIO”);

                  3、创建namespace所属的Handler

                  3.1 自定义Handler

                  package com.gzgs.socketio.common.handler;
                  import com.corundumstudio.socketio.AckRequest;
                  import com.corundumstudio.socketio.SocketIOClient;
                  import com.corundumstudio.socketio.annotation.OnEvent;
                  import com.fasterxml.jackson.core.JsonProcessingException;
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.stereotype.Component;
                  @Slf4j
                  @Component
                  public class TestHandler {
                      //测试使用
                      @OnEvent("testHandler")
                      public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          log.info("MyTestHandler:{}",data);
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("MyTestHandler",data);
                          }
                      }
                  }
                  
                  package com.gzgs.socketio.common.handler;
                  import com.corundumstudio.socketio.AckRequest;
                  import com.corundumstudio.socketio.SocketIOClient;
                  import com.corundumstudio.socketio.annotation.OnEvent;
                  import com.fasterxml.jackson.core.JsonProcessingException;
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.stereotype.Component;
                  
                  @Slf4j
                  @Component
                  public class SocketIOHandler  {
                      //测试使用
                      @OnEvent("socketIOHandler")
                      public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          log.info("SocketIOHandler:{}",data);
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("SocketIOHandler",data);
                          }
                      }
                  }
                  

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第11张

                  3.2 监听自定义Handler

                  在启动类的SocketIO监听里面加入监听

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第12张

                  package com.gzgs.socketio;
                  import com.corundumstudio.socketio.SocketIOServer;
                  import com.gzgs.socketio.common.handler.SocketIOHandler;
                  import com.gzgs.socketio.common.handler.TestHandler;
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.beans.factory.DisposableBean;
                  import org.springframework.beans.factory.annotation.Autowired;
                  import org.springframework.boot.CommandLineRunner;
                  import org.springframework.boot.SpringApplication;
                  import org.springframework.boot.autoconfigure.SpringBootApplication;
                  import org.springframework.stereotype.Component;
                  @SpringBootApplication
                  public class SocketioServerApplication {
                      public static void main(String[] args) {
                          SpringApplication.run(SocketioServerApplication.class, args);
                      }
                  }
                  @Component
                  @Slf4j
                  class SocketIOServerRunner implements CommandLineRunner, DisposableBean {
                      @Autowired
                      private SocketIOServer socketIOServer;
                      @Autowired
                      private TestHandler testHandler;
                      @Autowired
                      private SocketIOHandler socketIOHandler;
                      @Override
                      public void run(String... args) throws Exception {
                          //namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。
                          //比如:http://localhost:9999/test?userId=12345
                          socketIOServer.getNamespace("/test").addListeners(testHandler);
                          socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler);
                          socketIOServer.start();
                          log.info("SocketIOServer==============================启动成功");
                      }
                      @Override
                      public void destroy() throws Exception {
                          socketIOServer.stop();
                          log.info("SocketIOServer==============================关闭成功");
                      }
                  }
                  

                  3.3演示

                  3.3.1 正确演示

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第13张

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第14张

                  3.3.1 错误演示

                  日常记录-SpringBoot整合netty-socketio,在这里插入图片描述,第15张

                  四、常用方法

                  其他的一些测试我写在下面的代码上,自己去测试才能更好的理解

                  1、加入房间

                      //加入房间
                      @OnEvent("joinRoom")
                      public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          client.joinRoom(data);
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("加入房间","成功");
                          }
                      }
                  

                  2、离开房间

                      //离开房间
                      @OnEvent("leaveRoom")
                      public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          client.leaveRoom(data);
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("离开房间","成功");
                          }
                      }
                  

                  3、获取用户所有房间

                      //获取该用户所有房间
                      @OnEvent("getUserRooms")
                      public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          String userId = client.getHandshakeData().getSingleUrlParam("userId");
                          Set allRooms = client.getAllRooms();
                          for (String room:allRooms){
                              System.out.println("房间名称:"+room);
                          }
                          log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ",userId,data,ackRequest.isAckRequested());
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("你好","哈哈哈");
                          }
                      }
                  

                  4、发送消息给指定的房间

                     @OnEvent("sendRoomMessage")
                      public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          String userId = client.getHandshakeData().getSingleUrlParam("userId");
                          Set allRooms = client.getAllRooms();
                          for (String room:allRooms){
                              log.info("房间:{}",room);
                              //发送给指定空间名称以及房间的人,并且排除不发给自己
                              socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message",client, data);
                              //发送给指定空间名称以及房间的人,包括自己
                              //socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);;
                          }
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("发送消息到指定的房间","成功");
                          }
                      }
                  

                  5、广播消息给指定的Namespace下所有客户端

                  //广播消息给指定的Namespace下所有客户端
                      @OnEvent("sendNamespaceMessage")
                      public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
                          socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message",client, data);;
                          if(ackRequest.isAckRequested()){
                              //返回给客户端,说我接收到了
                              ackRequest.sendAckData("发送消息到指定的房间","成功");
                          }
                      }
                  

                  6、点对点发送

                      //点对点
                      public void sendMessageOne(String userId) throws JsonProcessingException {
                          HashMap userClient = clientCache.getUserClient(userId);
                          for (UUID sessionId : userClient.keySet()) {
                              socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送");
                          }
                      }