相关文章推荐

什么是SSE

实时性获取数据的解决方案

对于某些需要实时更新的数据(例如Facebook/Twitter 更新、估价更新、新的博文、赛事结果等)来说,有这么几种解决方案:
Polling(轮询)
在客户端重复的向服务端发送新请求。如果服务器没有新的数据更动,关闭本次连接。然后客户端在稍等一段时间之后,再次发起新请求,一直重复这样的步骤。
Long-polling(长轮询)
在长轮询中,客户端发送一个请求到服务端。如果服务端没有新的数据更动,那么本次连接将会被保持,直到等待到更新后的数据,返回给客户端并关闭这个连接。
Server-Sent Events
SSE类似于长轮询的机制,但是它在每一次的连接中,不只等待一次数据的更动。客户端发送一个请求到服务端 ,服务端保持这个请求直到一个新的消息准备好,将消息返回至客户端,此时不关闭连接,仍然保持它,供其它消息使用。SSE的一大特色就是重复利用一个连接来处理每一个消息(又称event)。
WebSocket
WebSocket不同于以上的这些技术,因为它提供了一个真正意义上的双向连接。WebSocket是HTML5中非常强大的新特性,已经得到广泛应用。

SSE介绍

一般来说HTTP协议是要客户端先请求服务器,服务器才能响应给客户端,无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(event-streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向客户端推送信息。

  1. 客户端请求建立事件流类型的连接,即Request Headers Accept = text/event-stream。
  1. 服务端响应请求,并将Response Headers Content-Type设置为text/event-stream,证明数据将以这种类型传送
  1. 服务端如果有数据就会发送给客户端使用示例

SSE的推送数据格式

event: 事件类型,服务端可以自定义,默认是message事件
Id: 每一条事件流的ID,在失败重传事件流的时候有重要作用
retry: 浏览器连接断开之后重连的间隔时间,单位:毫秒,在自动重新连接的过程中,之前收到的最后一个事件流ID会被发送到服务端。
data: 发送的数据
每个字段K-V后面用"\n"结尾,如:

SSE的使用

借助webflux实现定时

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

springMvc支持sse
服务端代码

    @GetMapping("/sse")
    @CrossOrigin
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter();
        Flux.interval(Duration.ofSeconds(1))
                .map(i -> "Server-Sent Event #" + i)
                .doOnCancel(() -> emitter.complete())
                .subscribe(
                        data -> {
                                emitter.send(data);  
                        error -> emitter.completeWithError(error),
                        () -> emitter.complete()
        return emitter;

postMan请求
效果
image.png

SSE原理分析

可以发现与普通http请求不同地方在于请求过程中有SseEmitter这个对象
首先返回这个对象,后续通过SseEmitter对浏览器推送消息

一、springMvc对返回的SseEmitter处理

首先介绍返回值的HandlerMethodReturnValueHandler

org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite#selectHandler

@Nullable
private HandlerMethodReturnValueHandler selectHandler(@Nullable Object value, MethodParameter returnType) {
boolean isAsyncValue = isAsyncReturnValue(value, returnType);
for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) {
    if (isAsyncValue && !(handler instanceof AsyncHandlerMethodReturnValueHandler)) {
        continue;
    if (handler.supportsReturnType(returnType)) {
        return handler;
return null;

springMvc内置了很多处理器
image.png
简单介绍其中的几种处理器

  1. 直接返回字符串使用ViewNameMethodReturnValueHandler会根据字符串找到对应ModelAndView渲染
  2. RequestResponseBodyMethodProcessor声明注解@ResponseBody使用此类处理,会直接返回字符串,如果返回的是字符串,直接返回,如果是对象序列化返回字符串
  3. 而我们的SseEmitter显然需要由ResponseBodyEmitterReturnValueHandler进行处理

二、ResponseBodyEmitterReturnValueHandler

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {<!-- -->
        // 返回值为空处理
        ...
        HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
        ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
        // 返回值为ResponseEntity<ResponseBodyEmitter> 或 ResponseEntity<SseEmitter>时的处理
        ...
        ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
        ResponseBodyEmitter emitter;
        if (returnValue instanceof ResponseBodyEmitter) {<!-- -->
            emitter = (ResponseBodyEmitter) returnValue;
        }else {<!-- -->
            // 这里是响应式编程解析的部分,暂时不去了解
            ....
        // 默认空实现,SseEmitter中覆盖重写,设置了响应头类型为MediaType.TEXT_EVENT_STREAM
        emitter.extendResponse(outputMessage);
        // 流式场景不需要对响应缓存
        ShallowEtagHeaderFilter.disableContentCaching(request);
        // 包装响应以忽略进一步的头更改,头将在第一次写入时刷新
        outputMessage = new StreamingServletServerHttpResponse(outputMessage);
        HttpMessageConvertingHandler handler;
        try {<!-- -->
            // 这里使用了DeferredResult
            DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
             //设置异步请求,可以在别的线程进行对response进行恢复
            WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
            handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
        catch (Throwable ex) {<!-- -->
            emitter.initializeWithError(ex);
            throw ex;
        // 这块是主要逻辑
        emitter.initialize(handler);

这里介绍DeferredResult

DeferredResult

1.为什么要使用DeferredResult?

建立一次连接,让他们等待尽可能长的时间。这样同时如果有新的数据到达服务器,服务器可以直接返回响应。通过这种方式,我们绝对可以减少所涉及的请求和响应周期的数量。

2.DeferredResult执行逻辑

浏览器发起异步请求
请求到达服务端被挂起(使用浏览器查看请求状态,此时为pending)
向浏览器进行响应,分为两种情况:
3.1 调用DeferredResult.setResult(),请求被唤醒,返回结果
3.2 超时,返回一个你设定的结果
浏览得到响应,再次重复1,处理此次响应结果

DeferredResult使用示例

下面代码逻辑是只要有告警状态的变更,就从数据库中获取所有设备的最新状态,然后和redis中保存的比较,如果有更新,就调setResult方法,把结果返回给客户端。如果没有更新,就在20秒后跳出循环。

  @GetMapping("/defferResult")
    @CrossOrigin
    @ResponseBody
    public DeferredResult defferResult() {
        DeferredResult deferredResult = new DeferredResult(3*1000l);
        Executors.newSingleThreadExecutor().execute(()->{
           try {
               sleep(4000l);
           } catch (InterruptedException e) {
               e.printStackTrace();
             deferredResult.setResult("sucess");
        });
        return deferredResult;

如果超时会抛出AsyncRequestTimeoutException异常,并且由springmvc处理,对于返回值,也是由springMvc进行再次处理。
例如抛出超时异常,可以被我们进行处理
image.png
因此DeferredResult会通过两次springMvc的解析,一次是返回自身deferredResult,一次是设置是设置结果。
也就是说,利用了DeferredResult对http请求起到挂起作用。

三、初始化HttpMessageConvertingHandler

	handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);

根据前面我们看出来
后续消息的发送都通过这个handler

 synchronized void initialize(Handler handler) throws IOException {<!-- -->
        this.handler = handler;
        try {<!-- -->
            // 遍历之前发送的数据
            for (DataWithMediaType sendAttempt : this.earlySendAttempts) {<!-- -->
                // 这里会调用handler的send方法
                sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
        }finally {<!-- -->
            this.earlySendAttempts.clear();
        // 数据是否已经发完了
        if (this.complete) {<!-- -->
            // 有没有报错
            if (this.failure != null) {<!-- -->
                this.handler.completeWithError(this.failure);
            }else {<!-- -->
                // 这里最终会调用DefferedResult.setResult
                this.handler.complete();
        }else {<!-- -->
            this.handler.onTimeout(this.timeoutCallback);
            this.handler.onError(this.errorCallback);
            this.handler.onCompletion(this.completionCallback);

这里需要注意的地方,在handler没有初始化反正的时候,调用SseEmitter的send发送的消息都暂存到earlySendAttempts,当初始化完成后,首先将之前暂存的消息进行发送

     this.handler.onTimeout(this.timeoutCallback);
     this.handler.onError(this.errorCallback);
     this.handler.onCompletion(this.completionCallback);

分别设置DeferredResult的onTimeout,onError,onCompletion方法。
总体来说sse方式,还是利用了DeferredResult异步响应的方式

三、消息发送

首先调用emitter的send方法

@Override
	public void send(Object object) throws IOException {
		send(object, null);
@Override
	public void send(Object object, @Nullable MediaType mediaType) throws IOException {
		send(event().data(object, mediaType));

这里我们发现,这种发送消息的方式事件的消息体只有data属性,而没有id,和retry属性。
如果需要id等属性可以这样写

@GetMapping("/sse")
    @CrossOrigin
    @ResponseBody
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(20000l);
        AtomicInteger atomicInteger = new AtomicInteger();
        Flux.interval(Duration.ofSeconds(1))
                .map(i -> "Server-Sent Event #" + i)
                .doOnCancel(() -> emitter.complete())
                .subscribe(
                        data -> {
                            try {
                                emitter.send(SseEmitter.event()
                                        .id(String.valueOf(atomicInteger.getAndAdd(1)))
                                        .name("message")
                                        .data("Message " + atomicInteger.get()));
                        //        emitter.complete();
                            } catch (IOException e) {
                                emitter.completeWithError(new RuntimeException("发送错误"));
                        error -> emitter.completeWithError(error),
                        () -> emitter.complete()
        return emitter;

客户端收到id的话,如果消息未接受完毕连接就断开了,那么可以再次通过最后的id请求服务器,服务器根据id继续发送消息。进行短线重连的操作
构建事件进行发送

public void send(SseEventBuilder builder) throws IOException {
		Set<DataWithMediaType> dataToSend = builder.build();
		synchronized (this) {
			for (DataWithMediaType entry : dataToSend) {
				super.send(entry.getData(), entry.getMediaType());

image.png
分别发送三次,正好构成了
data:hello world\n\n一个消息的数据格式,浏览器可以对应的进行解析

private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {<!-- -->
        ...
        @SuppressWarnings("unchecked")
        private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {<!-- -->
            // RequestMappingHandlerAdapter实例化的时候会设置,例如ByteArrayHttpMessageConverter,StringHttpMessageConverter
            for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {<!-- -->
                if (converter.canWrite(data.getClass(), mediaType)) {<!-- -->
                    // 将消息写入输出流
                    ((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
                    this.outputMessage.flush();
                    return;
            throw new IllegalArgumentException("No suitable converter for " + data.getClass());

在http请求挂起期间,还可以对ServerHttpResponse进行写入操作,不断的发送给浏览器。

四、结束http请求

发送完成执行emitter.complete(),完成请求,结束对http请求的挂起。
发送中间出现错误,将错误信息发送给浏览器

五、关于超时时间的思考

SseEmitter emitter = new SseEmitter(1000l);

构造SseEmitter时可以指定超时时间,实际就是DeferredResult的超时时间,
当到达超时时间,连接自动释放掉,因此需要设置适当的超时时间

java作为客户端使用sse

需要借助okhttp3

   <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>3.14.9</version>
        </dependency>

编写响应监听器

package com.unfbx.chatgpt.sse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import java.util.Objects;
 * 描述: sse
 * @author https:www.unfbx.com
 * 2023-02-28
@Slf4j
public class ConsoleEventSourceListener extends EventSourceListener {
    @Override
    public void onOpen(EventSource eventSource, Response response) {
        log.info("OpenAI建立sse连接...");
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        log.info("OpenAI返回数据:{}", data);
        if (data.equals("[DONE]")) {
            log.info("OpenAI返回数据结束了");
            return;
    @Override
    public void onClosed(EventSource eventSource) {
        log.info("OpenAI关闭sse连接...");
    @SneakyThrows
    @Override
    public void onFailure(EventSource eventSource, Throwable t, Response response) {
        if(Objects.isNull(response)){
            log.error("OpenAI  sse连接异常:{}", t);
            eventSource.cancel();
            return;
        ResponseBody body = response.body();
        if (Objects.nonNull(body)) {
            log.error("OpenAI  sse连接异常data:{},异常:{}", body.string(), t);
        } else {
            log.error("OpenAI  sse连接异常data:{},异常:{}", response, t);
        eventSource.cancel();

发送请求,并且设置监听器对sse事件进行监听

  public static void main(String[] args) {
        try {
            OkHttpClient okHttpClient = new OkHttpClient
                    .Builder()
                    .connectTimeout(30, TimeUnit.SECONDS)
                    .writeTimeout(30, TimeUnit.SECONDS)
                    .readTimeout(30, TimeUnit.SECONDS)
                    .build();
            EventSource.Factory factory = EventSources.createFactory(okHttpClient);
            Request request = new Request.Builder()
                    .url("http://localhost:8062/user/sse")
                    .get()
                    .build();
            //创建事件
            EventSource eventSource = factory.newEventSource(request, new ConsoleEventSourceListener());
        } catch (Exception e) {
            log.error("请求参数解析异常:{}", e);
            e.printStackTrace();
                                    SSE是一种可以主动从服务端推送消息的技术SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流。
SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
	SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
	SSE 默认支持断线重连,WebSocket 需要自己实现。
	SSE 一般只用来传
首先,我们进入到适配器调用处理器的代码,看重要的部分代码
	@Nullable
	protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.an...
                                    SSE简介
SSEServer-sent Events )是 WebSocket 的一种轻量代替方案,使用 HTTP 协议。
严格地说,HTTP 协议是没有办法做服务器推送的,但是当服务器向客户端声明接下来要发送流信息时,客户端就会保持连接打开,SSE 使用的就是这种原理。
SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。
相比于 WebSocket,SSE 简单不少,服务器端和客户端工做量都要小不少、简单不少,同时实现的功能也有局限。
                                    消息推送(push)通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用,消息推送一般又分为web端消息推送和移动端消息推送。另外注意主流浏览器只支持6个连接我有7种实现web实时消息推送的方案,7种短轮询客户端定期向服务器发送请求。如果服务器有更新,它会向客户端发送响应并关闭连接。如果服务器没有更新,它也会向客户端发送一个响应并关闭连接。长轮询客户端向服务器发送请求。...
后端主动推送消息给前端,之前做过的实时聊天系统,是使用webscoket实现前后端的双向通信,这次只需要接收后端推送的消息,故使用Sse实现
SSE(Server-Sent Events):是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的技术
createEventSource() {
      if (window.EventSource) {
        this.source = new EventSource('/sse/connect/')
                                    详细了解后得知SSE是基于http协议,无需导入其他依赖,特点是服务端主动给客户端推送消息(单向),适合浏览器端只做数据接收。但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式。从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的。了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别。SSE 最大的特点,可以简单规划为两个。
                                    SSE主要解决了客户端与服务器之间的单向实时通信需求(例如ChatGpt回答的流式输出),相较于WebSocket(双向实时),它更加轻量级且易于实现。其次,SSE在跨域通信时可能遇到一些限制,需要进行额外的配置。为了实现这种实时通信,多种技术应运而生,如WebSocket、长轮询和Server-Sent EventsSSE)。在本文中,我们将重点探讨Server-Sent Events,一种基于HTTP的实时通信协议。在实际应用中,SSE已经在实时通知、聊天应用和服务器监控等场景中得到广泛应用。
 
推荐文章