需要了解netty 基本知识原理,不了解的可以查看我之前的博客,以及网上的资料,这里不在过多撰述。
这里以对接硬件雷达水位计为例:
说一下思路, 这里场景各种设备连接DTU,然后通过DTU上报报文,和接收服务器下发的指令。
例如127.0.0.1:2233 就是你服务器的ip和端口,我们需要开发部署一个 JAVA 开发的Netty 服务器来监听 2233端口, 从机配置我们的服务器ip和端口连接到netty。
那么我们开发netty 的思路应该是什么样子的。
- netty 监听端口;
- netty 保存通道长链接;
- 将netty 的 里面的所有通过 存放到一个 ConcurrentHashMap 里面来进行管理;
- 通过 netty 监听 我们可以获取 从机上报到服务器的报文,我们进行业务处理;
- 通过Map 我们实现 定时下发报文,让从机回复响应;
springboot,依赖, 去掉tomcat ,我们这里只做服务器,并不需要tomcat,以及只用 starter
org.springframework.boot spring-boot-starter-parent 2.1.3.RELEASE org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-tomcat org.slf4j slf4j-log4j12
io.netty netty-all
cn.hutool hutool-all 4.6.1
其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用
不在过多解释代码,每行都有注释
package com.joygis.iot.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * 功能描述: netty服务启动类 * * @Author keLe * @Date 2022/8/26 */ @Slf4j @Component public class NettyServer { public void start(InetSocketAddress address) { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 绑定线程池,编码解码 //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) // 指定Channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(address) //使用自定义处理类 .childHandler(new NettyServerChannelInitializer()) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 128) //保持长连接,2小时无数据激活心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true) //将小的数据包包装成更大的帧进行传送,提高网络的负载 .childOption(ChannelOption.TCP_NODELAY, true); // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(address).sync(); if (future.isSuccess()) { log.info("netty服务器开始监听端口:{}",address.getPort()); } //关闭channel和块,直到它被关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.joygis.iot.netty.server; import com.joygis.iot.netty.MyDecoder; import com.joygis.iot.netty.MyEncoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 * * @Author keLe * @Date 2022/8/26 */ public class NettyServerChannelInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //接收消息格式,使用自定义解析数据格式 pipeline.addLast("decoder",new MyDecoder()); //发送消息格式,使用自定义解析数据格式 pipeline.addLast("encoder",new MyEncoder()); //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开 //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用 //pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS)); //自定义的空闲检测 pipeline.addLast(new NettyServerHandler()); } }
接收消息格式,使用自定义解析数据格式工具类(5.23更新拆包粘包处理)
package com.joygis.iot.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 功能描述: 自定义接收消息格式 * * @Author keLe * @Date 2022/8/26 */ public class MyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
自定义发送消息格式,使用自定义解析数据格式工具类
/** * 功能描述: 自定义发送消息格式 * * @Author keLe * @Date 2022/8/26 */ public class MyEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception { //将16进制字符串转为数组 byteBuf.writeBytes(hexString2Bytes(s)); } /** * 功能描述: 16进制字符串转字节数组 * @Author keLe * @Date 2022/8/26 * @param src 16进制字符串 * @return byte[] */ public static byte[] hexString2Bytes(String src) { int l = src.length() / 2; byte[] ret = new byte[l]; for (int i = 0; i < l; i++) { ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); } return ret; } }
package com.joygis.iot.netty.server; import com.joygis.iot.netty.ChannelMap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * 功能描述: netty服务端处理类 * * @Author keLe * @Date 2022/8/26 */ @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 功能描述: 有客户端连接服务器会触发此函数 * @Author keLe * @Date 2022/8/26 * @param ctx 通道 * @return void */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); int clientPort = insocket.getPort(); //获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); //如果map中不包含此连接,就保存连接 if (ChannelMap.getChannelMap().containsKey(channelId)) { log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size()); } else { //保存连接 ChannelMap.addChannel(channelId, ctx.channel()); log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort); log.info("连接通道数量: {}",ChannelMap.getChannelMap().size()); } } /** * 功能描述: 有客户端终止连接服务器会触发此函数 * @Author keLe * @Date 2022/8/26 * @param ctx 通道处理程序上下文 * @return void */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = inSocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); //包含此客户端才去删除 if (ChannelMap.getChannelMap().containsKey(channelId)) { //删除连接 ChannelMap.getChannelMap().remove(channelId); log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort()); log.info("连接通道数量: " + ChannelMap.getChannelMap().size()); } } /** * 功能描述: 有客户端发消息会触发此函数 * @Author keLe * @Date 2022/8/26 * @param ctx 通道处理程序上下文 * @param msg 客户端发送的消息 * @return void */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg); String data = String.valueOf(msg); Integer water = Integer.parseInt(data.substring(6,10),16); log.info("当前水位:{}cm",water); //响应客户端 //this.channelWrite(ctx.channel().id(), msg); } /* @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { String bytes = "01 03 00 02 00 01 25 CA"; ctx.writeAndFlush(bytes); }*/ /** * 功能描述: 服务端给客户端发送消息 * @Author keLe * @Date 2022/8/26 * @param channelId 连接通道唯一id * @param msg 需要发送的消息内容 * @return void */ public void channelWrite(ChannelId channelId, Object msg) throws Exception { Channel channel = ChannelMap.getChannelMap().get(channelId); if (channel == null) { log.info("通道:{},不存在",channelId); return; } if (msg == null || msg == "") { log.info("服务端响应空的消息"); return; } //将客户端的信息直接返回写入ctx channel.write(msg); //刷新缓存区 channel.flush(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client:{},READER_IDLE 读超时",socketString); ctx.disconnect(); Channel channel = ctx.channel(); ChannelId id = channel.id(); ChannelMap.removeChannelByName(id); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client:{}, WRITER_IDLE 写超时",socketString); ctx.disconnect(); Channel channel = ctx.channel(); ChannelId id = channel.id(); ChannelMap.removeChannelByName(id); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client:{},ALL_IDLE 总超时",socketString); ctx.disconnect(); Channel channel = ctx.channel(); ChannelId id = channel.id(); ChannelMap.removeChannelByName(id); } } } /** * 功能描述: 发生异常会触发此函数 * @Author keLe * @Date 2022/8/26 * @param ctx 通道处理程序上下文 * @param cause 异常 * @return void */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size()); } }
package com.joygis.iot.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.util.CollectionUtils; import java.util.concurrent.ConcurrentHashMap; /** * 功能描述: 管理通道Map类 * * @Author keLe * @Date 2022/8/26 */ public class ChannelMap { /** * 管理一个全局map,保存连接进服务端的通道数量 */ private static final ConcurrentHashMapCHANNEL_MAP = new ConcurrentHashMap<>(128); public static ConcurrentHashMap getChannelMap() { return CHANNEL_MAP; } /** * 获取指定name的channel */ public static Channel getChannelByName(ChannelId channelId){ if(CollectionUtils.isEmpty(CHANNEL_MAP)){ return null; } return CHANNEL_MAP.get(channelId); } /** * 将通道中的消息推送到每一个客户端 */ public static boolean pushNewsToAllClient(String obj){ if(CollectionUtils.isEmpty(CHANNEL_MAP)){ return false; } for(ChannelId channelId: CHANNEL_MAP.keySet()) { Channel channel = CHANNEL_MAP.get(channelId); channel.writeAndFlush(new TextWebSocketFrame(obj)); } return true; } /** * 将channel和对应的name添加到ConcurrentHashMap */ public static void addChannel(ChannelId channelId,Channel channel){ CHANNEL_MAP.put(channelId,channel); } /** * 移除掉name对应的channel */ public static boolean removeChannelByName(ChannelId channelId){ if(CHANNEL_MAP.containsKey(channelId)){ CHANNEL_MAP.remove(channelId); return true; } return false; } }
package com.joygis.iot.netty; import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; /** * 功能描述: 配置类 * * @Author keLe * @Date 2022/8/26 */ @Setter @Getter @ToString @Component @Configuration @PropertySource("classpath:application.yml") @ConfigurationProperties(prefix = "socket") public class SocketProperties { private Integer port; private String host; }
appliction.yml
spring: profiles: active: test resources: cache: period: 0 application: name: iot-netty socket: # 监听端口 8090 port: 8090 #ip地址 host: 127.0.0.1
package com.joygis.iot.config; import cn.hutool.cron.CronUtil; import com.joygis.iot.netty.SocketProperties; import com.joygis.iot.netty.server.NettyServer; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.InetSocketAddress; /** * 功能描述: 任务队列 * @Author keLe * @Date 2022/7/20 */ @Component @Slf4j public class LaunchRunner implements CommandLineRunner { @Resource private NettyServer nettyServer; @Resource private SocketProperties socketProperties; @Override public void run(String... args) throws Exception { TaskRunner(); InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort()); log.info("netty服务器启动地址:"+socketProperties.getHost()); nettyServer.start(address); } /** * 执行正在运行的任务 */ private void TaskRunner() { /** * 任务队列启动 */ CronUtil.setMatchSecond(true); CronUtil.start(); log.info("\n-----------------------任务服务启动------------------------\n\t" + "当前正在启动的{}个任务"+ "\n-----------------------------------------------------------\n\t" , CronUtil.getScheduler().size() ); } }
两个定时器,一个定时下发报文,一个定时删除不活跃的连接
package com.joygis.iot.manage; import com.joygis.iot.netty.ChannelMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 功能描述: 定时发送Dtu报文 * * @Author keLe * @Date 2022/8/29 */ @Slf4j @Component public class DtuManage { public void sendMsg(){ ConcurrentHashMapchannelMap = ChannelMap.getChannelMap(); if(CollectionUtils.isEmpty(channelMap)){ return; } ConcurrentHashMap.KeySetView channelIds = channelMap.keySet(); byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA}; for(ChannelId channelId : channelIds){ Channel channel = ChannelMap.getChannelByName(channelId); // 判断是否活跃 if(channel==null || !channel.isActive()){ ChannelMap.getChannelMap().remove(channelId); log.info("客户端:{},连接已经中断",channelId); return ; } // 指令发送 ByteBuf buffer = Unpooled.buffer(); log.info("开始发送报文:{}",Arrays.toString(msgBytes)); buffer.writeBytes(msgBytes); channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes)); } else { log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes)); } }); } } /** * 功能描述: 定时删除不活跃的连接 * @Author keLe * @Date 2022/8/26 * @return void */ public void deleteInactiveConnections(){ ConcurrentHashMap channelMap = ChannelMap.getChannelMap(); if(!CollectionUtils.isEmpty(channelMap)){ for (Map.Entry next : channelMap.entrySet()) { ChannelId channelId = next.getKey(); Channel channel = next.getValue(); if (!channel.isActive()) { channelMap.remove(channelId); log.info("客户端:{},连接已经中断",channelId); } } } } }
使用网络助手进行调试
百度云盘下载地址:https://pan.baidu.com/s/1dcVk9MH88RMRF9dmR3mH5g
提取码:z7h0
发送报文
定时发送报文
我们有了方向,才有了思路,就会有具体落地。
如果对netty还有不太明白的地方,可以看看我的后续博客,持续更新中。