我们知道HTTP协议一般来说只能由客户端发起请求然后服务端响应,服务端是无法主动向客户端推送数据的。然而有时我们又确实有服务端主动推送数据的需求,比如主动弹出一些提示信息、主动弹出系统公告等场景。实现该功能可以采用WebSocket,或者本篇笔记介绍的SSE(Server Side Events)技术。
HTML5中专门提供了EventSource
类,用于处理SSE连接和消息。SSE实现原理其实很简单,首先客户端发出一个普通的HTTP请求,服务端返回的消息头中则包含以下信息:
Content-Type: text/event-stream
Content-Type: text/event-stream
表示返回内容是Server Side Events数据,此时浏览器就不会断开这次HTTP连接,而是持续保持连接并接收数据。具体来说,服务端发送消息时,每一条消息结束会附带两个换行符,以此作为消息分割符,因此SSE也仅支持文本数据。
下面例子我们简单实现了一个实时价格推送功能,客户端和服务端建立SSE连接后,服务端会不断向客户端推送当前的商品价格数据,客户端收到消息后更新页面上的价格表格。
$(function () {
const source = new EventSource('/beginPullData?token=' + uuid());
source.onopen = function () {
console.log('SSE连接建立成功');
};
source.onmessage = function (event) {
const jsonData = event.data;
if (jsonData) {
const data = JSON.parse(jsonData);
console.log(data);
const ele0 = $("<tr><th>商品</th><th>价格</th></tr>");
const ele1 = $("<tr><td>白菜</td><td>" + data.data.cabbagePrice + "</td></tr>");
const ele2 = $("<tr><td>茄子</td><td>" + data.data.eggPlantPrice + "</td></tr>");
const ele3 = $("<tr><td>土豆</td><td>" + data.data.potatoPrice + "</td></tr>");
$('#data-table').empty()
.append(ele0)
.append(ele1)
.append(ele2)
.append(ele3);
}
};
});
代码中我们创建了EventSource
对象,参数是建立SSE连接的URL,其中的token
参数用于在服务端唯一标识客户端,创建SSE连接、关闭SSE连接等操作都需要将这个参数传递给服务端,以供服务端能找到相应的连接进行处理,实际开发中我们可以用UUID等唯一值来实现。
我们这里编写了EventSource
的onopen
和onmessage
回调,分别用来监听SSE连接的建立和服务端推送消息,具体的消息内容从event.data
中取出。如果想要主动结束SSE连接,可以调用source.close()
方法。
如果SSE连接返回200
状态码,EventSource
默认会自动进行重连。
很多服务端Web框架都对SSE功能进行了封装,我们这里以SpringMVC为例进行介绍。
package com.gacfox.demo.controller;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.service.TradingService;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
@Controller
public class TradingController {
@Resource
private TradingService tradingService;
@GetMapping("/tradingPage")
public String getTradingPage() {
return "trading";
}
@GetMapping(value = "/beginPullData", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter beginPullData(@RequestParam String token) {
SseEmitter sseEmitter = tradingService.initEmitter(token);
CompletableFuture.runAsync(() -> {
tradingService.beginEmitter(token);
});
return sseEmitter;
}
@ResponseBody
@GetMapping("/completePullData")
public ApiResult<?> completePullData(@RequestParam String token) {
tradingService.completeEmitter(token);
return ApiResult.success();
}
}
Controller代码中,/beginPullData
用于创建SSE连接,该方法返回SseEmitter
对象,方法内部我们首先初始化了该对象,然后新启动一个线程来向客户端实时发送消息。/completePullData
用于结束并销毁SSE连接。
package com.gacfox.demo.service;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.TradingPrice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Service("tradingService")
public class TradingService {
private final static Map<String, SseEmitter> emitterCache = new ConcurrentHashMap<>();
public SseEmitter initEmitter(String token) {
if (emitterCache.containsKey(token)) {
return emitterCache.get(token);
} else {
SseEmitter emitter = new SseEmitter(600000L);
emitter.onTimeout(() -> {
log.info("SSE连接超时 Token [{}]", token);
emitterCache.remove(token);
});
emitter.onCompletion(() -> {
log.info("SSE连接关闭 Token [{}]", token);
emitterCache.remove(token);
});
emitter.onError((throwable -> {
log.error("SSE连接异常 Token [{}] 异常信息 [{}]", token, throwable.getLocalizedMessage());
emitterCache.remove(token);
}));
emitterCache.put(token, emitter);
log.info("SSE连接建立 Token [{}]", token);
return emitter;
}
}
public void beginEmitter(String token) {
while (true) {
SseEmitter sseEmitter = emitterCache.get(token);
if (sseEmitter != null) {
try {
// 假装查询了数据库得出实时交易数据
Thread.sleep(1000);
TradingPrice tradingPrice = new TradingPrice();
tradingPrice.setCabbagePrice(new Random().nextInt(100) + ".00");
tradingPrice.setEggPlantPrice(new Random().nextInt(100) + ".00");
tradingPrice.setPotatoPrice(new Random().nextInt(100) + ".00");
ApiResult<TradingPrice> result = ApiResult.success(tradingPrice);
// 通过SSE返回给前端
sseEmitter.send(result);
} catch (Exception e) {
log.error("生成交易数据异常:", e);
}
} else {
break;
}
}
}
public void completeEmitter(String token) {
SseEmitter sseEmitter = emitterCache.get(token);
if (sseEmitter != null) {
sseEmitter.complete();
}
}
}
Service代码中包含了具体对SseEmitter
的操作逻辑,每个SseEmitter
对象对应一个SSE连接,我们这里使用了ConcurrentHashMap
缓存当前所有保持的SSE连接,而这些连接我们使用前端传过来的token
唯一标识。
代码中,创建SseEmitter
时我们指定了超时时间,一个SSE连接超时后服务端会主动关闭连接,并返回HTTP的200
状态码,这种超时结束的SSE连接客户端的EventSource
会自动尝试重连,指定该超时时间的意义是防止客户端已经离线但服务端不知道,SseEmitter
没有及时销毁造成资源无法释放。主动调用sseEmitter.complete()
方法也将主动结束一个SSE连接,但和超时类似,它也会让SSE连接返回200
状态码,默认客户端的EventSource
会自动尝试重连。
此外,我们调用sseEmitter.send()
方法可以向客户端主动推送消息,SpringMVC默认会将对象序列化为JSON返回给客户都。注意SSE仅支持文本消息,如果希望发送二进制消息则必须采用Base64编码等不太好的方式了,此时不如选择使用WebSocket来实现。