springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本)
作者:mmseoamin日期:2023-12-18

一、前言

1、如果我们的app类似于股票这种,数据很多很快,之前用的tomcat自带的websocket 又或者 spring-boot-starter-websocke集成,但是性能在数据并发很大时就会存在问题。

2、我前面写的一篇关于 springBoot+webosket的,没有使用netty的文章 springBoot使用webSocket的几种方式以及在高并发出现的问题及解决 ,其中就包含了 以下者两种方式,都有说明,大家如果量不大,下面这两种方式也是可以的。

  • tomcat自带的websocket
  • spring-boot-starter-websocke集成

    二、使用Netty 完成 webSocket

    1、如何使用 ,可以参考 netty + webSocket + SpringBott 是参考文章 SpringBoot整合Netty处理WebSocket(支持url参数) 这篇文章是,说的已经很ok了

    2、但是上面那篇文章还是有所不足,因为我需要加上token认证,只有认证了,才可以建立链接,上面那篇的文章,只是获取参数,在认证方面,还是有所不足,满足不了这个条件。后续我可以把我的方式,写一篇文章放出来

    2.1、RequestUriUtils 的 getBasePath 方法

    2、比如你的链接是 ws://192.168.172.139:1234/ws/id=1,使用它文章中的获取后得到 /ws/,建议改成如下,获取之后是 /ws

        /**
         * 获取URI中参数以外部分路径
         *
         * @param uri
         * @return
         */
        public static String getBasePath(String uriStr) {
            String pathWithSlash ="";
            try {
                // 使用URI解析URL字符串
                URI uri = new URI(uriStr);
                // 获取路径部分
                pathWithSlash = uri.getPath();
                // 去掉末尾的斜杠
                return pathWithSlash.replaceAll("/$", "");
            } catch (URISyntaxException e) {
                log.error("解析path错误", e);
            }
            return pathWithSlash;
        }
    

    2.2、WebSocketChannelInitializer 中的 ChannelPipeline 说明

    在WebSocket服务器的构建中添加.addLast(new HttpServerCodec())的主要原因是WebSocket握手是基于HTTP协议的,WebSocket连接的建立需要经过以下步骤:

    1. 客户端向服务器发送一个HTTP请求,请求升级到WebSocket协议。
    2. 服务器收到这个请求后,需要进行协议升级处理,将HTTP协议切换到WebSocket协议。
    3. 一旦升级成功,WebSocket连接建立,客户端和服务器之间可以通过WebSocket协议进行双向通信。

    因此,WebSocket握手的开始阶段仍然是HTTP请求和响应。为了处理这个初始的HTTP请求,需要在Netty的ChannelPipeline中添加.addLast(new HttpServerCodec()),以确保能够解析和处理这个HTTP请求,并在需要时将其升级为WebSocket连接。简而言之,.addLast(new HttpServerCodec())的作用是为了使WebSocket服务器能够正确地处理WebSocket握手之前的HTTP请求和响应,确保WebSocket连接能够成功建立。一旦WebSocket连接建立,就可以通过WebSocket协议进行实时双向通信。

    这是WebSocket服务器构建中的一个标准操作。websocket协议本身是基于http协议的,所以这边也要使用http解编码器

    2.3、addLast(new ChunkedWriteHandler())

    .addLast(new ChunkedWriteHandler()) 是 Netty 中的一个 ChannelHandler,它的主要作用是支持异步写大数据流(例如文件传输)。

    在某些情况下,你可能需要向客户端发送大量的数据,例如文件的内容,而不是一次性将整个数据写入缓冲区,因为这可能会导致内存占用过高。相反,你可以将数据分成小块(chunk)并逐块写入客户端,以避免内存问题。

    ChunkedWriteHandler 的作用如下:

    1. 支持大数据流的异步写入: 它允许你将数据切割成小块并异步地将这些块写入客户端。这对于传输大型文件或大量数据非常有用,因为它可以避免将整个数据加载到内存中。
    2. 维护写入顺序: 它确保数据块按照它们添加到 Channel 的顺序进行写入。这有助于保持数据的有序性。
    3. 提高性能: 通过异步写入数据块,ChunkedWriteHandler 可以提高网络性能,因为它不会阻塞线程等待数据传输完成。

    这个处理器通常与其他处理器一起使用,以完成完整的数据传输过程。例如,如果你要实现文件传输,通常会使用 ChunkedWriteHandler 将文件数据切割成小块,然后使用其他处理器来处理文件的传输,例如文件块的编码和解码。

    总之,.addLast(new ChunkedWriteHandler()) 的作用是支持异步写大数据流,以提高性能并降低内存使用,尤其在需要传输大量数据时非常有用。

    2.4、addLast(new HttpObjectAggregator(1024 * 64))

    将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应

    .addLast(new HttpObjectAggregator(1024 * 64)) 是 Netty 中的一个 ChannelHandler,主要用于将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息。这对于处理HTTP消息非常有用,特别是当你需要处理大量的HTTP数据时。

    以下是.addLast(new HttpObjectAggregator(1024 * 64))的主要作用:

    1. 消息聚合: 在HTTP通信中,请求或响应可能会分成多个部分(例如,HTTP请求头和HTTP请求体)。HttpObjectAggregator 负责将这些部分聚合成一个完整的FullHttpRequest或FullHttpResponse,以便更容易处理和操作。
    2. 内存管理: 这个处理器还具有内存管理功能。你可以在构造函数中指定一个最大的聚合字节数(在示例中是64 KB)。如果接收到的HTTP数据超过了这个大小,HttpObjectAggregator 将抛出异常以防止内存泄漏。
    3. 简化HTTP消息处理: 聚合HTTP消息使得你可以更容易地处理完整的HTTP请求和响应,而不必手动处理每个部分。这对于构建Web服务器或HTTP代理非常有用。

    示例使用:

    pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
    pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
    pipeline.addLast(new MyHttpRequestHandler()); // 自定义的HTTP请求处理器
    

    在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,最后添加了一个自定义的HTTP请求处理器。

    总之,.addLast(new HttpObjectAggregator(1024 * 64)) 的作用是将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息,以简化和改善处理HTTP消息的流程,并提供内存管理功能。这在构建支持HTTP的应用程序中非常有用。

    2.5、addLast(new WebSocketServerCompressionHandler())

    webSocket 数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true

    .addLast(new WebSocketServerCompressionHandler()) 是 Netty 中的一个 ChannelHandler,用于支持 WebSocket 消息的压缩和解压缩。WebSocket 消息压缩可以减小消息的大小,提高网络传输效率,尤其在低带宽环境下非常有用。

    以下是 .addLast(new WebSocketServerCompressionHandler()) 的主要作用:

    1. WebSocket 消息压缩: 当客户端和服务器之间通过 WebSocket 协议传输大量数据时,可以使用压缩技术将消息压缩为更小的尺寸,以减少网络带宽的使用。WebSocketServerCompressionHandler 负责处理消息的压缩。
    2. WebSocket 消息解压缩: 对于接收到的已压缩的 WebSocket 消息,服务器需要将其解压缩以获取原始消息。WebSocketServerCompressionHandler 也负责解压缩已压缩的消息。
    3. 支持多种压缩算法: WebSocketServerCompressionHandler 支持多种压缩算法,包括通常的 DEFLATE 和 GZIP 压缩算法,以及自定义的压缩算法。

    在WebSocket应用程序中,通常需要在WebSocket连接建立时协商是否启用压缩,以及使用哪种压缩算法。如果客户端和服务器都支持压缩,那么它们可以在消息传输过程中启用压缩。

    要使用 .addLast(new WebSocketServerCompressionHandler()),你需要在 WebSocket 服务器的处理管道中添加该处理器。例如:

    pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
    pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
    pipeline.addLast(new WebSocketServerCompressionHandler()); // 添加WebSocket消息压缩处理器
    pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器
    

    在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,接下来添加了 WebSocketServerCompressionHandler 以支持WebSocket消息压缩,最后添加了一个自定义的WebSocket处理器。

    总之,.addLast(new WebSocketServerCompressionHandler()) 的作用是为WebSocket服务器添加消息压缩和解压缩的功能,以减小消息大小并提高网络传输效率。这在需要传输大量数据的WebSocket应用中非常有用。

    2.6、.addLast(new MyWebSocketHandler())

    自定义处理器 - 处理 web socket 消息(消息的父类是WebSocketFrame,旗下有很多子类,比如BinaryWebSocketFrame TextWebSocketFrame 等等)

    如果你使用的是 父类是WebSocketFrame,则需要在其内部,判断是什么类型的数据,如果你使用的具体的子类,那么只有具体的消息类型会到哪里

    2.7、 .addLast(new WebSocketServerProtocolHandler(WebSocketProperties.path, null, true, 10485760));

    服务器端向外暴露的 web socket 端点,当客户端传递比较大的对象时,maxFrameSize参数的值需要调大

    WebSocketServerProtocolHandler 是 Netty 中的一个关键组件,用于处理 WebSocket 握手和协议升级,以及管理 WebSocket 连接的生命周期。它的主要作用如下:

    1. WebSocket 握手处理: 当客户端通过 HTTP 请求发起 WebSocket 握手时,WebSocketServerProtocolHandler 负责识别并处理这些握手请求。它可以检查HTTP请求中的升级标头和协议头,以确定是否需要升级到 WebSocket 协议。
    2. WebSocket 握手协议升级: 如果客户端发送了符合 WebSocket 握手规范的请求,WebSocketServerProtocolHandler 会处理协议升级,将连接从 HTTP 协议切换到 WebSocket 协议。这个过程包括升级响应的构建和升级握手的处理。
    3. WebSocket 生命周期管理: 一旦 WebSocket 握手成功,WebSocketServerProtocolHandler 管理 WebSocket 连接的生命周期。它会处理连接的打开、关闭、异常和消息传递等事件。
    4. Ping/Pong 处理: WebSocket 协议支持 Ping 和 Pong 消息,用于保持连接的活动状态。WebSocketServerProtocolHandler 会自动处理这些心跳消息,以确保连接保持活动状态。

    以下是一个示例,展示了如何在 Netty 中使用 WebSocketServerProtocolHandler:

    pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
    pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
    pipeline.addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加WebSocket握手处理器
    pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器
    

    在上面的示例中,WebSocketServerProtocolHandler 被添加到处理管道中,并指定了 WebSocket 的路径(在示例中是"/websocket")。一旦握手成功,连接将切换到 WebSocket 协议,并且可以在 MyWebSocketHandler 中处理 WebSocket 消息。

    总之,WebSocketServerProtocolHandler 是用于处理 WebSocket 握手和协议升级的关键组件,它使得在 Netty 中创建 WebSocket 服务器变得更加容易。

    三、Web Socket 性能对比——Spring Boot vs Tomcat vs Netty

    参考文章 Web Socket 性能对比——Spring Boot vs Tomcat vs Netty 说的很ok了。

    四、使用四种框架分别实现百万websocket常连接的服务器(写的很好,必看)

    1、文章包含了一些线上的参数调整,都是干活

    原文地址: https://colobu.com/2015/05/22/implement-C1000K-servers-by-spray-netty-undertow-and-node-js/

    五、七种WebSocket框架的性能比较

    原文地址: https://colobu.com/2015/07/14/performance-comparison-of-7-websocket-frameworks/

    六、使用python 脚本测试

    1、主要测试两部分

    • 大量客户端同时在线,查看性能,内存消耗问题
    • 大量客户端同时在线,数据发送效率

      2、python 安装这里就不再说了

      3、本文的 第四节 和第五节 请务必了解,需要修改对应的 服务器 tcp链接数等等参数。

      4、我的webSocket 链接格式是 ws://192.168.172.226:7081/ws/token 最后的那个token,用于线上的认证,只有认证了的用户,才会建立通道,这里为了方便测试,直接用数值代替,如下,这样,就代表用户id好了,毕竟后续我要是测试50w个客户端,总不能先生成50w个token吧。

      • ws://192.168.172.226:7081/ws/1
      • ws://192.168.172.226:7081/ws/2

        6.1、python 脚本

        1、脚本内容

        import threading
        import time
        import websocket
        # 定义带有顺序编号的 WebSocket URL
        url_base = "ws://192.168.172.226:7081/ws/"
        num_connections = 10000  # 要模拟的连接数
        running_connections = 0  # 跟踪当前正在运行的连接数
        # 创建线程本地存储对象来存储每个线程的文件名
        local = threading.local()
        # 建立 WebSocket 连接的函数
        def connect_websocket():
            global running_connections
            try:
                # 使用顺序编号生成 URL
                url = url_base + str(running_connections)
                # 为当前线程创建文件名
                local.filename = f"{running_connections}.txt"
                while True:
                    # 创建 WebSocket 连接
                    ws = websocket.create_connection(url)
                    while True:
                        # 接收来自服务端的消息
                        message = ws.recv()
                        # 保存消息到文件
                        with open(local.filename, "a") as file:
                            file.write(message + "\n")
            except Exception as e:
                print(f"WebSocket 连接失败: {e}")
            running_connections -= 1
        # 开始模拟 WebSocket 连接
        while running_connections < num_connections:
            t = threading.Thread(target=connect_websocket)
            t.start()
            running_connections += 1
        # 等待所有连接完成
        while running_connections > 0:
            time.sleep(1)
        print("所有 WebSocket 连接完成。")
        

        2、运行

        # 安装  websocket-client
        pip install websocket-client
        # 运行test.py 文件
        python test.py
        

        3、说明

        脚本作用是生成指定 num_connections 的webSocket 连接数,并一直监听服务端返回的消息,如果服务端有消息就会保存到对应链接的文件夹下面,包含其服务端返回的内容。

        6.2、netty 服务端

        1、具体的链接的代码我这里就不说了

        2、主要需要写两个接口,一个接口是向所有在线的客户端发送一条消息,另一个接口是向所有在线的客户端发送指定数量mockCount的消息

        package cn.jt.thermalapi.common.controller;
        import cn.jt.thermalapi.response.Response;
        import cn.jt.thermalapi.websocket.session.SessionFactory;
        import io.swagger.annotations.Api;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.PathVariable;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;
        import springfox.documentation.annotations.ApiIgnore;
        import java.util.concurrent.atomic.AtomicInteger;
        /**
         * @author GXM
         * @version 1.0.0
         * @Description TODO
         * @createTime 2023年10月13日
         */
        @ApiIgnore
        @Api(tags = "测试api")
        @RestController
        @RequestMapping("/test")
        @Slf4j
        public class TestController {
            @GetMapping("mockOne")
            public Response mockOne() {
                AtomicInteger count = new AtomicInteger(0);
                SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
                return Response.buildSuccess();
            }
            @GetMapping("mockMany/{mockCount}")
            public Response mockMany(@PathVariable("mockCount") int mockCount) {
                AtomicInteger count = new AtomicInteger(0);
                while (count.getAndIncrement() <= mockCount) {
                    SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
                }
                return Response.buildSuccess();
            }
        }
        

        6.3、演示

        1、启动你的netty 服务端

        2、启动测试脚本

        python test.py
        

        3、服务端日志输出,我测试出来,1w 链接大约30s左右,看自己机器吧,我这还是在idea里面跑的。

        springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本),在这里插入图片描述,第1张

        4、请求test/mockOne接口,大于1s

        springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本),在这里插入图片描述,第2张

        5、在 test.py文件下,生成了对应的1w客户端的文件,其内容就是服务端发送的。

        springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本),在这里插入图片描述,第3张

        5、请求test/mockMany/100接口,这个大家可以自己测试下,或者等我后续在服务器测试结束后,再把这篇文章整理,一下,因为本次测试都是在我本机上测试的,只是初步了解。但是脚本已经可以使用,后续大家测试服务器上面,步骤是一样的