作者简介
Chris Xia,携程前端开发专家,关注新技术革新和研发效率提升。
团队热招岗位:
一、概述
二、SSE介绍
2.1 SSE 是什么?
2.2 SSE 的使用场景
三、应用实践
四、方案对比
4.1 服务端推送
4.2 内部SSE实践方案
4.3 SSE 技术选型
五、全链路支持
5.1 链路层
5.2 框架层
5.3 数据层
六、结语
在如今互联网应用中,实时数据推送已成为很多业务场景的关键技术解决方案。携程机票业务作为在线旅游行业的核心场景,面临着航班数据实时性要求高、信息维度复杂等挑战。Server-Sent Events(SSE)技术作为一种基于 HTTP 长连接的服务器推送方案,非常适用于机票业务"服务端主动推送、客户端实时展示"的需求特点。相较于 WebSocket 等双向通信协议,SSE 在实现简单性、协议轻量级和浏览器兼容性等方面具有显著优势,适合机票列表页这类以服务端数据为主导的业务场景。
二、SSE介绍
2.1 SSE 是什么?
Server-Sent Events(SSE)服务器发送事件,是一种基于 HTTP 长连接,允许服务器单向实时推送数据到客户端的技术。
SSE 的工作原理非常简单直观。客户端通过与服务器建立一条持久化的 HTTP 连接,然后服务器使用该连接将数据以事件流(event stream)的形式发送给客户端。这些事件流由多个事件(event)组成,每个事件包含一个标识符、类型和数据字段。客户端通过监听事件流来获取最新的数据,并在接收到事件后进行处理。
2.2 SSE 的使用场景
SSE 使用场景非常广泛,大家熟知的 Chatgpt 对话的交互形式使用的就是 SSE 技术。SSE 在服务器单向实时推送数据的场景非常适用:
实时数据流:如股票市场更新、新闻推送、体育比分更新等。
实时通知:如社交媒体消息提醒、新订单通知等。
仪表盘更新:如系统监控、实时数据统计等。
机票前端首次在核心业务中(机票航班列表)使用 SSE 技术,机票列表页由原先客户端串行请求获取多批次航班数据变为一次请求由服务持续推送数据给客户端。在调研了公司内外各种实现方案,最终联合携程框架、SRE、机票前后端团队共同实现了全公司通用的SSE技术解决方案(详情见下文中的全链路支持部分)。
使用 SSE 前(如下图)
客户端需要发起两次请求获取完整航班数据
服务端采用预取优化:在响应第一次请求时,提前获取第二批数据并缓存至 Redis(降低客户端第二次请求响应的耗时)
客户端发起第二次请求时,可直接获取缓存数据
使用 SSE 后(如下图)
客户端发送一次 SSE 请求,服务端实时推送数据到客户端,服务间上下游同样采用流式传输,实现客户端到服务端全链路流式通信。
SSE 为前后端带来的价值
减少请求传输耗时:无需请求多次,减少了多次请求的传输耗时。
前后端代码结构优化:代码更简洁且易于理解,减少串行请求的回调监听/嵌套。
服务逻辑优化:列表数据移除了 redis 的发布订阅流程,简化了代码架构。
资源利用率提升:减少冗余请求(只有一批数据时,客户端不用再次请求问询服务)。
通过分析请求流程(建立链接 -> 发送请求 -> 响应数据传输)和其原理,发现 HTTP 1.1 和 2 支持链路复用,因此链接建立的次数本质上没有变化。在传输通道和数据压缩方式保持不变的情况下,响应数据传输的耗时也不会有明显变化。
SSE 的核心性能优势在于减少了请求发送的次数,其性能增益取决于具体的使用场景:
当服务端响应耗时大于网络传输耗时,性能提升有限。
当网络传输耗时大于服务端处理耗时,减少请求次数可以显著降低整体延迟。
目前市面上很多服务端推送的技术解决方案:SSE、轮询/串行、Websocket 等,我们从易用性,资源开销,使用场景等多维度对比了几个使用较多的主流方案,最终选择了 SSE。
4.1 服务端推送(SSE、轮询、Websocket)
SSE 使用
简单几行代码实现服务端推送
数据传输格式
SSE 的数据传输规范中有 4 个关键字段 event、data、id 和 retry,用于定义和传输事件数据。
even:定义消息的事件类型,客户端可以根据事件类型触发不同的处理逻辑。
data:消息的主体内容
id:为消息设置一个唯一的 ID,用于客户端断线重连时标识最后接收的消息。
retry:服务端指定客户端在连接断开后重新连接的时间间隔(单位为毫秒)。
这些字段共同构成了 SSE 消息的基本格式,每条消息以两个换行符 nn 结束,确保客户端能够正确解析和处理事件数据。
前端使用样例
// 创建 EventSource
const evtSource = new EventSource("接口地址");
// 监听服务端推送的数据
evtSource.onmessage = function (event) {
console.log("接收到的消息:", event);
};
// 监听连接建立
evtSource.onopen = function () {
console.log("连接已建立");
};
// 监听报错
evtSource.onerror = function (err) {
console.error("发生异常:", err);
};
const http = require("http");
http
.createServer((req, res) => {
// 设置Response Header
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
// 不断推送数据给客户端
const pushData = setInterval(() => {
res.write(data);
}, 1000);
req.on("close", () => clearInterval(pushData));
})
.listen(3000);
调研发现公司内部有两套实践方案:
自定义响应式网关,实现网关轮询服务批量获取数据,从而实现流式传输。
前端轮询下沉BFF(服务),前端与BFF建立SSE通道,BFF不断轮询向上游批量获取数据。
在携程企业级网络生态架构下,从通用性和完整度分析对比了两套方案,并没有真正意义上从前到后打通整条链路。仅仅只是简单接入SSE是远远不够的,离不开全链路(SSE技术选型,多层网络架构的适配,服务间的流式通信等等)的支持,所以最终决定联合携程框架、SRE、机票前后端团队共同来实现对SSE全链路的适配,真正意义上实现全公司通用的普适方案。
4.3 SSE 技术选型(原生 SSE vs fetch-event-source)
确定好整体技术方案后,我们在实际测试过程中发现了 2 个 Web 原生 SSE 的局限性问题:
1)仅支持 Get 请求:对需要传递一些复杂请求体的场景不友好。
2)不支持自定义 http header:无法支持自定义 header 透传,鉴权等场景,目前市面大部分解决方案是使用 Cookie 来携带自定义参数。
针对上述问题,调研发现微软开源的 SSE 网络库 @microsoft/fetch-event-source(以下简称 fes)能够很好的解决。fes 是基于 Fetch 和 ReadableStream 来实现的 SSE 功能,旨在提供更加灵活便利的调用方式。
原生 SSE 和 fes 的对比
fetch-event-source 详解
fes 的核心原理是通过 Fetch 发送请求,ReadableStream 读取响应流,在 JS 侧实现字节流数据的解析。通过对比原生 SSE(chromium 内核中 EventSource)和 fes 的代码,发现整体流程与实现方案大致相同,关键区别在于流的解析,原生 SSE 在浏览器内核由 C++实现,fes 在 JS 侧实现。
fes 的流解析
核心方法:getBytes、getLines 和 getMessages
getBytes:通过 ReadableStream 读取响应字节流,获取每个字节块。
getLines :将 getBytes 获取到的字节块解析为 EventSource 行缓冲区,处理这些字节块并解析为行,然后调用 onLine 回调函数处理每一行。
getMessages:创建 EventSourceMessage 对象,将行缓冲区数据解析并进行组装,处理完成后回调给调用方。
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {
const reader = stream.getReader();
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
onChunk(result.value);
}
}
export function getMessages(
onId: (id: string) => void,
onRetry: (retry: number) => void,
onMessage?: (msg: EventSourceMessage) => void
) {
let message = newMessage();
const decoder = new TextDecoder();
// return a function that can process each incoming line buffer:
return function onLine(line: Uint8Array, fieldLength: number) {
if (line.length === 0) {
// empty line denotes end of message. Trigger the callback and start a new message:
onMessage?.(message);
message = newMessage();
} else if (fieldLength > 0) { // exclude comments and lines with no values
// line is of format "<field>:<value>" or "<field>: <value>"
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
const field = decoder.decode(line.subarray(0, fieldLength));
const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
const value = decoder.decode(line.subarray(valueOffset));
switch (field) {
case 'data':
// if this message already has data, append the new value to the old.
// otherwise, just set to the new value:
message.data = message.data
? message.data + 'n' + value
: value;
break;
case 'event':
message.event = value;
break;
case 'id':
onId(message.id = value);
break;
case 'retry':
const retry = parseInt(value, 10);
if (!isNaN(retry)) {
onRetry(message.retry = retry);
}
break;
}
}
}
}
export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {
let buffer: Uint8Array | undefined;
let position: number; // current read position
let fieldLength: number; // length of the `field` portion of the line
let discardTrailingNewline = false;
return function onChunk(arr: Uint8Array) {
if (buffer === undefined) {
buffer = arr;
position = 0;
fieldLength = -1;
} else {
buffer = concat(buffer, arr);
}
const bufLength = buffer.length;
let lineStart = 0; // index where the current line starts
while (position < bufLength) {
if (discardTrailingNewline) {
if (buffer[position] === ControlChars.NewLine) {
lineStart = ++position; // skip to next char
}
discardTrailingNewline = false;
}
let lineEnd = -1; // index of the r or n char
for (; position < bufLength && lineEnd === -1; ++position) {
switch (buffer[position]) {
case ControlChars.Colon:
if (fieldLength === -1) { // first colon in line
fieldLength = position - lineStart;
}
break;
case ControlChars.CarriageReturn:
discardTrailingNewline = true;
case ControlChars.NewLine:
lineEnd = position;
break;
}
}
if (lineEnd === -1) {
break;
}
onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
lineStart = position; // we're now on the next line
fieldLength = -1;
}
if (lineStart === bufLength) {
buffer = undefined; // we've finished reading it
} else if (lineStart !== 0) {
buffer = buffer.subarray(lineStart);
position -= lineStart;
}
}
}
fes 在使用上更适用于现代 Web 应用和 Node.js 环境,解决了原生 EventSource 的局限性,提供了更丰富的功能和更细粒度的控制。
五、全链路支持
企业级应用时,在非直连多层网络架构的环境下,应用SSE不仅需要考虑前后端的使用,还需要考虑链路层、框架层、数据层等多环节的支持。通过不同团队(如框架、SRE、机票前端和后端团队)的协作,开发出一个在公司范围内通用的解决方案。
5.1 链路层
在携程海外上云、多地多活服务架构、多层网络架构的背景下,携程框架及SRE团队提供了大力支持,完整打通了各链路层之间的流式传输。
多层网络架构
7层加速节点(akamai/aws):提供全球范围内的快速数据传输。
流量接入层(slb):确保高可用性和负载均衡。
中间转发节点(虫洞):优化跨Region数据传输路径,减少延迟。
sidecar(envoy/nginx):容器流量管理,增强了应用的可维护性和扩展性。
以Nginx为例
Nginx 会缓存代理服务器的响应(聚合类型),服务推送的数据被 Nginx 缓存到缓冲区,导致客户端没有实时收到数据,而是等到服务所有数据推送完后,客户端才一次性收到了所有数据。
适配方案:
禁用缓存功能,服务端响应时除了设置 SSE 所必须的 Response Header 外,还需要添加非标 Header:X-Accel-Buffering: no,告知 Nginx 不缓存响应,确保数据实时发送到客户端。
值得注意的是,在多层网络架构的环境下 X-Accel-Buffering: no Header 在各层网关之间转发时会丢失,所以在多层网络架构下 Nginx 需要添加 proxy_pass_header X-Accel-Buffering,来确保整条链路上 Header 的传递。
5.2 框架层
前端框架团队基于fes实现SSE网络请求,合并到公司基础网络框架,共享网络优化,监控等基建能力,全公司通用。服务端基于Reactor + Dubbo Streaming实现服务间上下游全链路响应式流式传输。
通过链路层的支持,从前端到服务端实现了统一的全链路流式传输通信,确保数据的高效传输和处理。
5.3 数据层
数据传输需注意代理服务器或 Web 容器(Nginx、Tomcat)对SSE MIME Type:text/event-stream的支持,未正确配置,服务端推送的数据不会经过任何压缩,传输数据大,导致客户端响应耗时增加。
适配方案:根据不同的服务器类型进行配置。
1)Nginx
2)Tomcat
六、结语
本文介绍了 SSE 在携程机票前端全链路企业级应用实践,解决了服务向前端实时推送数据的问题。通过合理的技术选型、流式数据解析和链路传输层优化,从链路层,框架层,数据层全链路实现全公司通用的普适方案。降低了前后端代码复杂度,提升了资源利用率。随着流式通信技术的不断发展,SSE 将在更多场景中(覆盖更多客户端,支持更多网络协议)发挥重要作用,为实时数据处理提供更高效的解决方案。
【推荐阅读】
“携程技术”公众号
分享,交流,成长
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
还没有评论,来说两句吧...