该文章提供客户端代码,如需服务器端代码,请看下篇文章,基于SpringBoot项目编写的。
支持运行项目时自动启动netty,支持断线无限重连,只需要修改配置文件中的IP和端口即可使用,可以直接复制代码,解码处理器需要自己编写逻辑,当然也可以使用提供的解码器,详细见下文。
没有提供Controller,要是需要,可以自己新建一个Controller,再ClientBoot类中写一个sendMsg()方法,方法中调用connect()方法,然后在你的Controller里注入ClientBoot,调用sendMsg()即可。
//客户端启动器 @Slf4j @Component public class ClientStarter { @Resource private NettyConfig nettyConfig; public void bootstrap() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup(nettyConfig.getWorker())) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyConfig.getTimeout()) .option(ChannelOption.SO_KEEPALIVE, true) // 避免意外断开 .channel(NioSocketChannel.class) // 指定通道 .handler(new ClientHandler()); // 指定处理器 //连接服务器 try { ClientBoot clientBoot = new ClientBoot(); clientBoot.connect(bootstrap, nettyConfig); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
//客户端链接 @Component @Slf4j public class ClientBoot { public void connect(Bootstrap bootstrap, NettyConfig nettyConfig) throws InterruptedException { // 连接 netty ChannelFuture future = bootstrap.connect(nettyConfig.getHost(), nettyConfig.getPort()); //连接失败无限重连,直到连接成功为止,重连时间为5秒/次 future.addListener((ChannelFutureListener) channelFuture -> { if (!channelFuture.isSuccess()) { log.info("连接失败,尝试重新连接!"); // 在连接失败后,5秒后尝试重新连接 channelFuture.channel().eventLoop().schedule(() -> { try { connect(bootstrap, nettyConfig); } catch (InterruptedException e) { throw new RuntimeException(e); } }, nettyConfig.getReconnect(), TimeUnit.SECONDS); } else { log.info("客户端连接服务器成功!"); } }); //给关闭通道进行监听 Channel channel = future.channel(); channel.closeFuture().sync(); } }
这个类中的解码器和消息处理器是你主要写的地方,解码器需要根据自己的业务进行编写,也可以使用提供好的解码器,当然还可以自行添加一些其他的Handler
netty 提供的解码器
DelimiterBasedFrameDecoder 解决TCP的粘包解码器 StringDecoder 消息转成String解码器 LineBasedFrameDecoder 自动完成标识符分隔解码器 FixedLengthFrameDecoder 固定长度解码器,二进制 Base64Decoder base64 解码器 对于 netty的数据传递都是ByteBuf,我们一般重写以上的解码器、编码器来实现自己的逻辑
//客户端处理器 public class ClientHandler extends ChannelInitializer{ private static final NettyConfig nettyConfig = ApplicationContextHelperUtil.getBean(NettyConfig.class); @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline pipeline = nioSocketChannel.pipeline(); //心跳检测处理器 //IdleStateHandler参数说明: 读空闲时间,写空闲时间,全部空闲时间,时间单位(默认秒) pipeline.addLast(new IdleStateHandler(nettyConfig.getReadTime(), nettyConfig.getWriteTime(), 0, TimeUnit.SECONDS)); pipeline.addLast(new AnalyzeMessageHandler());//自定义解码器 pipeline.addLast(new MonitorMessageHandler());//客户端消息处理器 } }
io.netty netty-all4.1.39.Final cn.hutool hutool-all5.5.4 com.google.code.gson gson2.8.5 org.springframework.boot spring-boot-starter-aoporg.apache.httpcomponents httpclient4.5.12 com.alibaba fastjson1.2.75 joda-time joda-time2.10.1 com.alibaba easyexcel2.2.10 com.google.guava guava30.1-jre org.apache.logging.log4j log4j-api2.17.0 org.springframework.boot spring-boot-starter-weblog4j-to-slf4j org.apache.logging.log4j
@SpringBootApplication public class NettyClientApplication { @Resource private ClientStarter clientStarter; public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(NettyClientApplication.class, args); try { InetAddress ip = Inet4Address.getLocalHost(); System.out.println("当前IP地址==>>:" + ip.getHostAddress()); } catch (UnknownHostException e) { e.printStackTrace(); } System.out.println("(♥◠‿◠)ノ゙ netty客户端启动成功 ლ(´ڡ`ლ)゙ "); NettyClientApplication application = context.getBean(NettyClientApplication.class); application.runClient(); } public void runClient() { // 异步启动 Netty Executors.newSingleThreadExecutor().execute(clientStarter::bootstrap); } }
# yml配置netty netty: client: boss: 1 # boss线程数量 默认为cpu线程数*2 负责 ServerSocketChannel 上的 accept 事件 worker: 4 # worker线程数量 默认为cpu线程数*2 负责 socketChannel 上的读写 timeout: 100000 # 连接超时时间(毫秒) port: 6999 # 服务器主端口 默认6999 host: 127.0.0.1 # 服务器地址 127.0.0.1 writeTime: 2 # 客户端写入时间 2秒 目前默认发送心跳用 readTime: 900 # 客户端读取时间 15分钟 900秒 reconnect: 5 # 重新连接时间 5秒
//Netty属性配置 @Data @Configuration public class NettyConfig { /** * boss线程数量 默认为cpu线程数*2 * 负责 ServerSocketChannel 上的 accept 事件 */ @Value("${netty.client.boss}") private Integer boss; /** * worker线程数量 默认为cpu线程数*2 * 负责 socketChannel 上的读写 */ @Value("${netty.client.worker}") private Integer worker; /** * 连接超时时间 默认为30s */ @Value("${netty.client.timeout}") private Integer timeout; /** * 服务器主端口 默认6999 */ @Value("${netty.client.port}") private Integer port; /** * 服务器地址 默认为本地 */ @Value("${netty.client.host}") private String host; /** * 客户端写入时间 2秒 * 目前默认发送心跳 */ @Value("${netty.client.writeTime}") private Integer writeTime; /** * 客户端读取时间 15分钟 */ @Value("${netty.client.readTime}") private Integer readTime; /** * 重新连接时间 5秒 */ @Value("${netty.client.reconnect}") private Integer reconnect; }
//接收到服务器的报文并解析 @Slf4j public class AnalyzeMessageHandler extends ByteToMessageDecoder { private static final NettyServiceImpl nettyService = ApplicationContextHelperUtil.getBean(NettyServiceImpl.class); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
//Netty 客户端监听消息处理器 @Slf4j @ChannelHandler.Sharable @RequiredArgsConstructor public class MonitorMessageHandler extends ChannelInboundHandlerAdapter { private static final ClientStarter clientStarter = ApplicationContextHelperUtil.getBean(ClientStarter.class); /** * 服务端上线的时候调用 * * @param ctx 通道处理程序(上线服务器信息) */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连上了服务器:" + ctx.channel().remoteAddress()); super.channelActive(ctx); } /** * 服务端掉线的时候调用 * 如果发生服务器掉线,等待10秒后重新尝试连接 * * @param ctx 通道处理程序(下线服务器信息) */ @Override public void channelInactive(ChannelHandlerContext ctx) throws InterruptedException { log.info("{}断开了服务器", ctx.channel().remoteAddress()); // 关闭连接并释放底层的套接字资源 ctx.channel().close(); // 优雅关闭功能 ctx.channel().eventLoop().parent().shutdownGracefully(); // 清除计数 ReferenceCountUtil.release(ctx); // 重新连接 clientStarter.bootstrap(); } /** * 读取服务端消息 * * @param ctx 通道处理程序(上线服务器信息) * @param msg 收到的信息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf mes = (ByteBuf) msg; log.info("来自服务端的消息: {}", mes); } /** * 异常发生时候调用 * * @param ctx 通道处理程序(上线服务器信息) * @param exc 异常信息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable exc) { log.error("{}出现异常请注意查看:{}", ctx.channel().remoteAddress(), NettyUtil.printStackTrace((Exception) exc)); } /** * 用来触发特殊事件 * 该方法是监听处理ClientHandler类中IdleStateHandler处理器的 * * @param ctx 通道处理程序 * @param evt 事件信息 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; // 触发了写空闲事件 if (event.state() == IdleState.WRITER_IDLE) { log.info("已2秒未向服务器发消息了,自动发送一条心跳包!"); ByteBuf message = SendMessage.heartbeatMessage((short) 0);//厂站号默认0 ctx.writeAndFlush(message); } // 触发了读空闲事件 if (event.state().equals(IdleState.READER_IDLE)) { log.info("已经15分钟未收到服务器报文,正在进行重新连接!"); ctx.pipeline().remove(this);//移除当前处理器本身 ctx.channel().close();//关闭当前通道 ctx.close();//关闭通道 clientStarter.bootstrap(); } // 触发了全部空闲事件 if (event.state() == IdleState.ALL_IDLE) { log.info("全部空闲时间超时了!"); ctx.close(); clientStarter.bootstrap(); } } super.userEventTriggered(ctx, evt); } }
从Spring的上下文中去获取到类,解决@Autowired注入空指针的问题
/** * 从Spring的上下文中去获取到类,解决@Autowired注入空指针的问题 * @author nld * @version 1.0 */ @Component public class ApplicationContextHelperUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext1 ) throws BeansException { applicationContext = applicationContext1; } public static ApplicationContext getApplicationContext(){ return applicationContext; } /** * * @param clazz 需要注入的类 * @param泛型 */ @SuppressWarnings("unchecked") public static T getBean(Class clazz) { return (T) applicationContext.getBean(clazz); } }
用于打印ByteBuf 方便调试,处理异常的工具类,自己编写的一个类,可以自己往里添加
/** * 异常打印工具类 * * @author nld * @version 1.0 */ public class NettyUtil { //Exceptiony 异常处理 public static String printStackTrace(Exception e) { Writer writer = new StringWriter(); PrintWriter printWriter = new PrintWriter(writer); e.printStackTrace(printWriter); printWriter.close(); return writer.toString(); } //ByteBuf 打印用 public static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(StringUtil.NEWLINE); ByteBufUtil.appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); } }
以上就是客户端的相关代码,可以直接复制使用,服务器代码见下篇文章,编写博客纯属个人爱好,代码如有不足之处,欢迎各位评论提出优化方案,共勉!感谢!