SSE服务端推送

我们知道HTTP协议一般来说只能由客户端发起请求然后服务端响应,服务端是无法主动向客户端推送数据的。然而有时我们又确实有服务端主动推送数据的需求,比如主动弹出一些提示信息、主动弹出系统公告等场景。实现该功能可以采用WebSocket,或者本篇笔记介绍的SSE(Server Side Events)技术。

SSE实现原理

HTML5中专门提供了EventSource类,用于处理SSE连接和消息。SSE实现原理其实很简单,首先客户端发出一个普通的HTTP请求,服务端返回的消息头中则包含以下信息:

Content-Type: text/event-stream

Content-Type: text/event-stream表示返回内容是Server Side Events数据,此时浏览器就不会断开这次HTTP连接,而是持续保持连接并接收数据。具体来说,服务端发送消息时,每一条消息结束会附带两个换行符,以此作为消息分割符,因此SSE也仅支持文本数据。

SSE客户端实现

下面例子我们简单实现了一个实时价格推送功能,客户端和服务端建立SSE连接后,服务端会不断向客户端推送当前的商品价格数据,客户端收到消息后更新页面上的价格表格。

$(function () {
    const source = new EventSource('/beginPullData?token=' + uuid());
    source.onopen = function () {
        console.log('SSE连接建立成功');
    };
    source.onmessage = function (event) {
        const jsonData = event.data;
        if (jsonData) {
            const data = JSON.parse(jsonData);
            console.log(data);
            const ele0 = $("<tr><th>商品</th><th>价格</th></tr>");
            const ele1 = $("<tr><td>白菜</td><td>" + data.data.cabbagePrice + "</td></tr>");
            const ele2 = $("<tr><td>茄子</td><td>" + data.data.eggPlantPrice + "</td></tr>");
            const ele3 = $("<tr><td>土豆</td><td>" + data.data.potatoPrice + "</td></tr>");
            $('#data-table').empty()
                .append(ele0)
                .append(ele1)
                .append(ele2)
                .append(ele3);
        }
    };
});

代码中我们创建了EventSource对象,参数是建立SSE连接的URL,其中的token参数用于在服务端唯一标识客户端,创建SSE连接、关闭SSE连接等操作都需要将这个参数传递给服务端,以供服务端能找到相应的连接进行处理,实际开发中我们可以用UUID等唯一值来实现。

我们这里编写了EventSourceonopenonmessage回调,分别用来监听SSE连接的建立和服务端推送消息,具体的消息内容从event.data中取出。如果想要主动结束SSE连接,可以调用source.close()方法。

如果SSE连接返回200状态码,EventSource默认会自动进行重连。

SSE服务端实现

很多服务端Web框架都对SSE功能进行了封装,我们这里以SpringMVC为例进行介绍。

package com.gacfox.demo.controller;

import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.service.TradingService;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;

@Controller
public class TradingController {
    @Resource
    private TradingService tradingService;

    @GetMapping("/tradingPage")
    public String getTradingPage() {
        return "trading";
    }

    @GetMapping(value = "/beginPullData", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter beginPullData(@RequestParam String token) {
        SseEmitter sseEmitter = tradingService.initEmitter(token);
        CompletableFuture.runAsync(() -> {
            tradingService.beginEmitter(token);
        });
        return sseEmitter;
    }

    @ResponseBody
    @GetMapping("/completePullData")
    public ApiResult<?> completePullData(@RequestParam String token) {
        tradingService.completeEmitter(token);
        return ApiResult.success();
    }
}

Controller代码中,/beginPullData用于创建SSE连接,该方法返回SseEmitter对象,方法内部我们首先初始化了该对象,然后新启动一个线程来向客户端实时发送消息。/completePullData用于结束并销毁SSE连接。

package com.gacfox.demo.service;

import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.TradingPrice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service("tradingService")
public class TradingService {
    private final static Map<String, SseEmitter> emitterCache = new ConcurrentHashMap<>();

    public SseEmitter initEmitter(String token) {
        if (emitterCache.containsKey(token)) {
            return emitterCache.get(token);
        } else {
            SseEmitter emitter = new SseEmitter(600000L);
            emitter.onTimeout(() -> {
                log.info("SSE连接超时 Token [{}]", token);
                emitterCache.remove(token);
            });
            emitter.onCompletion(() -> {
                log.info("SSE连接关闭 Token [{}]", token);
                emitterCache.remove(token);
            });
            emitter.onError((throwable -> {
                log.error("SSE连接异常 Token [{}] 异常信息 [{}]", token, throwable.getLocalizedMessage());
                emitterCache.remove(token);
            }));
            emitterCache.put(token, emitter);
            log.info("SSE连接建立 Token [{}]", token);
            return emitter;
        }
    }

    public void beginEmitter(String token) {
        while (true) {
            SseEmitter sseEmitter = emitterCache.get(token);
            if (sseEmitter != null) {
                try {
                    // 假装查询了数据库得出实时交易数据
                    Thread.sleep(1000);
                    TradingPrice tradingPrice = new TradingPrice();
                    tradingPrice.setCabbagePrice(new Random().nextInt(100) + ".00");
                    tradingPrice.setEggPlantPrice(new Random().nextInt(100) + ".00");
                    tradingPrice.setPotatoPrice(new Random().nextInt(100) + ".00");
                    ApiResult<TradingPrice> result = ApiResult.success(tradingPrice);

                    // 通过SSE返回给前端
                    sseEmitter.send(result);
                } catch (Exception e) {
                    log.error("生成交易数据异常:", e);
                }
            } else {
                break;
            }
        }
    }

    public void completeEmitter(String token) {
        SseEmitter sseEmitter = emitterCache.get(token);
        if (sseEmitter != null) {
            sseEmitter.complete();
        }
    }
}

Service代码中包含了具体对SseEmitter的操作逻辑,每个SseEmitter对象对应一个SSE连接,我们这里使用了ConcurrentHashMap缓存当前所有保持的SSE连接,而这些连接我们使用前端传过来的token唯一标识。

代码中,创建SseEmitter时我们指定了超时时间,一个SSE连接超时后服务端会主动关闭连接,并返回HTTP的200状态码,这种超时结束的SSE连接客户端的EventSource会自动尝试重连,指定该超时时间的意义是防止客户端已经离线但服务端不知道,SseEmitter没有及时销毁造成资源无法释放。主动调用sseEmitter.complete()方法也将主动结束一个SSE连接,但和超时类似,它也会让SSE连接返回200状态码,默认客户端的EventSource会自动尝试重连。

此外,我们调用sseEmitter.send()方法可以向客户端主动推送消息,SpringMVC默认会将对象序列化为JSON返回给客户都。注意SSE仅支持文本消息,如果希望发送二进制消息则必须采用Base64编码等不太好的方式了,此时不如选择使用WebSocket来实现。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap