需要了解netty 基本知识原理,不了解的可以查看我之前的博客,以及网上的资料,这里不在过多撰述。
这里以对接硬件雷达水位计为例:

说一下思路, 这里场景各种设备连接DTU,然后通过DTU上报报文,和接收服务器下发的指令。
例如127.0.0.1:2233 就是你服务器的ip和端口,我们需要开发部署一个 JAVA 开发的Netty 服务器来监听 2233端口, 从机配置我们的服务器ip和端口连接到netty。
那么我们开发netty 的思路应该是什么样子的。
- netty 监听端口;
- netty 保存通道长链接;
- 将netty 的 里面的所有通过 存放到一个 ConcurrentHashMap 里面来进行管理;
- 通过 netty 监听 我们可以获取 从机上报到服务器的报文,我们进行业务处理;
- 通过Map 我们实现 定时下发报文,让从机回复响应;
springboot,依赖, 去掉tomcat ,我们这里只做服务器,并不需要tomcat,以及只用 starter
org.springframework.boot spring-boot-starter-parent 2.1.3.RELEASE org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-tomcat org.slf4j slf4j-log4j12
io.netty
netty-all
cn.hutool hutool-all 4.6.1
其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用
不在过多解释代码,每行都有注释
package com.joygis.iot.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * 功能描述: netty服务启动类 * * @Author keLe * @Date 2022/8/26 */ @Slf4j @Component public class NettyServer { public void start(InetSocketAddress address) { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 绑定线程池,编码解码 //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) // 指定Channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(address) //使用自定义处理类 .childHandler(new NettyServerChannelInitializer()) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 128) //保持长连接,2小时无数据激活心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true) //将小的数据包包装成更大的帧进行传送,提高网络的负载 .childOption(ChannelOption.TCP_NODELAY, true); // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(address).sync(); if (future.isSuccess()) { log.info("netty服务器开始监听端口:{}",address.getPort()); } //关闭channel和块,直到它被关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.joygis.iot.netty.server; import com.joygis.iot.netty.MyDecoder; import com.joygis.iot.netty.MyEncoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 * * @Author keLe * @Date 2022/8/26 */ public class NettyServerChannelInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //接收消息格式,使用自定义解析数据格式 pipeline.addLast("decoder",new MyDecoder()); //发送消息格式,使用自定义解析数据格式 pipeline.addLast("encoder",new MyEncoder()); //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开 //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用 //pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS)); //自定义的空闲检测 pipeline.addLast(new NettyServerHandler()); } }
接收消息格式,使用自定义解析数据格式工具类(5.23更新拆包粘包处理)
package com.joygis.iot.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 功能描述: 自定义接收消息格式
*
* @Author keLe
* @Date 2022/8/26
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
自定义发送消息格式,使用自定义解析数据格式工具类
/** * 功能描述: 自定义发送消息格式 * * @Author keLe * @Date 2022/8/26 */ public class MyEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception { //将16进制字符串转为数组 byteBuf.writeBytes(hexString2Bytes(s)); } /** * 功能描述: 16进制字符串转字节数组 * @Author keLe * @Date 2022/8/26 * @param src 16进制字符串 * @return byte[] */ public static byte[] hexString2Bytes(String src) { int l = src.length() / 2; byte[] ret = new byte[l]; for (int i = 0; i < l; i++) { ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); } return ret; } }
package com.joygis.iot.netty.server;
import com.joygis.iot.netty.ChannelMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* 功能描述: netty服务端处理类
*
* @Author keLe
* @Date 2022/8/26
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 功能描述: 有客户端连接服务器会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道
* @return void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (ChannelMap.getChannelMap().containsKey(channelId)) {
log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size());
} else {
//保存连接
ChannelMap.addChannel(channelId, ctx.channel());
log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort);
log.info("连接通道数量: {}",ChannelMap.getChannelMap().size());
}
}
/**
* 功能描述: 有客户端终止连接服务器会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @return void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (ChannelMap.getChannelMap().containsKey(channelId)) {
//删除连接
ChannelMap.getChannelMap().remove(channelId);
log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort());
log.info("连接通道数量: " + ChannelMap.getChannelMap().size());
}
}
/**
* 功能描述: 有客户端发消息会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @param msg 客户端发送的消息
* @return void
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg);
String data = String.valueOf(msg);
Integer water = Integer.parseInt(data.substring(6,10),16);
log.info("当前水位:{}cm",water);
//响应客户端
//this.channelWrite(ctx.channel().id(), msg);
}
/* @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String bytes = "01 03 00 02 00 01 25 CA";
ctx.writeAndFlush(bytes);
}*/
/**
* 功能描述: 服务端给客户端发送消息
* @Author keLe
* @Date 2022/8/26
* @param channelId 连接通道唯一id
* @param msg 需要发送的消息内容
* @return void
*/
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
Channel channel = ChannelMap.getChannelMap().get(channelId);
if (channel == null) {
log.info("通道:{},不存在",channelId);
return;
}
if (msg == null || msg == "") {
log.info("服务端响应空的消息");
return;
}
//将客户端的信息直接返回写入ctx
channel.write(msg);
//刷新缓存区
channel.flush();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("Client:{},READER_IDLE 读超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("Client:{}, WRITER_IDLE 写超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("Client:{},ALL_IDLE 总超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
}
}
}
/**
* 功能描述: 发生异常会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @param cause 异常
* @return void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size());
}
}
package com.joygis.iot.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.util.CollectionUtils;
import java.util.concurrent.ConcurrentHashMap;
/**
* 功能描述: 管理通道Map类
*
* @Author keLe
* @Date 2022/8/26
*/
public class ChannelMap {
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(128);
public static ConcurrentHashMap getChannelMap() {
return CHANNEL_MAP;
}
/**
* 获取指定name的channel
*/
public static Channel getChannelByName(ChannelId channelId){
if(CollectionUtils.isEmpty(CHANNEL_MAP)){
return null;
}
return CHANNEL_MAP.get(channelId);
}
/**
* 将通道中的消息推送到每一个客户端
*/
public static boolean pushNewsToAllClient(String obj){
if(CollectionUtils.isEmpty(CHANNEL_MAP)){
return false;
}
for(ChannelId channelId: CHANNEL_MAP.keySet()) {
Channel channel = CHANNEL_MAP.get(channelId);
channel.writeAndFlush(new TextWebSocketFrame(obj));
}
return true;
}
/**
* 将channel和对应的name添加到ConcurrentHashMap
*/
public static void addChannel(ChannelId channelId,Channel channel){
CHANNEL_MAP.put(channelId,channel);
}
/**
* 移除掉name对应的channel
*/
public static boolean removeChannelByName(ChannelId channelId){
if(CHANNEL_MAP.containsKey(channelId)){
CHANNEL_MAP.remove(channelId);
return true;
}
return false;
}
}
package com.joygis.iot.netty;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* 功能描述: 配置类
*
* @Author keLe
* @Date 2022/8/26
*/
@Setter
@Getter
@ToString
@Component
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {
private Integer port;
private String host;
}
appliction.yml
spring:
profiles:
active: test
resources:
cache:
period: 0
application:
name: iot-netty
socket:
# 监听端口 8090
port: 8090
#ip地址
host: 127.0.0.1
package com.joygis.iot.config;
import cn.hutool.cron.CronUtil;
import com.joygis.iot.netty.SocketProperties;
import com.joygis.iot.netty.server.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* 功能描述: 任务队列
* @Author keLe
* @Date 2022/7/20
*/
@Component
@Slf4j
public class LaunchRunner implements CommandLineRunner {
@Resource
private NettyServer nettyServer;
@Resource
private SocketProperties socketProperties;
@Override
public void run(String... args) throws Exception {
TaskRunner();
InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());
log.info("netty服务器启动地址:"+socketProperties.getHost());
nettyServer.start(address);
}
/**
* 执行正在运行的任务
*/
private void TaskRunner() {
/**
* 任务队列启动
*/
CronUtil.setMatchSecond(true);
CronUtil.start();
log.info("\n-----------------------任务服务启动------------------------\n\t" +
"当前正在启动的{}个任务"+
"\n-----------------------------------------------------------\n\t"
, CronUtil.getScheduler().size()
);
}
}
两个定时器,一个定时下发报文,一个定时删除不活跃的连接
package com.joygis.iot.manage;
import com.joygis.iot.netty.ChannelMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 功能描述: 定时发送Dtu报文
*
* @Author keLe
* @Date 2022/8/29
*/
@Slf4j
@Component
public class DtuManage {
public void sendMsg(){
ConcurrentHashMap channelMap = ChannelMap.getChannelMap();
if(CollectionUtils.isEmpty(channelMap)){
return;
}
ConcurrentHashMap.KeySetView channelIds = channelMap.keySet();
byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA};
for(ChannelId channelId : channelIds){
Channel channel = ChannelMap.getChannelByName(channelId);
// 判断是否活跃
if(channel==null || !channel.isActive()){
ChannelMap.getChannelMap().remove(channelId);
log.info("客户端:{},连接已经中断",channelId);
return ;
}
// 指令发送
ByteBuf buffer = Unpooled.buffer();
log.info("开始发送报文:{}",Arrays.toString(msgBytes));
buffer.writeBytes(msgBytes);
channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes));
} else {
log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes));
}
});
}
}
/**
* 功能描述: 定时删除不活跃的连接
* @Author keLe
* @Date 2022/8/26
* @return void
*/
public void deleteInactiveConnections(){
ConcurrentHashMap channelMap = ChannelMap.getChannelMap();
if(!CollectionUtils.isEmpty(channelMap)){
for (Map.Entry next : channelMap.entrySet()) {
ChannelId channelId = next.getKey();
Channel channel = next.getValue();
if (!channel.isActive()) {
channelMap.remove(channelId);
log.info("客户端:{},连接已经中断",channelId);
}
}
}
}
}
使用网络助手进行调试
百度云盘下载地址:https://pan.baidu.com/s/1dcVk9MH88RMRF9dmR3mH5g
提取码:z7h0



发送报文


定时发送报文


我们有了方向,才有了思路,就会有具体落地。
如果对netty还有不太明白的地方,可以看看我的后续博客,持续更新中。