相关推荐recommended
基于SpringBoot快速使用Netty - 客户端
作者:mmseoamin日期:2023-12-25

启动配置代码

        该文章提供客户端代码,如需服务器端代码,请看下篇文章,基于SpringBoot项目编写的。

        支持运行项目时自动启动netty,支持断线无限重连,只需要修改配置文件中的IP和端口即可使用,可以直接复制代码,解码处理器需要自己编写逻辑,当然也可以使用提供的解码器,详细见下文。

        没有提供Controller,要是需要,可以自己新建一个Controller,再ClientBoot类中写一个sendMsg()方法,方法中调用connect()方法,然后在你的Controller里注入ClientBoot,调用sendMsg()即可。

ClientStarter : 启动器

//客户端启动器
@Slf4j
@Component
public class ClientStarter {
    @Resource
    private NettyConfig nettyConfig;
    public void bootstrap() {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new NioEventLoopGroup(nettyConfig.getWorker()))
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyConfig.getTimeout())
             .option(ChannelOption.SO_KEEPALIVE, true) // 避免意外断开
             .channel(NioSocketChannel.class) // 指定通道
             .handler(new ClientHandler()); // 指定处理器
        //连接服务器
        try {
            ClientBoot clientBoot = new ClientBoot();
            clientBoot.connect(bootstrap, nettyConfig);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

ClientBoot : 连接服务器

//客户端链接
@Component
@Slf4j
public class ClientBoot {
    public void connect(Bootstrap bootstrap, NettyConfig nettyConfig) throws InterruptedException {
        // 连接 netty
        ChannelFuture future = bootstrap.connect(nettyConfig.getHost(), nettyConfig.getPort());
        //连接失败无限重连,直到连接成功为止,重连时间为5秒/次
        future.addListener((ChannelFutureListener) channelFuture -> {
            if (!channelFuture.isSuccess()) {
                log.info("连接失败,尝试重新连接!");
                // 在连接失败后,5秒后尝试重新连接
                channelFuture.channel().eventLoop().schedule(() -> {
                    try {
                        connect(bootstrap, nettyConfig);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }, nettyConfig.getReconnect(), TimeUnit.SECONDS);
            } else {
                log.info("客户端连接服务器成功!");
            }
        });
        //给关闭通道进行监听
        Channel channel = future.channel();
        channel.closeFuture().sync();
    }
}

ClientHandler: 客户端的处理器

        这个类中的解码器和消息处理器是你主要写的地方,解码器需要根据自己的业务进行编写,也可以使用提供好的解码器,当然还可以自行添加一些其他的Handler

netty 提供的解码器

DelimiterBasedFrameDecoder 解决TCP的粘包解码器
StringDecoder              消息转成String解码器
LineBasedFrameDecoder      自动完成标识符分隔解码器
FixedLengthFrameDecoder    固定长度解码器,二进制
Base64Decoder base64       解码器
对于 netty的数据传递都是ByteBuf,我们一般重写以上的解码器、编码器来实现自己的逻辑
//客户端处理器
public class ClientHandler extends ChannelInitializer {
    private static final NettyConfig nettyConfig = ApplicationContextHelperUtil.getBean(NettyConfig.class);
    @Override
    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
        ChannelPipeline pipeline = nioSocketChannel.pipeline();
        //心跳检测处理器
        //IdleStateHandler参数说明: 读空闲时间,写空闲时间,全部空闲时间,时间单位(默认秒)
        pipeline.addLast(new IdleStateHandler(nettyConfig.getReadTime(), nettyConfig.getWriteTime(), 0, TimeUnit.SECONDS));
        pipeline.addLast(new AnalyzeMessageHandler());//自定义解码器
        pipeline.addLast(new MonitorMessageHandler());//客户端消息处理器
    }
}

其他所需代码

依赖

        
        
            io.netty
            netty-all
            4.1.39.Final
        
        
        
            cn.hutool
            hutool-all
            5.5.4
        
        
            com.google.code.gson
            gson
            2.8.5
        
        
            org.springframework.boot
            spring-boot-starter-aop
        
        
            org.apache.httpcomponents
            httpclient
            4.5.12
        
        
            com.alibaba
            fastjson
            1.2.75
        
        
            joda-time
            joda-time
            2.10.1
        
        
            com.alibaba
            easyexcel
            2.2.10
        
        
            com.google.guava
            guava
            30.1-jre
        
        
        
            org.apache.logging.log4j
            log4j-api
            2.17.0
        
        
            org.springframework.boot
            spring-boot-starter-web
            
                
                    log4j-to-slf4j
                    org.apache.logging.log4j
                
            
        

配置启动类

项目启动入口(启动类)

@SpringBootApplication
public class NettyClientApplication {
    @Resource
    private ClientStarter clientStarter;
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(NettyClientApplication.class, args);
        try {
            InetAddress ip = Inet4Address.getLocalHost();
            System.out.println("当前IP地址==>>:" + ip.getHostAddress());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("(♥◠‿◠)ノ゙  netty客户端启动成功   ლ(´ڡ`ლ)゙ ");
        NettyClientApplication application = context.getBean(NettyClientApplication.class);
        application.runClient();
    }
    public void runClient() {
        // 异步启动 Netty
        Executors.newSingleThreadExecutor().execute(clientStarter::bootstrap);
    }
}

配置文件

# yml配置netty
netty:
  client:
    boss: 1 # boss线程数量 默认为cpu线程数*2 负责 ServerSocketChannel 上的 accept 事件
    worker: 4 # worker线程数量 默认为cpu线程数*2 负责 socketChannel 上的读写
    timeout: 100000 # 连接超时时间(毫秒)
    port: 6999 # 服务器主端口 默认6999
    host: 127.0.0.1 # 服务器地址 127.0.0.1
    writeTime: 2 # 客户端写入时间 2秒 目前默认发送心跳用
    readTime: 900 # 客户端读取时间 15分钟 900秒
    reconnect: 5 # 重新连接时间 5秒

Config

Netty属性配置

//Netty属性配置
@Data
@Configuration
public class NettyConfig {
    /**
     * boss线程数量 默认为cpu线程数*2
     * 负责 ServerSocketChannel 上的 accept 事件
     */
    @Value("${netty.client.boss}")
    private Integer boss;
    /**
     * worker线程数量 默认为cpu线程数*2
     * 负责 socketChannel 上的读写
     */
    @Value("${netty.client.worker}")
    private Integer worker;
    /**
     * 连接超时时间 默认为30s
     */
    @Value("${netty.client.timeout}")
    private Integer timeout;
    /**
     * 服务器主端口 默认6999
     */
    @Value("${netty.client.port}")
    private Integer port;
    /**
     * 服务器地址 默认为本地
     */
    @Value("${netty.client.host}")
    private String host;
    /**
     * 客户端写入时间 2秒
     * 目前默认发送心跳
     */
    @Value("${netty.client.writeTime}")
    private Integer writeTime;
    /**
     * 客户端读取时间 15分钟
     */
    @Value("${netty.client.readTime}")
    private Integer readTime;
    /**
     * 重新连接时间 5秒
     */
    @Value("${netty.client.reconnect}")
    private Integer reconnect;
}

Handler

AnalyzeMessageHandler: 自定义解码器

//接收到服务器的报文并解析
@Slf4j
public class AnalyzeMessageHandler extends ByteToMessageDecoder {
    private static final NettyServiceImpl nettyService = ApplicationContextHelperUtil.getBean(NettyServiceImpl.class);
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        log.info("接受到服务器发来得数据,正在进行解析中...");
        int len = byteBuf.writerIndex();//报文总长度
        // ... 根据需求编写自己的解码逻辑
    }
} 

