springCloudGateway+Nacos注册与转发Netty+WebSocket
作者:mmseoamin日期:2024-04-30

Netty+WebSocket是一开始单体应用中与前端交互使用的,最近开始搞Cloud想着移植过来使用

具体官方描述本文就不体现了 直接开始实现 以及解决过程中遇到的问题

1.首先编写netty端代码,由于是微服务模式就直接新建一个项目

        springCloudGateway+Nacos注册与转发Netty+WebSocket,第1张

server:
  port: 8085
spring:
  application:
    name: mall-im
    
netty:
  # Netty端口
  port: 9001
  application:
    # Netty应用名称
    name: mall-im-netty

1.1 由于Netty 需要额外启动所以配置一个启动器,这里有一个小坑,很多同学习惯把自定义启动放到服务启动类里,其实也没事,但是只要你使用@RefreshScope注解后就会产生自定义启动被启动了两次的问题,感兴趣的同学可以自己尝试一下

/**
 * Netty 额外启动类
 *
 * @author 杨旭
 * @email 18811132173@163.com
 * @create 2023/11/16 17:25
 */
@Component
public class NettyCommandLineRunner implements CommandLineRunner {
    @Resource
    private NettyWebSocketServer nettyServer;
    @Override
    public void run(String... args) throws Exception {
        //netty 服务端启动的端口不可和Springboot启动类的端口号重复
        nettyServer.start();
        //关闭服务器的时候同时关闭Netty服务
        Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
    }
}

1.2 定义启动

/**
 * Netty启动类
 *
 * @author 杨旭
 * @email 18811132173@163.com
 * @create 2023/11/16 16:39
 */
@Slf4j
@Component
public class NettyWebSocketServer {
    private Channel channel;
    /**
     * bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
     */
    EventLoopGroup boosGroup = new NioEventLoopGroup();
    /**
     * workerGroup工作线程组,主要负责网络IO读写
     */
    EventLoopGroup workGroup = new NioEventLoopGroup();
    @Resource
    private NacosDiscoveryProperties nacosDiscoveryProperties;
    /**
     * Netty端口
     */
    @Value("${netty.port}")
    private Integer nettyPort;
    /**
     * Netty应用名称
     */
    @Value("${netty.application.name}")
    private String nettyName;
    @Async
    public void start() {
        log.error("=================Netty 端口启动:{}==================", nettyPort);
        try {
            //绑定端口
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 临时存放已完成三次握手的请求的队列的最大长度
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // 设置两个线程组
            bootstrap.group(boosGroup, workGroup)
                    // 非阻塞异步服务端TCP Socket 连接
                    .channel(NioServerSocketChannel.class)
                    // 使用本地地址,绑定端口号
                    .localAddress(nettyPort)
                    //初始化handler
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            log.info("=================收到新的链接:{}==================", socketChannel.localAddress());
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 将请求和映带消息节码为HTTP消息
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // 向客户端发送HTML5文件
                            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // 将HTTP消息的多个部分合成一条完整的HTTP消息
                            pipeline.addLast("aggregator", new HttpObjectAggregator(8192));
                            // 进行设置心跳检测
                            socketChannel.pipeline().addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
                            // 配置通道处理 来进行业务处理
                            pipeline.addLast("handler", new WebSocketHandler());
                        }
                    });
            registerNamingService(nettyName, nettyPort);
            //绑定Netty的启动端口
            channel = bootstrap.bind(nettyPort).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 将Netty服务注册进Nacos
     *
     * @param nettyName 服务名称
     * @param nettyPort 服务端口号
     */
    private void registerNamingService(String nettyName, Integer nettyPort) {
        try {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
            properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
            NamingService namingService = NamingFactory.createNamingService(properties);
            InetAddress address = InetAddress.getLocalHost();
            namingService.registerInstance(nettyName, address.getHostAddress(), nettyPort);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    @PreDestroy
    public void destroy() {
        log.error("=================Netty服务关闭==================");
        if (channel != null) {
            channel.close();
        }
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

1.3 业务实现,由于具体的业务还没想好怎么做所以就空下来了,本文主要是集成

/**
 * Netty业务实现类
 *
 * @author 杨旭
 * @email 18811132173@163.com
 * @create 2023/11/16 16:42
 */
public class WebSocketHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        // TODO: 具体业务处理
    }
}

这个时候启动服务,可以看到已经成功启动了

springCloudGateway+Nacos注册与转发Netty+WebSocket,第2张

springCloudGateway+Nacos注册与转发Netty+WebSocket,第3张

使用工具访问一下springCloudGateway+Nacos注册与转发Netty+WebSocket,第4张

springCloudGateway+Nacos注册与转发Netty+WebSocket,第5张

2.接下来就是怎么将Netty用gateway 进行访问,同样具体的路由配置以及规则不进行赘述,大家自行百度

2.1 gateway增加路由

- id: mall-im
  uri: lb:ws://mall-im-netty
  predicates:
     - Path=/ws/**
  filters:
     - StripPrefix=1

2.2 增加路由后重新启动服务,并额外启动一个服务,开发环境多实例运行,注意(开发环境需要手动将.yml文件中端口号修改)多实例运行点击工具栏上方的启动,正常或调试均可

springCloudGateway+Nacos注册与转发Netty+WebSocket,第6张

springCloudGateway+Nacos注册与转发Netty+WebSocket,第7张

springCloudGateway+Nacos注册与转发Netty+WebSocket,第8张

2.3 用gateway进行访问

 springCloudGateway+Nacos注册与转发Netty+WebSocket,第9张

2.4 可以多尝试几次,这样就能实验出负载均衡的效果了

2.5 PS:如果是后端管理系统,发送个通知之类的 到目前为止,将业务处理实现就可以了,但如果是即时通讯,就会出现一个问题,A在9001,B在9002 这样两个人怎么进行通讯呢?

有思路的小伙伴咱们可以一起讨论下