一、为什么需要 SSE?
SSE(Server-Sent Events,服务器推送事件) 是一种基于 HTTP 的单向实时通信协议。与 WebSocket 的全双工不同,SSE 是 服务端 → 客户端 的单向推送,天然适合以下场景:
- 💬 实时聊天:接收对方发送的消息
- 🤖 AI 流式输出:ChatGPT 风格的逐字返回
- 📊 数据看板:实时刷新指标数据
- 🔔 消息通知:订单状态变更、审核结果推送
相比 WebSocket,SSE 的优势在于:
| 维度 | SSE ✅ | SSE ⚠️ 局限 |
|---|---|---|
| 基于 HTTP,天然支持认证、CORS | ✅ | 单向通信,客户端需另外发请求 |
| 自动重连机制(浏览器端) | ✅ | 浏览器限制最大 6 个并发连接 |
| 协议简单,调试方便 | ✅ | 不支持二进制数据 |
| 防火墙友好,不会被拦截 | ✅ | IE/Edge Legacy 不支持 |
二、浏览器中的 SSE 长什么样?
在浏览器中,使用 SSE 非常简单:
// 浏览器原生 EventSource —— 就这么简单
const es = new EventSource('/api/events');
// 监听默认消息
es.onmessage = (e) => {
console.log('收到消息:', JSON.parse(e.data));
};
// 监听自定义事件
es.addEventListener('chat-message', (e) => {
const msg = JSON.parse(e.data);
renderMessage(msg);
});
// 监听连接状态
es.onerror = () => console.log('连接断开,浏览器会自动重连');
浏览器的 EventSource API 帮你处理了所有脏活累活:
- 自动解析
text/event-stream协议 - 自动处理
event:、data:、id:字段 - 断线后自动重连,并发送
Last-Event-ID - 处理 UTF-8 多字节解码
┌──────────┐ ┌──────────┐
│ 浏览器 │ ──GET──▶ │ 服务器 │
│ │ │ │
│ EventSrc │ ◀─chunk── │ SSE 推送 │
│ 自动解析 │ ◀─chunk── │ │
│ 自动重连 │ ◀─chunk── │ │
└──────────┘ └──────────┘
三、小程序的困境:没有 EventSource
微信小程序的运行环境 不是浏览器。它没有 EventSource、没有 fetch、没有 WebSocket 的标准实现(小程序有 wx.connectSocket,但接口完全不同)。
小程序提供的网络 API 只有:
| API | 能力 | 能否用于 SSE? |
|---|---|---|
wx.request | HTTP 请求 | ✅ 配合 enableChunked |
wx.connectSocket | WebSocket | ❌ 协议不同,需要服务端适配 |
wx.downloadFile | 文件下载 | ⚠️ 理论可行,但不适合流式 |
💡 关键突破:enableChunked 模式
从基础库 2.20.2 开始,
wx.request支持enableChunked: true参数。开启后,可以通过onChunkReceived回调逐块接收响应数据——这正是模拟 SSE 的关键。
四、核心实现:用 wx.request 模拟 SSE
4.1 建立连接
核心思路:发起一个 GET 请求,开启分块传输,保持连接不断开:
// utils/sse-client.ts
// 微信非标准 API 类型扩展
type ChunkedRequestTask = WechatMiniprogram.RequestTask & {
onChunkReceived?: (callback: (chunk: {
data: string | ArrayBuffer
}) => void) => void
}
let requestTask: ChunkedRequestTask | null = null
function requestSSE() {
// 清理旧连接
if (requestTask) {
requestTask.abort()
requestTask = null
}
const sseId = getSseId()
const url = `${BASE_URL}/api/auth/message/subscribe/${sseId}`
const task = wx.request({
url,
method: 'GET',
enableChunked: true, // 🔑 关键:开启分块传输
responseType: 'arraybuffer', // 二进制接收,避免编码问题
header: {
Accept: 'text/event-stream',
Authorization: `Bearer ${token}`,
},
}) as ChunkedRequestTask
requestTask = task
// 监听分块数据
task.onChunkReceived?.((chunk) => {
handleSseText(decodeChunk(chunk.data))
})
}
⚠️ 为什么要用 responseType: ‘arraybuffer’?
如果用字符串模式接收,小程序可能会尝试按 GBK 解码中文,导致乱码。用 ArrayBuffer 接收后手动 UTF-8 解码更可靠。
4.2 数据解码:处理 ArrayBuffer
收到的 chunk 可能是 string 或 ArrayBuffer,需要统一转为字符串:
// utils/sse-client.ts
function decodeChunk(data: string | ArrayBuffer): string {
if (typeof data === 'string') return data
const uint8 = new Uint8Array(data)
try {
// 优先使用 TextDecoder(部分基础库支持)
return new TextDecoder('utf-8').decode(uint8)
} catch {
// 降级:手动 UTF-8 解码
return utf8Decode(uint8)
}
}
// 手动 UTF-8 解码(兼容所有基础库版本)
function utf8Decode(uint8: Uint8Array): string {
let result = ''
let i = 0
while (i < uint8.length) {
const b = uint8[i]
let code: number
if (b <= 0x7f) {
code = b; i += 1
} else if (b >= 0xc0 && b <= 0xdf) {
code = ((b & 0x1f) << 6) | (uint8[i+1] & 0x3f); i += 2
} else if (b >= 0xe0 && b <= 0xef) {
code = ((b & 0x0f) << 12) | ((uint8[i+1] & 0x3f) << 6)
| (uint8[i+2] & 0x3f); i += 3
} else if (b >= 0xf0 && b <= 0xf7) {
code = ((b & 0x07) << 18) | ((uint8[i+1] & 0x3f) << 12)
| ((uint8[i+2] & 0x3f) << 6) | (uint8[i+3] & 0x3f); i += 4
} else {
i += 1; continue // 跳过无法识别的字节
}
result += String.fromCharCode(code)
}
return result
}
五、协议解析:手动实现 SSE 帧解析器
SSE 协议格式非常简单,但必须严格遵守:
event: chat-message ← 事件类型(可选)
data: {"content":"你好"} ← 数据载荷(必须)
id: 12345 ← 消息 ID(可选)
← 空行 = 消息结束
event: notification
data: {"type":"order","msg":"订单已发货"}
← 空行 = 消息结束
关键规则:
- 每条消息以 空行(\n\n) 分隔
data:可以有多行,解析时用\n拼接- 没有
event:字段时,默认事件名为message
5.1 解析器实现
// utils/sse-client.ts
function parseSseBlock(block: string) {
const eventLines: string[] = []
const dataLines: string[] = []
// 逐行解析
block.split(/\r?\n/).forEach((line) => {
if (line.startsWith('event:')) {
eventLines.push(line.replace(/^event:\s?/, ''))
}
if (line.startsWith('data:')) {
dataLines.push(line.replace(/^data:\s?/, ''))
}
})
// 多行 data 用换行拼接
const dataText = dataLines.join('\n').trim()
if (!dataText) return
try {
const json = JSON.parse(dataText)
// 兼容 {data: {...}} 和 {...} 两种格式
const data = json.data === undefined ? json : json.data
const eventName = eventLines[0] || json.event || 'message'
// 触发事件监听
dispatchSseData(eventName, data)
} catch (err) {
console.warn('[SSE] 解析失败:', dataText.slice(0, 120))
}
}
5.2 粘包处理:Buffer 管理
TCP 是流式协议,onChunkReceived 回调的边界 不一定 是 SSE 消息的边界。一个 chunk 可能包含:
- 半条消息(消息被截断)
- 多条消息(消息粘连)
- 以上两者同时存在
Chunk 1: "event: chat\ndata: {\"hel"
Chunk 2: "lo\":\"world\"}\n\nevent: noti"
Chunk 3: "fication\ndata: {\"type\":1}\n\n"
─────────────────────────────────────────
合并后: "event: chat\ndata: {\"hello\":\"world\"}\n\n
event: notification\ndata: {\"type\":1}\n\n"
─────────────────────────────────────────
按 \n\n 切割 → 两个完整的 SSE 消息块
解决方案:维护一个 字符串缓冲区,累积数据直到遇到完整的 \n\n 分隔符:
// utils/sse-client.ts
let sseBuffer = '' // 全局缓冲区
function handleSseText(text: string) {
// 先检查是否是服务端控制消息
if (handleControlMessage(text)) return
// 累积到缓冲区
sseBuffer += text
// 按双换行切分
const parts = sseBuffer.split(/\r?\n\r?\n/)
// 最后一个元素可能是不完整的消息,保留在缓冲区
sseBuffer = parts.pop() || ''
// 解析完整的消息块
parts.forEach(parseSseBlock)
}
✅ 为什么 parts.pop() 是关键?
split('\n\n')后,最后一个元素很可能是不完整的消息(还没收到下一个\n\n)。用pop()把它放回缓冲区,等下一个 chunk 到来时继续拼接。
六、重连机制:比浏览器更精细的控制
浏览器的 EventSource 会自动重连,但你无法控制重连策略。小程序中我们可以实现更精细的控制:
// utils/sse-client.ts
let isManualClose = false
let reconnectTimer: ReturnType<typeof setTimeout> | null = null
// 主动断开(不重连)
export function stopSSE() {
isManualClose = true
cleanupConnection()
}
// 被动断开(自动重连)
function scheduleReconnect() {
if (isManualClose) return // 主动断开不重连
cleanupConnection()
reconnectTimer = setTimeout(() => {
reconnectTimer = null
console.log('[SSE] 重连中...')
startSSE()
}, 5000) // 固定 5 秒后重连
}
function cleanupConnection() {
if (requestTask) {
requestTask.abort()
requestTask = null
}
sseBuffer = ''
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
}
6.1 服务端主动关闭的处理
有些服务端会在空闲一段时间后主动关闭连接(发送 {"msg": "auto close"}),需要识别并触发重连:
function handleControlMessage(text: string): boolean {
const trimmed = text.trim()
if (!trimmed.startsWith('{') || !trimmed.endsWith('}')) return false
try {
const msg = JSON.parse(trimmed)
if (msg.msg === 'auto close') {
console.log('[SSE] 服务端主动关闭,触发重连')
scheduleReconnect()
return true
}
} catch {
return false
}
return false
}
🚨 生产环境建议:指数退避(Exponential Backoff)
固定 5 秒重连在服务端宕机时会产生大量无效请求。建议实现指数退避:第 1 次 1 秒,第 2 次 2 秒,第 3 次 4 秒……最大 30 秒。连接成功后重置计时器。
七、事件系统:自定义事件总线
小程序没有 DOM 事件模型,需要自己实现一个轻量级的 事件总线(Event Bus):
// utils/sse-client.ts
// 事件监听器注册表
const listeners: Record<string, ((data: unknown) => void)[]> = {}
export function addCustomEventListener(
eventName: string,
fn: (data: unknown) => void
) {
if (!listeners[eventName]) listeners[eventName] = []
listeners[eventName].push(fn)
}
export function removeCustomEventListener(
eventName: string,
fn: (data: unknown) => void
) {
if (!listeners[eventName]) return
listeners[eventName] = listeners[eventName].filter(f => f !== fn)
if (listeners[eventName].length === 0) delete listeners[eventName]
}
function dispatchSseData(eventName: string, data: unknown) {
listeners[eventName]?.forEach(fn => fn(data))
}
7.1 多事件名映射
一条 SSE 消息可以同时触发多个事件名,提高监听的灵活性:
function getEventNames(json: Record<string, unknown>, data: unknown) {
const names: string[] = []
// 1. 显式 event 字段
const explicitEvent = toText(json.event)
if (explicitEvent) names.push(explicitEvent)
// 2. data.type 字段作为事件名
const dataRecord = toRecord(data)
if (typeof dataRecord.type === 'string' && dataRecord.type) {
names.push(dataRecord.type)
}
// 3. 内容特征匹配(如咨询消息)
if (isConsultMessage(data)) {
names.push('consult-message')
}
// 去重
return Array.from(new Set(names))
}
SSE 消息: event: chat\ndata: {"content":"你好","sender":{...}}
│
▼
┌─────────────────────────────────┐
│ getEventNames() 返回: │
│ 1. "chat" (event) │
│ 2. "consult-message"(特征) │
└─────────────────────────────────┘
│
▼
触发所有监听 "chat" 和 "consult-message" 的回调
八、房间机制:定向消息投递
传统 SSE 是 广播模式:所有连接的客户端收到相同的消息。但在聊天场景中,我们需要 定向投递——只接收属于自己的消息。
解决方案是在 SSE 之上加入 房间(Room) 概念:
┌──────────┐ join("2026-06-05") ┌──────────┐
│ 客户端 A │ ──────────────────────▶ │ │
│ │ │ 服务器 │
│ 客户端 B │ ──────────────────────▶ │ 房间管理 │
│ │ join("2026-06-04") │ │
└──────────┘ └────┬─────┘
│
推送消息时按房间过滤
│
┌────────▼───────┐
│ 只推给同房间 │
│ 的 SSE 连接 │
└────────────────┘
8.1 Join / Leave 实现
// utils/sse-client.ts
export async function join(roomName: string) {
const app = getApp<IAppOption>()
const appData = app.globalData as Record<string, unknown>
const joinedArray = appData.joinedArray as string[] || []
// 幂等:已加入则跳过
if (joinedArray.includes(roomName)) return
const prevArray = [...joinedArray] // 备份,用于失败回滚
const sseId = getSseId()
try {
// 乐观更新
appData.joinedArray = [roomName, ...joinedArray]
await new Promise<void>((resolve, reject) => {
wx.request({
url: `${BASE_URL}/api/auth/message/join`,
method: 'POST',
data: { id: sseId, name: roomName },
header: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
success: (res) => {
if (res.statusCode >= 200 && res.statusCode < 300) {
resolve()
} else {
reject(new Error(`join failed: ${res.statusCode}`))
}
},
fail: reject,
})
})
} catch {
// 加入失败,回滚
appData.joinedArray = prevArray
}
}
export async function leave(roomName: string) {
// 类似实现,从 joinedArray 中移除
}
💡 乐观更新(Optimistic Update)策略
先更新本地状态,再发请求。如果请求失败再回滚。这样 UI 反应更快,用户体验更好。注意保存
prevArray用于回滚。
九、生命周期绑定:与小程序页面同步
小程序的页面有严格的生命周期,SSE 连接必须与之同步:
// pages/consult-chat/index.ts
Page({
async onShow() {
const roomName = getRoomName(this.data.startTime)
// 1. 启动 SSE 连接
startSSE()
// 2. 加入房间
await join(roomName)
// 3. 注册事件监听
this._sseMessageHandler = (data: unknown) => {
this.appendMessage(normalizeMessage(data))
}
addCustomEventListener('consult-message', this._sseMessageHandler)
},
onHide() {
// 页面隐藏:断开连接,移除监听
stopSSE()
leave(roomName)
removeCustomEventListener('consult-message', this._sseMessageHandler)
},
onUnload() {
// 页面销毁:确保资源释放
stopSSE()
leave(roomName)
removeCustomEventListener('consult-message', this._sseMessageHandler)
},
})
生命周期与 SSE 状态的对应关系
| 生命周期 | 触发时机 | SSE 操作 |
|---|---|---|
onShow | 页面显示 / 从后台切回 | startSSE() + join() + 注册监听 |
onHide | 页面隐藏 / 切到其他页面 | stopSSE() + leave() + 移除监听 |
onUnload | 页面销毁(redirectTo / navigateBack) | stopSSE() + leave() + 移除监听 |
十、竞态条件处理
页面快速切换时(如用户快速点击返回再进入),onShow 可能连续触发多次,导致:
- 多个 SSE 连接并存
- 事件监听器重复注册
- 旧连接的回调更新了新页面的数据
10.1 Show Token 方案
用一个递增的 token 来标识每次 onShow,确保只有最新的那次生效:
// pages/consult-chat/index.ts
async onShow() {
const page = getRuntime(this)
// 递增 token
const showToken = (page._sseShowToken || 0) + 1
page._sseShowToken = showToken
page._isSseVisible = true
startSSE()
await join(roomName)
// ⚡ 关键检查:如果 token 已过期,直接返回
if (!page._isSseVisible || page._sseShowToken !== showToken) return
// 移除旧监听器,注册新的
if (page._sseMessageHandler) {
removeCustomEventListener('consult-message', page._sseMessageHandler)
}
page._sseMessageHandler = (data: unknown) => {
this.appendMessage(normalizeMessage(data))
}
addCustomEventListener('consult-message', page._sseMessageHandler)
},
onHide() {
const page = getRuntime(this)
page._isSseVisible = false
page._sseShowToken = (page._sseShowToken || 0) + 1 // 使旧 token 失效
stopSSE()
// ...
}
时间线: ─────────────────────────────────────────▶
onShow() #1 onHide() onShow() #2
token = 1 token = 2
│ │
▼ ▼
startSSE() startSSE()
join() join()
await... ←──── 此时 #1 的 await 恢复
│ 但 token(1) !== _sseShowToken(2)
│ 所以 #1 的后续逻辑被跳过 ✅
十一、生产环境注意事项
11.1 错误处理
function requestSSE() {
let hasDisconnected = false
const handleDisconnect = (message: string, errMsg?: string) => {
// 防止重复触发
if (hasDisconnected || isManualClose || requestTask !== task) return
hasDisconnected = true
console.warn(`[SSE] ${message},5 秒后重连:`, errMsg || '')
scheduleReconnect()
}
const task = wx.request({
// ...
fail: (err) => {
// 忽略主动 abort 导致的错误
if (err.errMsg?.includes('abort')) return
handleDisconnect('连接失败', err.errMsg)
},
complete: (res) => {
// complete 在 fail 之后也会触发,需要防重
if (res.errMsg?.includes('abort')) return
handleDisconnect('连接结束', res.errMsg)
},
})
}
11.2 内存泄漏防护
- 监听器清理:每次注册前先移除旧监听器,
onUnload时确保全部清理 - Buffer 清理:断开连接时清空
sseBuffer,避免残留数据 - 定时器清理:重连定时器在
stopSSE()时清除 - 请求任务清理:调用
abort()释放网络资源
11.3 认证与安全
// SSE 连接携带 Bearer Token
header: {
Accept: 'text/event-stream',
Authorization: `Bearer ${getApp().globalData.token}`,
}
// 房间 join/leave 同样需要认证
wx.request({
url: `${BASE_URL}/api/auth/message/join`,
header: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
})
⚠️ Token 过期怎么办?
SSE 是长连接,Token 可能在连接期间过期。建议:
- 服务端在 Token 过期时发送控制消息,客户端收到后刷新 Token 并重连
- 客户端定期检查 Token 有效期,主动重连
- 使用 Refresh Token 机制自动续期
11.4 监控与调试
// 在关键节点打日志
console.log('[SSE] 连接建立:', url)
console.log('[SSE] 收到消息:', eventName, data)
console.log('[SSE] 连接断开,原因:', reason)
console.log('[SSE] 重连中... (第 N 次)')
console.log('[SSE] 加入房间:', roomName)
console.log('[SSE] 离开房间:', roomName)
// 可选:上报连接状态到监控平台
function reportSSEStatus(status: 'connect' | 'disconnect' | 'reconnect') {
// 上报到你的监控系统
}
十二、SSE vs WebSocket:如何选择?
| 维度 | SSE(本方案) | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务端 → 客户端) | 双向全双工 |
| 协议 | HTTP | 独立协议 (ws://) |
| 小程序支持 | wx.request + enableChunked | wx.connectSocket |
| 自动重连 | 需手动实现 | 需手动实现 |
| 认证 | HTTP Header(原生支持) | 需在 URL 或首条消息中传递 |
| 负载均衡 | 标准 HTTP LB | 需要 Sticky Session |
| 适用场景 | 通知、聊天、AI 流式输出 | 游戏、协同编辑、高频交互 |
| 实现复杂度 | 🟢 中等 | 🟡 较高 |
经验法则:如果你的场景是"服务端推、客户端收"(如聊天、通知、AI 输出),SSE 是更简单的选择。如果需要高频双向通信(如实时游戏、协同编辑),选 WebSocket。
十三、完整架构总览
┌─────────────────────────────────────────────────────────┐
│ 微信小程序客户端 │
│ │
│ ┌──────────────┐ ┌───────────────────────────────┐ │
│ │ 页面层 │ │ sse-client.ts │ │
│ │ │ │ │ │
│ │ onShow() │───▶│ startSSE() │ │
│ │ onHide() │───▶│ stopSSE() │ │
│ │ onUnload() │ │ │ │
│ │ │ │ ┌─────────────────────────┐ │ │
│ │ 监听事件: │ │ │ wx.request │ │ │
│ │ addListener │◀───│ │ enableChunked: true │ │ │
│ │ │ │ │ onChunkReceived() │ │ │
│ └──────────────┘ │ └───────────┬─────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ UTF-8 解码 │ │ │
│ │ │ Buffer 粘包处理 │ │ │
│ │ │ SSE 帧解析 │ │ │
│ │ │ 事件分发 │ │ │
│ │ └─────────────────────────┘ │ │
│ │ │ │
│ │ 房间管理: join() / leave() │ │
│ │ 重连: scheduleReconnect() │ │
│ └───────────────────────────────┘ │
│ │
└──────────────────────────┬──────────────────────────────┘
│
HTTP / SSE
│
▼
┌─────────────────────────────────────────────────────────┐
│ 服务器 │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ SSE 推送端 │ │ 房间管理器 │ │ 消息队列 │ │
│ │ │ │ │ │ │ │
│ │ text/event │ │ join/leave │ │ 按房间投递 │ │
│ │ -stream │ │ 订阅关系 │ │ 消息过滤 │ │
│ └─────────────┘ └──────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
十四、总结
微信小程序实现 SSE 的核心要点:
- 传输层:使用
wx.request+enableChunked: true+onChunkReceived模拟流式接收 - 解码层:用 ArrayBuffer 接收 + 手动 UTF-8 解码,避免编码问题
- 协议层:手动解析
event:/data:字段,用 Buffer 处理粘包 - 事件层:自定义 Event Bus,支持多事件名映射
- 房间层:join/leave 实现定向消息投递,乐观更新 + 失败回滚
- 生命周期层:与 onShow/onHide/onUnload 同步,Show Token 防竞态
- 健壮性:防重入、自动重连、资源清理、错误处理、监控日志
虽然比浏览器的 EventSource 复杂得多,但换来的是 完全可控的连接管理 和 更灵活的消息路由。这套方案已在生产环境稳定运行,支撑实时聊天、AI 流式输出等场景。