MonitorMessageHandler: 消息处理器

//Netty 客户端监听消息处理器
@Slf4j
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MonitorMessageHandler extends ChannelInboundHandlerAdapter {
    private static final ClientStarter clientStarter = ApplicationContextHelperUtil.getBean(ClientStarter.class);
    /**
     * 服务端上线的时候调用
     *
     * @param ctx 通道处理程序(上线服务器信息)
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连上了服务器:" + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }
    /**
     * 服务端掉线的时候调用
     * 如果发生服务器掉线,等待10秒后重新尝试连接
     *
     * @param ctx 通道处理程序(下线服务器信息)
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws InterruptedException {
        log.info("{}断开了服务器", ctx.channel().remoteAddress());
        // 关闭连接并释放底层的套接字资源
        ctx.channel().close();
        // 优雅关闭功能
        ctx.channel().eventLoop().parent().shutdownGracefully();
        // 清除计数
        ReferenceCountUtil.release(ctx);
        // 重新连接
        clientStarter.bootstrap();
    }
    /**
     * 读取服务端消息
     *
     * @param ctx 通道处理程序(上线服务器信息)
     * @param msg 收到的信息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf mes = (ByteBuf) msg;
        log.info("来自服务端的消息: {}", mes);
    }
    /**
     * 异常发生时候调用
     *
     * @param ctx 通道处理程序(上线服务器信息)
     * @param exc 异常信息
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable exc) {
        log.error("{}出现异常请注意查看:{}", ctx.channel().remoteAddress(), NettyUtil.printStackTrace((Exception) exc));
    }
    /**
     * 用来触发特殊事件
     * 该方法是监听处理ClientHandler类中IdleStateHandler处理器的
     *
     * @param ctx 通道处理程序
     * @param evt 事件信息
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // 触发了写空闲事件
            if (event.state() == IdleState.WRITER_IDLE) {
                log.info("已2秒未向服务器发消息了,自动发送一条心跳包!");
                ByteBuf message = SendMessage.heartbeatMessage((short) 0);//厂站号默认0
                ctx.writeAndFlush(message);
            }
            // 触发了读空闲事件
            if (event.state().equals(IdleState.READER_IDLE)) {
                log.info("已经15分钟未收到服务器报文,正在进行重新连接!");
                ctx.pipeline().remove(this);//移除当前处理器本身
                ctx.channel().close();//关闭当前通道
                ctx.close();//关闭通道
                clientStarter.bootstrap();
            }
            // 触发了全部空闲事件
            if (event.state() == IdleState.ALL_IDLE) {
                log.info("全部空闲时间超时了!");
                ctx.close();
                clientStarter.bootstrap();
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

Utils工具类

解决注入空指针

        从Spring的上下文中去获取到类,解决@Autowired注入空指针的问题

/**
 * 从Spring的上下文中去获取到类,解决@Autowired注入空指针的问题
 * @author nld
 * @version 1.0
 */
@Component
public class ApplicationContextHelperUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext1 ) throws BeansException {
        applicationContext = applicationContext1;
    }
    public static ApplicationContext getApplicationContext(){
        return applicationContext;
    }
    /**
     *
     * @param clazz 需要注入的类
     * @param  泛型
     */
    @SuppressWarnings("unchecked")
    public static  T getBean(Class clazz) {
        return (T) applicationContext.getBean(clazz);
    }
}

 Netty工具类

      用于打印ByteBuf 方便调试,处理异常的工具类,自己编写的一个类,可以自己往里添加

/**
 * 异常打印工具类
 *
 * @author nld
 * @version 1.0
 */
public class NettyUtil {
    //Exceptiony 异常处理
    public static String printStackTrace(Exception e) {
        Writer writer = new StringWriter();
        PrintWriter printWriter = new PrintWriter(writer);
        e.printStackTrace(printWriter);
        printWriter.close();
        return writer.toString();
    }
    //ByteBuf 打印用
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(StringUtil.NEWLINE);
        ByteBufUtil.appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

        以上就是客户端的相关代码,可以直接复制使用,服务器代码见下篇文章,编写博客纯属个人爱好,代码如有不足之处,欢迎各位评论提出优化方案,共勉!感谢!