在我的另一篇文章中,有简单介绍过Socket的相关概念链接:SpringBoot简单集成WebSocket
初步了解后,本次再进行一个深入通俗的理解。
Socket作为一种通信机制,通常也被称为"套接字"。
它类似于人们之间的"打电话行为"。我们将每个人的电话号作为独立端口。两个人打电话之前则首先需要其中一方知晓另一方的"端口"。然后申请向对方进行拨号呼叫(请求连接)。
此时被连接方如果正好空闲,接起电话,则双方正式达成连接。开始正式通讯.只要有其中一方挂断电话,则关闭Socket连接。
1.流式Socket(Stream).一种面向连接的Socket,针对面向连接的TCP服务应用,安全但效率低。属于业内较为常用的一种方式
2.数据报式Socket(DataGram).一种无连接的Socket.不安全,效率高.
1.优化了Socket的启动方式,不论是Jar包启动还是War部署都不用再额外配置。
2.优化了Socket的线程代码,更简洁的代码逻辑。
3.优化了Socket的连接保持,不再使用线程Sleep的方式,提高了响应速度。
4.去除了客户端监听机制,此类代码在实际业务场景中意义不大。
5.重构了Socket回写给客户端消息的方式,使用PrintWrite进行回写.
下面开始在SpringBoot中实战整合Socket
在JDK1.8中.官方整合了Socket服务至java.net包中。因此不需要引入其它依赖。
#Socket配置 socket: port: 8082
这一步类似配置一个Controller,主要用于接受客户端的连接请求。
@Slf4j @Component public class SocketServers{ //注入被开放的端口 @Value("${socket.port}") private Integer port; //这个是业务处理的接口 @Autowired private IBusinessSocketService socketService; @PostConstruct public void socketStart(){ //直接另起一个线程挂起Socket服务 new Thread(this::socketServer).start(); } private void socketServer() { ExecutorService executorService = Executors.newFixedThreadPool(10); try (ServerSocket serverSocket = new ServerSocket(port)) { log.info("socket端口在:{}中开启并持续监听=====>", port); while (Boolean.TRUE) { Socket clientSocket = serverSocket.accept(); //设置流读取的超时时间,这里设置为10s。超时后自动断开连接 clientSocket.setSoTimeout(10000); //是否与客户端保持持续连接,这行代码在本示例中,并没有作用,因为后面的逻辑处理完成后,会自动断开连接. clientSocket.setKeepAlive(Boolean.TRUE); log.info("发现客户端连接Socket:{}:{}===========>", clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort()); //这里通过线程池启动ClientHandler方法中的run方法. executorService.execute(new ClientHandler(clientSocket, socketService)); } } catch (Exception e) { log.error("Socket服务启动异常!", e); } } }
@Slf4j public class ClientHandler implements Runnable { private final Socket clientSocket; private final IBusinessSocketService socketService; public ClientHandler(Socket clientSocket, IBusinessSocketService socketService) { this.clientSocket = clientSocket; this.socketService = socketService; } @Override @SneakyThrows public void run() { //SnowFlakeUtil 雪花ID生成工具类,后面会统一给出 String logId = SnowFlakeUtil.getId(); String hostIp = clientSocket.getInetAddress().getHostAddress(); String port = String.valueOf(clientSocket.getPort()); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), Boolean.TRUE)) { //这里的StringUtil是自己写的工具类,后面会统一给出 String requestInfo = StringUtil.readIoStreamToString(clientSocket.getInputStream()); if (StringUtil.isNotEmpty(requestInfo)) { log.info("监听到客户端消息:{},监听日志ID为:{}", requestInfo, logId); socketService.executeBusinessCode(requestInfo, logId, out); clientSocket.shutdownOutput(); TimeUnit.SECONDS.sleep(1L); } } catch (IOException e) { log.error("与客户端:[{}:{}]通信异常!错误信息:{}", hostIp, port, e.getMessage()); } finally { log.info("客户端:[{}:{}]Socket连接已关闭,日志ID为:{}========>", hostIp, port, logId); } } }
public interface IBusinessSocketService { /** * 从Socket中接受消息并处理 * * @param requestInfo 请求报文 * @param logId 日志ID * @param writer 回写给客户端消息的回写类(Socket自带) */ void executeBusinessCode(String requestInfo, String logId, PrintWriter writer); }
@Slf4j @Service public class BusinessSocketServiceImpl implements IBusinessSocketService { @Override public void executeBusinessCode(String requestInfo, String logId, PrintWriter writer) { String responseMsg; boolean isSuccess = Boolean.TRUE; try { if (StringUtils.isEmpty(requestInfo)) { return; } //执行你的业务操作 responseMsg = "回填给客户端的信息,可以是任何格式的对象"; } catch (Exception e) { isSuccess = Boolean.FALSE; e.printStackTrace(); responseMsg = "回填给客户端的信息(业务处理错误的情况下)"; } try { //将响应报文通过PrintWriter回写给客户端 writer.println(responseMsg); } catch (Exception e) { log.error("Socket客户端数据返回异常!当前日志ID:[{}],异常信息:{}", logId, e); } } }
package com.util; /************************************************** * copyright (c) 2021 * created by yusen.zhang * date: 2021-9-18 * description: 雪花算法工具类 * **************************************************/ public class SnowFlakeUtil { private long workerId; private long datacenterId; private long sequence = 0L; private long twepoch = 1288834974657L; // Thu, 04 Nov 2010 01:42:54 GMT 标记时间 用来计算偏移量,距离当前时间不同,得到的数据的位数也不同 private long workerIdBits = 5L; // 物理节点ID长度 private long datacenterIdBits = 5L; // 数据中心ID长度 private long maxWorkerId = -1L ^ (-1L << workerIdBits); // 最大支持机器节点数0~31,一共32个 private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); // 最大支持数据中心节点数0~31,一共32个 private long sequenceBits = 12L; // 序列号12位, 4095,同毫秒内生成不同id的最大个数 private long workerIdShift = sequenceBits; // 机器节点左移12位 private long datacenterIdShift = sequenceBits + workerIdBits; // 数据中心节点左移17位 private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; // 时间毫秒数左移22位 private long sequenceMask = -1L ^ (-1L << sequenceBits); // 用于和当前时间戳做比较,以获取最新时间 private long lastTimestamp = -1L; //成员类,SnowFlakeUtil的实例对象的保存域 private static class IdGenHolder { private static final SnowFlakeUtil instance = new SnowFlakeUtil(); } //外部调用获取SnowFlakeUtil的实例对象,确保不可变 public static SnowFlakeUtil get() { return IdGenHolder.instance; } //初始化构造,无参构造有参函数,默认节点都是0 public SnowFlakeUtil() { this(0L, 0L); } //设置机器节点和数据中心节点数,都是 0-31 public SnowFlakeUtil(long workerId, long datacenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId; } //线程安全的id生成方法 @SuppressWarnings("all") public synchronized long nextId() { //获取当前毫秒数 long timestamp = timeGen(); //如果服务器时间有问题(时钟后退) 报错。 if (timestamp < lastTimestamp) { throw new RuntimeException(String.format( "Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); } //如果上次生成时间和当前时间相同,在同一毫秒内 if (lastTimestamp == timestamp) { //sequence自增,因为sequence只有12bit,所以和sequenceMask相与一下,去掉高位 sequence = (sequence + 1) & sequenceMask; //判断是否溢出,也就是每毫秒内超过4095,当为4096时,与sequenceMask相与,sequence就等于0 if (sequence == 0) { //自旋等待到下一毫秒 timestamp = tilNextMillis(lastTimestamp); } } else { //如果和上次生成时间不同,重置sequence,就是下一毫秒开始,sequence计数重新从0开始累加,每个毫秒时间内,都是从0开始计数,最大4095 sequence = 0L; } lastTimestamp = timestamp; // 最后按照规则拼出ID 64位 // 000000000000000000000000000000000000000000 00000 00000 000000000000 //1位固定整数 time datacenterId workerId sequence return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence; } //比较当前时间和过去时间,防止时钟回退(机器问题),保证给的都是最新时间/最大时间 protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } //获取当前的时间戳(毫秒) protected long timeGen() { return System.currentTimeMillis(); } /** * 获取全局唯一编码 */ public static String getId() { long id = SnowFlakeUtil.get().nextId(); return Long.toString(id); } }
package com.util; import lombok.SneakyThrows; import java.io.ByteArrayOutputStream; import java.io.InputStream; /** ******* * * @author yuSen.zhang * @version 1.0 * @date 2023/04/17 */ public class StringUtil { @SneakyThrows public static String readToStreamToString(InputStream is) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] buffer = new byte[2048]; int bytesRead; /* 这一步可能会卡住,因为客户端如果传输完数据以后, 如果没有调用socket.shutdownOutput()方法,会导致服务端不知道流是否已传输完毕, 等待我们之前设置的10S流读取时间后,连接就会被自动关掉 如果出现这种情况,服务端可以通过其它方式判断。例如换行符或者特殊字符等,只需要在 while条件中加一个&&判断即可.例如我这里的业务结束标记是字符: ">",那么判断逻辑如下 若客户端调用了shutdownOutput()方法,则不需要这个判断 */ while (!bos.toString().contains(">") && (bytesRead = is.read(buffer)) != -1) { bos.write(buffer, 0, bytesRead); } return bos.toString(); } }
public static void main(String[] args) throws Exception { requestInfoToSocketServer(); } private static void requestInfoToSocketServer() { try (Socket socket = new Socket("127.0.0.1", 8082); PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) { out.println("发送给服务端的消息"); //记得调用shutdownOutput()方法,否则服务端的流读取会一直等待 socket.shutdownOutput(); //开始接收服务端的消息 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("接收到服务端的回复:" + in.readLine()); } catch (Exception e) { System.out.println("Socket传输数据异常!" + e.getMessage()); } }
若有不完善或者运行失败的情况,可以私信我或者在下方留言哦~