目录
1、前言
2、什么是SSE
2.1、技术原理
2.2、SSE和WebSocket
2.2.1、SSE (Server-Sent Events)
2.2.2、WebSocket
2.2.3、选择 SSE 还是 WebSocket?
3、Springboot快速集成
3.1、添加依赖
3.2、创建SSE控制器
3.2.1、SSEmitter创建实例
3.2.2、SSEmitter API
3.2.3、SSEmitter注册回调
4、小结
如果项目中有一个场景,假设对接ChatGPT或对接天气类接口的时候,需要服务端主动往客户端进行消息推送或推流。通常的做法有:
那么今天再介绍另一种机制:SSE,也就是服务器发送事件机制。
SSE(Server-Sent Events)是一种允许服务器向客户端推送实时数据的技术,它建立在 HTTP 和简单文本格式之上,提供了一种轻量级的服务器推送方式,通常也被称为“事件流”(Event Stream)。他通过在客户端和服务端之间建立一个长连接,并通过这条连接实现服务端和客户端的消息实时推送。
SSE是建立在HTTP协议之上的,所以原理比较简单,也与HTTP原理类似:
1)建立连接:
客户端通过普通的 HTTP 请求向服务器发起连接请求,类似于普通的 Web 请求。这个请求的关键在于使用了 text/event-stream 的 MIME 类型,告知服务器该请求是 SSE 请求。
httpCopy codeGET /sse/stream HTTP/1.1 Host: example.com Accept: text/event-stream
2)服务器处理请求:
服务器接收到 SSE 请求后,会在连接上保持打开状态,不会立即关闭。这是与普通的请求-响应模式的主要不同之处。服务器端通过这个持久连接向客户端发送数据。
3)数据推送:
服务器端通过打开的连接,周期性地向客户端发送消息。这些消息以文本的形式发送,并遵循一定的格式,通常以 data 字段表示消息内容。
httpCopy codeHTTP/1.1 200 OK Content-Type: text/event-stream data: This is a message\n\n
上述例子中,data 字段包含了实际的消息内容,两个换行符(\n\n)表示消息的结束。
4)客户端接收消息:
客户端通过监听连接的 message 事件来接收服务器推送的消息。一旦接收到消息,客户端可以采取相应的操作,例如更新界面内容。
javascriptCopy codeconst eventSource = new EventSource('/sse/stream'); eventSource.onmessage = function (event) { console.log('Received message:', event.data); // 处理消息,例如更新界面 };
5)连接关闭:
当服务器端不再需要向客户端推送消息时,或者发生错误时,服务器可以关闭连接。客户端也可以通过调用 eventSource.close() 来关闭连接。
提到SSE,那自然要提一下WebSocket了。WebSocket是一种HTML5提供的全双工通信协议(指可以在同一时间内允许两个设备之间进行双向发送和接收数据的通信协议),基于TCP协议,并复用HTTP的握手通道(允许一次TCP连接中传输多个HTTP请求和相应),常用于浏览器与服务器之间的实时通信。
SSE和WebSocket尽管功能类似,都是用来实现服务器向客户端实时推送数据的技术,但还是有一定区别:
Springboot项目中,sse不需要额外添加依赖,引用了web相关的springboot依赖即可:
org.springframework.boot spring-boot-starter-web
这里简单创建一个控制器类,用于处理SSE请求。在JAVA中通常使用SSEmitter来实现sse的消息推送。
package com.example.springbootsse.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Date; @RestController @RequestMapping("/sse") public class SSEmitterController { @GetMapping("/stream") public SseEmitter stream() { // 用于创建一个 SSE 连接对象 SseEmitter emitter = new SseEmitter(); // 在后台线程中模拟实时数据 new Thread(() -> { try { for (int i = 0; i < 10; i++) { // emitter.send() 方法向客户端发送消息 // 使用SseEmitter.event()创建一个事件对象,设置事件名称和数据 emitter.send(SseEmitter.event().name("message").data("[" + new Date() + "] Data #" + i)); Thread.sleep(1000); } // 数据发送完成后,关闭连接 emitter.complete(); } catch (IOException | InterruptedException e) { // 发生错误时,关闭连接并报错 emitter.completeWithError(e); } }).start(); return emitter; } }
查看执行结果,可以看到每一秒服务端都会自动像客户端推送messag消息:
我们来关注下SSEmitter这个类,SseEmitter 是 Spring Framework 中用于实现 Server-Sent Events(SSE)的一个类。它允许服务器向客户端推送数据,通过建立一个持久连接,实现服务器向客户端的实时单向通信。在 Spring 框架中,SseEmitter 类通常用于处理 SSE 请求,推送事件给客户端。
SSEmitter提供了两个构造函数用于创建实例。在创建实例时,我们可以指定超时时间timeout,如果传0或使用无参构造,则表示永不过期。连接超时是指在一段时间内没有数据传输时,连接将被认为是超时的,并自动关闭。
除此以外,SSEmitter还提供了几种API,如上面例子中使用到的:
SseEmitter 可以通过注册回调函数来处理服务器端发往客户端的事件。当服务器端有新的数据需要推送给客户端时,注册的回调函数将会被调用。SSEmitter继承了ResponseBodyEmitter,提供的一系列注册回调函数有:
示例代码:
package com.example.springbootsse.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Date; @RestController @RequestMapping("/sse") public class SSEmitterController { @GetMapping("/stream") public SseEmitter stream() { // 3S超时 SseEmitter emitter = new SseEmitter(10000L); // 注册回调函数,处理服务器向客户端推送的消息 emitter.onCompletion(() -> { System.out.println("Connection completed"); // 在连接完成时执行一些清理工作 }); emitter.onTimeout(() -> { System.out.println("Connection timeout"); // 在连接超时时执行一些处理 emitter.complete(); }); // 在后台线程中模拟实时数据 new Thread(() -> { try { for (int i = 0; i < 10; i++) { emitter.send(SseEmitter.event().name("message").data("[" + new Date() + "] Data #" + i)); Thread.sleep(1000); } emitter.complete(); // 数据发送完成后,关闭连接 } catch (IOException | InterruptedException e) { emitter.completeWithError(e); // 发生错误时,关闭连接并报错 } }).start(); return emitter; } }
其实SSE已经出来很久了,但是熟知他的人却很少,大多数项目中还是直接使用了websocket技术。直到最近ChatGPT火了之后,很多项目需要对接GPT进行实时推流,才逐渐又被人提起。所以借此篇文章给自己扫盲一下。