1、如果我们的app类似于股票这种,数据很多很快,之前用的tomcat自带的websocket 又或者 spring-boot-starter-websocke集成,但是性能在数据并发很大时就会存在问题。
2、我前面写的一篇关于 springBoot+webosket的,没有使用netty的文章 springBoot使用webSocket的几种方式以及在高并发出现的问题及解决 ,其中就包含了 以下者两种方式,都有说明,大家如果量不大,下面这两种方式也是可以的。
1、如何使用 ,可以参考 netty + webSocket + SpringBott 是参考文章 SpringBoot整合Netty处理WebSocket(支持url参数) 这篇文章是,说的已经很ok了
2、但是上面那篇文章还是有所不足,因为我需要加上token认证,只有认证了,才可以建立链接,上面那篇的文章,只是获取参数,在认证方面,还是有所不足,满足不了这个条件。后续我可以把我的方式,写一篇文章放出来
2、比如你的链接是 ws://192.168.172.139:1234/ws/id=1,使用它文章中的获取后得到 /ws/,建议改成如下,获取之后是 /ws
/** * 获取URI中参数以外部分路径 * * @param uri * @return */ public static String getBasePath(String uriStr) { String pathWithSlash =""; try { // 使用URI解析URL字符串 URI uri = new URI(uriStr); // 获取路径部分 pathWithSlash = uri.getPath(); // 去掉末尾的斜杠 return pathWithSlash.replaceAll("/$", ""); } catch (URISyntaxException e) { log.error("解析path错误", e); } return pathWithSlash; }
在WebSocket服务器的构建中添加.addLast(new HttpServerCodec())的主要原因是WebSocket握手是基于HTTP协议的,WebSocket连接的建立需要经过以下步骤:
- 客户端向服务器发送一个HTTP请求,请求升级到WebSocket协议。
- 服务器收到这个请求后,需要进行协议升级处理,将HTTP协议切换到WebSocket协议。
- 一旦升级成功,WebSocket连接建立,客户端和服务器之间可以通过WebSocket协议进行双向通信。
因此,WebSocket握手的开始阶段仍然是HTTP请求和响应。为了处理这个初始的HTTP请求,需要在Netty的ChannelPipeline中添加.addLast(new HttpServerCodec()),以确保能够解析和处理这个HTTP请求,并在需要时将其升级为WebSocket连接。简而言之,.addLast(new HttpServerCodec())的作用是为了使WebSocket服务器能够正确地处理WebSocket握手之前的HTTP请求和响应,确保WebSocket连接能够成功建立。一旦WebSocket连接建立,就可以通过WebSocket协议进行实时双向通信。
这是WebSocket服务器构建中的一个标准操作。websocket协议本身是基于http协议的,所以这边也要使用http解编码器
.addLast(new ChunkedWriteHandler()) 是 Netty 中的一个 ChannelHandler,它的主要作用是支持异步写大数据流(例如文件传输)。
在某些情况下,你可能需要向客户端发送大量的数据,例如文件的内容,而不是一次性将整个数据写入缓冲区,因为这可能会导致内存占用过高。相反,你可以将数据分成小块(chunk)并逐块写入客户端,以避免内存问题。
ChunkedWriteHandler 的作用如下:
- 支持大数据流的异步写入: 它允许你将数据切割成小块并异步地将这些块写入客户端。这对于传输大型文件或大量数据非常有用,因为它可以避免将整个数据加载到内存中。
- 维护写入顺序: 它确保数据块按照它们添加到 Channel 的顺序进行写入。这有助于保持数据的有序性。
- 提高性能: 通过异步写入数据块,ChunkedWriteHandler 可以提高网络性能,因为它不会阻塞线程等待数据传输完成。
这个处理器通常与其他处理器一起使用,以完成完整的数据传输过程。例如,如果你要实现文件传输,通常会使用 ChunkedWriteHandler 将文件数据切割成小块,然后使用其他处理器来处理文件的传输,例如文件块的编码和解码。
总之,.addLast(new ChunkedWriteHandler()) 的作用是支持异步写大数据流,以提高性能并降低内存使用,尤其在需要传输大量数据时非常有用。
将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应
.addLast(new HttpObjectAggregator(1024 * 64)) 是 Netty 中的一个 ChannelHandler,主要用于将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息。这对于处理HTTP消息非常有用,特别是当你需要处理大量的HTTP数据时。
以下是.addLast(new HttpObjectAggregator(1024 * 64))的主要作用:
- 消息聚合: 在HTTP通信中,请求或响应可能会分成多个部分(例如,HTTP请求头和HTTP请求体)。HttpObjectAggregator 负责将这些部分聚合成一个完整的FullHttpRequest或FullHttpResponse,以便更容易处理和操作。
- 内存管理: 这个处理器还具有内存管理功能。你可以在构造函数中指定一个最大的聚合字节数(在示例中是64 KB)。如果接收到的HTTP数据超过了这个大小,HttpObjectAggregator 将抛出异常以防止内存泄漏。
- 简化HTTP消息处理: 聚合HTTP消息使得你可以更容易地处理完整的HTTP请求和响应,而不必手动处理每个部分。这对于构建Web服务器或HTTP代理非常有用。
示例使用:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB pipeline.addLast(new MyHttpRequestHandler()); // 自定义的HTTP请求处理器
在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,最后添加了一个自定义的HTTP请求处理器。
总之,.addLast(new HttpObjectAggregator(1024 * 64)) 的作用是将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息,以简化和改善处理HTTP消息的流程,并提供内存管理功能。这在构建支持HTTP的应用程序中非常有用。
webSocket 数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true
.addLast(new WebSocketServerCompressionHandler()) 是 Netty 中的一个 ChannelHandler,用于支持 WebSocket 消息的压缩和解压缩。WebSocket 消息压缩可以减小消息的大小,提高网络传输效率,尤其在低带宽环境下非常有用。
以下是 .addLast(new WebSocketServerCompressionHandler()) 的主要作用:
- WebSocket 消息压缩: 当客户端和服务器之间通过 WebSocket 协议传输大量数据时,可以使用压缩技术将消息压缩为更小的尺寸,以减少网络带宽的使用。WebSocketServerCompressionHandler 负责处理消息的压缩。
- WebSocket 消息解压缩: 对于接收到的已压缩的 WebSocket 消息,服务器需要将其解压缩以获取原始消息。WebSocketServerCompressionHandler 也负责解压缩已压缩的消息。
- 支持多种压缩算法: WebSocketServerCompressionHandler 支持多种压缩算法,包括通常的 DEFLATE 和 GZIP 压缩算法,以及自定义的压缩算法。
在WebSocket应用程序中,通常需要在WebSocket连接建立时协商是否启用压缩,以及使用哪种压缩算法。如果客户端和服务器都支持压缩,那么它们可以在消息传输过程中启用压缩。
要使用 .addLast(new WebSocketServerCompressionHandler()),你需要在 WebSocket 服务器的处理管道中添加该处理器。例如:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB pipeline.addLast(new WebSocketServerCompressionHandler()); // 添加WebSocket消息压缩处理器 pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器
在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,接下来添加了 WebSocketServerCompressionHandler 以支持WebSocket消息压缩,最后添加了一个自定义的WebSocket处理器。
总之,.addLast(new WebSocketServerCompressionHandler()) 的作用是为WebSocket服务器添加消息压缩和解压缩的功能,以减小消息大小并提高网络传输效率。这在需要传输大量数据的WebSocket应用中非常有用。
自定义处理器 - 处理 web socket 消息(消息的父类是WebSocketFrame,旗下有很多子类,比如BinaryWebSocketFrame TextWebSocketFrame 等等)
如果你使用的是 父类是WebSocketFrame,则需要在其内部,判断是什么类型的数据,如果你使用的具体的子类,那么只有具体的消息类型会到哪里
服务器端向外暴露的 web socket 端点,当客户端传递比较大的对象时,maxFrameSize参数的值需要调大
WebSocketServerProtocolHandler 是 Netty 中的一个关键组件,用于处理 WebSocket 握手和协议升级,以及管理 WebSocket 连接的生命周期。它的主要作用如下:
- WebSocket 握手处理: 当客户端通过 HTTP 请求发起 WebSocket 握手时,WebSocketServerProtocolHandler 负责识别并处理这些握手请求。它可以检查HTTP请求中的升级标头和协议头,以确定是否需要升级到 WebSocket 协议。
- WebSocket 握手协议升级: 如果客户端发送了符合 WebSocket 握手规范的请求,WebSocketServerProtocolHandler 会处理协议升级,将连接从 HTTP 协议切换到 WebSocket 协议。这个过程包括升级响应的构建和升级握手的处理。
- WebSocket 生命周期管理: 一旦 WebSocket 握手成功,WebSocketServerProtocolHandler 管理 WebSocket 连接的生命周期。它会处理连接的打开、关闭、异常和消息传递等事件。
- Ping/Pong 处理: WebSocket 协议支持 Ping 和 Pong 消息,用于保持连接的活动状态。WebSocketServerProtocolHandler 会自动处理这些心跳消息,以确保连接保持活动状态。
以下是一个示例,展示了如何在 Netty 中使用 WebSocketServerProtocolHandler:
pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB pipeline.addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加WebSocket握手处理器 pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器
在上面的示例中,WebSocketServerProtocolHandler 被添加到处理管道中,并指定了 WebSocket 的路径(在示例中是"/websocket")。一旦握手成功,连接将切换到 WebSocket 协议,并且可以在 MyWebSocketHandler 中处理 WebSocket 消息。
总之,WebSocketServerProtocolHandler 是用于处理 WebSocket 握手和协议升级的关键组件,它使得在 Netty 中创建 WebSocket 服务器变得更加容易。
参考文章 Web Socket 性能对比——Spring Boot vs Tomcat vs Netty 说的很ok了。
1、文章包含了一些线上的参数调整,都是干活
原文地址: https://colobu.com/2015/05/22/implement-C1000K-servers-by-spray-netty-undertow-and-node-js/
原文地址: https://colobu.com/2015/07/14/performance-comparison-of-7-websocket-frameworks/
1、主要测试两部分
2、python 安装这里就不再说了
3、本文的 第四节 和第五节 请务必了解,需要修改对应的 服务器 tcp链接数等等参数。
4、我的webSocket 链接格式是 ws://192.168.172.226:7081/ws/token 最后的那个token,用于线上的认证,只有认证了的用户,才会建立通道,这里为了方便测试,直接用数值代替,如下,这样,就代表用户id好了,毕竟后续我要是测试50w个客户端,总不能先生成50w个token吧。
1、脚本内容
import threading import time import websocket # 定义带有顺序编号的 WebSocket URL url_base = "ws://192.168.172.226:7081/ws/" num_connections = 10000 # 要模拟的连接数 running_connections = 0 # 跟踪当前正在运行的连接数 # 创建线程本地存储对象来存储每个线程的文件名 local = threading.local() # 建立 WebSocket 连接的函数 def connect_websocket(): global running_connections try: # 使用顺序编号生成 URL url = url_base + str(running_connections) # 为当前线程创建文件名 local.filename = f"{running_connections}.txt" while True: # 创建 WebSocket 连接 ws = websocket.create_connection(url) while True: # 接收来自服务端的消息 message = ws.recv() # 保存消息到文件 with open(local.filename, "a") as file: file.write(message + "\n") except Exception as e: print(f"WebSocket 连接失败: {e}") running_connections -= 1 # 开始模拟 WebSocket 连接 while running_connections < num_connections: t = threading.Thread(target=connect_websocket) t.start() running_connections += 1 # 等待所有连接完成 while running_connections > 0: time.sleep(1) print("所有 WebSocket 连接完成。")
2、运行
# 安装 websocket-client pip install websocket-client # 运行test.py 文件 python test.py
3、说明
脚本作用是生成指定 num_connections 的webSocket 连接数,并一直监听服务端返回的消息,如果服务端有消息就会保存到对应链接的文件夹下面,包含其服务端返回的内容。
1、具体的链接的代码我这里就不说了
2、主要需要写两个接口,一个接口是向所有在线的客户端发送一条消息,另一个接口是向所有在线的客户端发送指定数量mockCount的消息
package cn.jt.thermalapi.common.controller; import cn.jt.thermalapi.response.Response; import cn.jt.thermalapi.websocket.session.SessionFactory; import io.swagger.annotations.Api; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import springfox.documentation.annotations.ApiIgnore; import java.util.concurrent.atomic.AtomicInteger; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年10月13日 */ @ApiIgnore @Api(tags = "测试api") @RestController @RequestMapping("/test") @Slf4j public class TestController { @GetMapping("mockOne") public ResponsemockOne() { AtomicInteger count = new AtomicInteger(0); SessionFactory.getSession().broadcast(count.getAndIncrement() + ""); return Response.buildSuccess(); } @GetMapping("mockMany/{mockCount}") public Response mockMany(@PathVariable("mockCount") int mockCount) { AtomicInteger count = new AtomicInteger(0); while (count.getAndIncrement() <= mockCount) { SessionFactory.getSession().broadcast(count.getAndIncrement() + ""); } return Response.buildSuccess(); } }
1、启动你的netty 服务端
2、启动测试脚本
python test.py
3、服务端日志输出,我测试出来,1w 链接大约30s左右,看自己机器吧,我这还是在idea里面跑的。
4、请求test/mockOne接口,大于1s
5、在 test.py文件下,生成了对应的1w客户端的文件,其内容就是服务端发送的。
5、请求test/mockMany/100接口,这个大家可以自己测试下,或者等我后续在服务器测试结束后,再把这篇文章整理,一下,因为本次测试都是在我本机上测试的,只是初步了解。但是脚本已经可以使用,后续大家测试服务器上面,步骤是一样的