在生产环境下,服务端的SseEmitter对象在初始化时可以填入参数,以保证其存活时间,一旦超时,客户端会自动断线重连,在这个过程中如果没有做消息队列等缓存手段,就可能会丢数据。
但是如果设置SseEmitter存活时间为永久(参数填0),就会导致服务端无法感知客户端下线,从而使服务端维持连接池会越来越大无法释放。
导致这一问题的出现,就是服务端在发送消息后没有直接可用的方法可以感知客户端是否接到消息,我们可以手写一些判断解决上述问题。依据的原理是当客户端无法接受消息时,SseEmitter对象在send一次之后sendFailed状态会变为True,这时候就可以剔除。同时在订阅时用此判断可以减少重复创建的机会(获取对象属性的方法式借用网上大佬的代码)
public boolean checkSseConnectAlive(SseEmitter sseEmitter) { if (sseEmitter == null) { return false; } return !(Boolean) getFieldInstance(sseEmitter, "sendFailed") && !(Boolean) getFieldInstance(sseEmitter, "complete"); } public static Object getFieldInstance(Object obj, String fieldPath) { String fields[] = fieldPath.split("#"); for (String field : fields) { obj = getField(obj, obj.getClass(), field); if (obj == null) { return null; } } return obj; } public static Object getField(Object obj, Class> clazz, String fieldName) { for (; clazz != Object.class; clazz = clazz.getSuperclass()) { try { Field field; field = clazz.getDeclaredField(fieldName); field.setAccessible(true); return field.get(obj); } catch (Exception e) { } } return null; }
以下为关键步骤:
1、首先设置服务端生存时间为永久
2、建立定时器,固定时间发送消息用来检测客户端是否离线
3、订阅判断
附完整代码:
private static MapsseCache = new ConcurrentHashMap<>(); public SseEmitter subscribe(String sseClientId) { if (!NotNullCheck.str(sseClientId)) { return null; } SseEmitter sseEmitter = null; SseEmitter getClientEmitter = sseCache.get(sseClientId); if (getClientEmitter == null || !checkSseConnectAlive(getClientEmitter)) { // 生存时间设置 默认30s sseEmitter = new SseEmitter(); // 设置前端的重试时间 try { sseEmitter.send(SseEmitter.event().reconnectTime(Settings.sseClientReconnectTime).data("连接成功")); sseCache.put(sseClientId, sseEmitter); System.out.println("add " + sseClientId); sseEmitter.onTimeout(() -> { System.out.println(sseClientId + "超时"); sseCache.remove(sseClientId); }); sseEmitter.onCompletion(() -> System.out.println("已关闭连接:" + sseClientId)); } catch (Exception e) { System.err.println(e.getMessage()); } } else { System.out.println("已存在!"); sseEmitter = getClientEmitter; } return sseEmitter; } public boolean sendOneClientMessage(String sseClientId,Object msg) throws IOException { SseEmitter sseEmitter = sseCache.get(sseClientId); if (sseEmitter != null && checkSseConnectAlive(sseEmitter)) { sseEmitter.send(SseEmitter.event().data(msg)); return true; } else { return false; } } public void sendAllClientMsg(Object msg) { if (sseCache != null && !sseCache.isEmpty()) { sseCache.entrySet().forEach(sseEmitterSet -> { String sseClientId = sseEmitterSet.getKey(); SseEmitter sseEmitter = sseEmitterSet.getValue(); if (sseEmitter != null) { if (checkSseConnectAlive(sseEmitter)) { SseEmitter.SseEventBuilder sseEventBuilder = SseEmitter .event() .data(msg) .reconnectTime(Settings.sseClientReconnectTime); try { sseEmitter.send(sseEventBuilder); } catch (Exception e) { System.err.println("SSE check sseClientId send error:" + sseClientId); } } else { sseEmitter.complete(); sseCache.remove(sseClientId); System.out.println("SSE check sseClientId offline:" + sseClientId); } } }); } } public boolean close(String sseClientId) { SseEmitter sseEmitter = sseCache.get(sseClientId); if (sseEmitter != null) { System.out.println("SSE active close connection :" + sseClientId); sseEmitter.complete(); sseCache.remove(sseClientId); return true; } else { return false; } } public boolean checkSseConnectAlive(SseEmitter sseEmitter) { if (sseEmitter == null) { return false; } return !(Boolean) GetObjField.getFieldInstance(sseEmitter, "sendFailed") && !(Boolean) GetObjField.getFieldInstance(sseEmitter, "complete"); } public void aliveCheck(ApplicationArguments args) throws Exception { new Timer().schedule(new TimerTask() { @Override public void run() { sendAllClientMsg("Check Alive"); } }, 3 * 1000, 3 * 1000); }
当然这只是基础做法,如果要保证时效和可靠性还需要其他工具来辅助,欢迎各位大佬留言相互学习。