一、为什么需要 SSE?

SSE(Server-Sent Events,服务器推送事件) 是一种基于 HTTP 的单向实时通信协议。与 WebSocket 的全双工不同,SSE 是 服务端 → 客户端 的单向推送,天然适合以下场景:

  • 💬 实时聊天:接收对方发送的消息
  • 🤖 AI 流式输出:ChatGPT 风格的逐字返回
  • 📊 数据看板:实时刷新指标数据
  • 🔔 消息通知:订单状态变更、审核结果推送

相比 WebSocket,SSE 的优势在于:

维度SSE ✅SSE ⚠️ 局限
基于 HTTP,天然支持认证、CORS单向通信,客户端需另外发请求
自动重连机制(浏览器端)浏览器限制最大 6 个并发连接
协议简单,调试方便不支持二进制数据
防火墙友好,不会被拦截IE/Edge Legacy 不支持

二、浏览器中的 SSE 长什么样?

在浏览器中,使用 SSE 非常简单:

// 浏览器原生 EventSource —— 就这么简单
const es = new EventSource('/api/events');

// 监听默认消息
es.onmessage = (e) => {
  console.log('收到消息:', JSON.parse(e.data));
};

// 监听自定义事件
es.addEventListener('chat-message', (e) => {
  const msg = JSON.parse(e.data);
  renderMessage(msg);
});

// 监听连接状态
es.onerror = () => console.log('连接断开,浏览器会自动重连');

浏览器的 EventSource API 帮你处理了所有脏活累活:

  • 自动解析 text/event-stream 协议
  • 自动处理 event:data:id: 字段
  • 断线后自动重连,并发送 Last-Event-ID
  • 处理 UTF-8 多字节解码
  ┌──────────┐          ┌──────────┐
  │  浏览器   │ ──GET──▶ │  服务器   │
  │          │          │          │
  │ EventSrc │ ◀─chunk── │ SSE 推送  │
  │ 自动解析  │ ◀─chunk── │          │
  │ 自动重连  │ ◀─chunk── │          │
  └──────────┘          └──────────┘

三、小程序的困境:没有 EventSource

微信小程序的运行环境 不是浏览器。它没有 EventSource、没有 fetch、没有 WebSocket 的标准实现(小程序有 wx.connectSocket,但接口完全不同)。

小程序提供的网络 API 只有:

API能力能否用于 SSE?
wx.requestHTTP 请求✅ 配合 enableChunked
wx.connectSocketWebSocket❌ 协议不同,需要服务端适配
wx.downloadFile文件下载⚠️ 理论可行,但不适合流式

💡 关键突破:enableChunked 模式

从基础库 2.20.2 开始,wx.request 支持 enableChunked: true 参数。开启后,可以通过 onChunkReceived 回调逐块接收响应数据——这正是模拟 SSE 的关键。


四、核心实现:用 wx.request 模拟 SSE

4.1 建立连接

核心思路:发起一个 GET 请求,开启分块传输,保持连接不断开:

// utils/sse-client.ts

// 微信非标准 API 类型扩展
type ChunkedRequestTask = WechatMiniprogram.RequestTask & {
  onChunkReceived?: (callback: (chunk: {
    data: string | ArrayBuffer
  }) => void) => void
}

let requestTask: ChunkedRequestTask | null = null

function requestSSE() {
  // 清理旧连接
  if (requestTask) {
    requestTask.abort()
    requestTask = null
  }

  const sseId = getSseId()
  const url = `${BASE_URL}/api/auth/message/subscribe/${sseId}`

  const task = wx.request({
    url,
    method: 'GET',
    enableChunked: true,        // 🔑 关键:开启分块传输
    responseType: 'arraybuffer', // 二进制接收,避免编码问题
    header: {
      Accept: 'text/event-stream',
      Authorization: `Bearer ${token}`,
    },
  }) as ChunkedRequestTask

  requestTask = task

  // 监听分块数据
  task.onChunkReceived?.((chunk) => {
    handleSseText(decodeChunk(chunk.data))
  })
}

⚠️ 为什么要用 responseType: ‘arraybuffer’?

如果用字符串模式接收,小程序可能会尝试按 GBK 解码中文,导致乱码。用 ArrayBuffer 接收后手动 UTF-8 解码更可靠。

4.2 数据解码:处理 ArrayBuffer

收到的 chunk 可能是 stringArrayBuffer,需要统一转为字符串:

// utils/sse-client.ts

function decodeChunk(data: string | ArrayBuffer): string {
  if (typeof data === 'string') return data

  const uint8 = new Uint8Array(data)
  try {
    // 优先使用 TextDecoder(部分基础库支持)
    return new TextDecoder('utf-8').decode(uint8)
  } catch {
    // 降级:手动 UTF-8 解码
    return utf8Decode(uint8)
  }
}

// 手动 UTF-8 解码(兼容所有基础库版本)
function utf8Decode(uint8: Uint8Array): string {
  let result = ''
  let i = 0
  while (i < uint8.length) {
    const b = uint8[i]
    let code: number

    if (b <= 0x7f) {
      code = b; i += 1
    } else if (b >= 0xc0 && b <= 0xdf) {
      code = ((b & 0x1f) << 6) | (uint8[i+1] & 0x3f); i += 2
    } else if (b >= 0xe0 && b <= 0xef) {
      code = ((b & 0x0f) << 12) | ((uint8[i+1] & 0x3f) << 6)
             | (uint8[i+2] & 0x3f); i += 3
    } else if (b >= 0xf0 && b <= 0xf7) {
      code = ((b & 0x07) << 18) | ((uint8[i+1] & 0x3f) << 12)
             | ((uint8[i+2] & 0x3f) << 6) | (uint8[i+3] & 0x3f); i += 4
    } else {
      i += 1; continue // 跳过无法识别的字节
    }
    result += String.fromCharCode(code)
  }
  return result
}

五、协议解析:手动实现 SSE 帧解析器

SSE 协议格式非常简单,但必须严格遵守:

event: chat-message        ← 事件类型(可选)
data: {"content":"你好"}    ← 数据载荷(必须)
id: 12345                  ← 消息 ID(可选)
                           ← 空行 = 消息结束

event: notification
data: {"type":"order","msg":"订单已发货"}
                           ← 空行 = 消息结束

关键规则:

  • 每条消息以 空行(\n\n) 分隔
  • data: 可以有多行,解析时用 \n 拼接
  • 没有 event: 字段时,默认事件名为 message

5.1 解析器实现

// utils/sse-client.ts

function parseSseBlock(block: string) {
  const eventLines: string[] = []
  const dataLines: string[] = []

  // 逐行解析
  block.split(/\r?\n/).forEach((line) => {
    if (line.startsWith('event:')) {
      eventLines.push(line.replace(/^event:\s?/, ''))
    }
    if (line.startsWith('data:')) {
      dataLines.push(line.replace(/^data:\s?/, ''))
    }
  })

  // 多行 data 用换行拼接
  const dataText = dataLines.join('\n').trim()
  if (!dataText) return

  try {
    const json = JSON.parse(dataText)
    // 兼容 {data: {...}} 和 {...} 两种格式
    const data = json.data === undefined ? json : json.data
    const eventName = eventLines[0] || json.event || 'message'

    // 触发事件监听
    dispatchSseData(eventName, data)
  } catch (err) {
    console.warn('[SSE] 解析失败:', dataText.slice(0, 120))
  }
}

5.2 粘包处理:Buffer 管理

TCP 是流式协议,onChunkReceived 回调的边界 不一定 是 SSE 消息的边界。一个 chunk 可能包含:

  • 半条消息(消息被截断)
  • 多条消息(消息粘连)
  • 以上两者同时存在
  Chunk 1:  "event: chat\ndata: {\"hel"
  Chunk 2:  "lo\":\"world\"}\n\nevent: noti"
  Chunk 3:  "fication\ndata: {\"type\":1}\n\n"
  ─────────────────────────────────────────
  合并后:   "event: chat\ndata: {\"hello\":\"world\"}\n\n
             event: notification\ndata: {\"type\":1}\n\n"
  ─────────────────────────────────────────
  按 \n\n 切割 → 两个完整的 SSE 消息块

解决方案:维护一个 字符串缓冲区,累积数据直到遇到完整的 \n\n 分隔符:

// utils/sse-client.ts

let sseBuffer = ''  // 全局缓冲区

function handleSseText(text: string) {
  // 先检查是否是服务端控制消息
  if (handleControlMessage(text)) return

  // 累积到缓冲区
  sseBuffer += text

  // 按双换行切分
  const parts = sseBuffer.split(/\r?\n\r?\n/)

  // 最后一个元素可能是不完整的消息,保留在缓冲区
  sseBuffer = parts.pop() || ''

  // 解析完整的消息块
  parts.forEach(parseSseBlock)
}

为什么 parts.pop() 是关键?

split('\n\n') 后,最后一个元素很可能是不完整的消息(还没收到下一个 \n\n)。用 pop() 把它放回缓冲区,等下一个 chunk 到来时继续拼接。


六、重连机制:比浏览器更精细的控制

浏览器的 EventSource 会自动重连,但你无法控制重连策略。小程序中我们可以实现更精细的控制:

// utils/sse-client.ts

let isManualClose = false
let reconnectTimer: ReturnType<typeof setTimeout> | null = null

// 主动断开(不重连)
export function stopSSE() {
  isManualClose = true
  cleanupConnection()
}

// 被动断开(自动重连)
function scheduleReconnect() {
  if (isManualClose) return  // 主动断开不重连

  cleanupConnection()
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null
    console.log('[SSE] 重连中...')
    startSSE()
  }, 5000)  // 固定 5 秒后重连
}

function cleanupConnection() {
  if (requestTask) {
    requestTask.abort()
    requestTask = null
  }
  sseBuffer = ''
  if (reconnectTimer) {
    clearTimeout(reconnectTimer)
    reconnectTimer = null
  }
}

6.1 服务端主动关闭的处理

有些服务端会在空闲一段时间后主动关闭连接(发送 {"msg": "auto close"}),需要识别并触发重连:

function handleControlMessage(text: string): boolean {
  const trimmed = text.trim()
  if (!trimmed.startsWith('{') || !trimmed.endsWith('}')) return false

  try {
    const msg = JSON.parse(trimmed)
    if (msg.msg === 'auto close') {
      console.log('[SSE] 服务端主动关闭,触发重连')
      scheduleReconnect()
      return true
    }
  } catch {
    return false
  }
  return false
}

🚨 生产环境建议:指数退避(Exponential Backoff)

固定 5 秒重连在服务端宕机时会产生大量无效请求。建议实现指数退避:第 1 次 1 秒,第 2 次 2 秒,第 3 次 4 秒……最大 30 秒。连接成功后重置计时器。


七、事件系统:自定义事件总线

小程序没有 DOM 事件模型,需要自己实现一个轻量级的 事件总线(Event Bus)

// utils/sse-client.ts

// 事件监听器注册表
const listeners: Record<string, ((data: unknown) => void)[]> = {}

export function addCustomEventListener(
  eventName: string,
  fn: (data: unknown) => void
) {
  if (!listeners[eventName]) listeners[eventName] = []
  listeners[eventName].push(fn)
}

export function removeCustomEventListener(
  eventName: string,
  fn: (data: unknown) => void
) {
  if (!listeners[eventName]) return
  listeners[eventName] = listeners[eventName].filter(f => f !== fn)
  if (listeners[eventName].length === 0) delete listeners[eventName]
}

function dispatchSseData(eventName: string, data: unknown) {
  listeners[eventName]?.forEach(fn => fn(data))
}

7.1 多事件名映射

一条 SSE 消息可以同时触发多个事件名,提高监听的灵活性:

function getEventNames(json: Record<string, unknown>, data: unknown) {
  const names: string[] = []

  // 1. 显式 event 字段
  const explicitEvent = toText(json.event)
  if (explicitEvent) names.push(explicitEvent)

  // 2. data.type 字段作为事件名
  const dataRecord = toRecord(data)
  if (typeof dataRecord.type === 'string' && dataRecord.type) {
    names.push(dataRecord.type)
  }

  // 3. 内容特征匹配(如咨询消息)
  if (isConsultMessage(data)) {
    names.push('consult-message')
  }

  // 去重
  return Array.from(new Set(names))
}
  SSE 消息: event: chat\ndata: {"content":"你好","sender":{...}}
  ┌─────────────────────────────────┐
  │ getEventNames() 返回:           │
  │   1. "chat"           (event)   │
  │   2. "consult-message"(特征)    │
  └─────────────────────────────────┘
  触发所有监听 "chat" 和 "consult-message" 的回调

八、房间机制:定向消息投递

传统 SSE 是 广播模式:所有连接的客户端收到相同的消息。但在聊天场景中,我们需要 定向投递——只接收属于自己的消息。

解决方案是在 SSE 之上加入 房间(Room) 概念:

  ┌──────────┐    join("2026-06-05")    ┌──────────┐
  │ 客户端 A  │ ──────────────────────▶ │          │
  │          │                          │  服务器   │
  │ 客户端 B  │ ──────────────────────▶ │  房间管理 │
  │          │    join("2026-06-04")    │          │
  └──────────┘                          └────┬─────┘
                              推送消息时按房间过滤
                                    ┌────────▼───────┐
                                    │  只推给同房间   │
                                    │  的 SSE 连接    │
                                    └────────────────┘

8.1 Join / Leave 实现

// utils/sse-client.ts

export async function join(roomName: string) {
  const app = getApp<IAppOption>()
  const appData = app.globalData as Record<string, unknown>
  const joinedArray = appData.joinedArray as string[] || []

  // 幂等:已加入则跳过
  if (joinedArray.includes(roomName)) return

  const prevArray = [...joinedArray]  // 备份,用于失败回滚
  const sseId = getSseId()

  try {
    // 乐观更新
    appData.joinedArray = [roomName, ...joinedArray]

    await new Promise<void>((resolve, reject) => {
      wx.request({
        url: `${BASE_URL}/api/auth/message/join`,
        method: 'POST',
        data: { id: sseId, name: roomName },
        header: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${token}`,
        },
        success: (res) => {
          if (res.statusCode >= 200 && res.statusCode < 300) {
            resolve()
          } else {
            reject(new Error(`join failed: ${res.statusCode}`))
          }
        },
        fail: reject,
      })
    })
  } catch {
    // 加入失败,回滚
    appData.joinedArray = prevArray
  }
}

export async function leave(roomName: string) {
  // 类似实现,从 joinedArray 中移除
}

💡 乐观更新(Optimistic Update)策略

先更新本地状态,再发请求。如果请求失败再回滚。这样 UI 反应更快,用户体验更好。注意保存 prevArray 用于回滚。


九、生命周期绑定:与小程序页面同步

小程序的页面有严格的生命周期,SSE 连接必须与之同步:

// pages/consult-chat/index.ts

Page({
  async onShow() {
    const roomName = getRoomName(this.data.startTime)

    // 1. 启动 SSE 连接
    startSSE()

    // 2. 加入房间
    await join(roomName)

    // 3. 注册事件监听
    this._sseMessageHandler = (data: unknown) => {
      this.appendMessage(normalizeMessage(data))
    }
    addCustomEventListener('consult-message', this._sseMessageHandler)
  },

  onHide() {
    // 页面隐藏:断开连接,移除监听
    stopSSE()
    leave(roomName)
    removeCustomEventListener('consult-message', this._sseMessageHandler)
  },

  onUnload() {
    // 页面销毁:确保资源释放
    stopSSE()
    leave(roomName)
    removeCustomEventListener('consult-message', this._sseMessageHandler)
  },
})

生命周期与 SSE 状态的对应关系

生命周期触发时机SSE 操作
onShow页面显示 / 从后台切回startSSE() + join() + 注册监听
onHide页面隐藏 / 切到其他页面stopSSE() + leave() + 移除监听
onUnload页面销毁(redirectTo / navigateBack)stopSSE() + leave() + 移除监听

十、竞态条件处理

页面快速切换时(如用户快速点击返回再进入),onShow 可能连续触发多次,导致:

  • 多个 SSE 连接并存
  • 事件监听器重复注册
  • 旧连接的回调更新了新页面的数据

10.1 Show Token 方案

用一个递增的 token 来标识每次 onShow,确保只有最新的那次生效:

// pages/consult-chat/index.ts

async onShow() {
  const page = getRuntime(this)

  // 递增 token
  const showToken = (page._sseShowToken || 0) + 1
  page._sseShowToken = showToken
  page._isSseVisible = true

  startSSE()
  await join(roomName)

  // ⚡ 关键检查:如果 token 已过期,直接返回
  if (!page._isSseVisible || page._sseShowToken !== showToken) return

  // 移除旧监听器,注册新的
  if (page._sseMessageHandler) {
    removeCustomEventListener('consult-message', page._sseMessageHandler)
  }

  page._sseMessageHandler = (data: unknown) => {
    this.appendMessage(normalizeMessage(data))
  }
  addCustomEventListener('consult-message', page._sseMessageHandler)
},

onHide() {
  const page = getRuntime(this)
  page._isSseVisible = false
  page._sseShowToken = (page._sseShowToken || 0) + 1  // 使旧 token 失效
  stopSSE()
  // ...
}
  时间线:  ─────────────────────────────────────────▶

  onShow() #1    onHide()    onShow() #2
  token = 1                  token = 2
      │                          │
      ▼                          ▼
  startSSE()               startSSE()
  join()                   join()
  await... ←──── 此时 #1 的 await 恢复
      │                    但 token(1) !== _sseShowToken(2)
      │                    所以 #1 的后续逻辑被跳过 ✅

十一、生产环境注意事项

11.1 错误处理

function requestSSE() {
  let hasDisconnected = false

  const handleDisconnect = (message: string, errMsg?: string) => {
    // 防止重复触发
    if (hasDisconnected || isManualClose || requestTask !== task) return
    hasDisconnected = true
    console.warn(`[SSE] ${message},5 秒后重连:`, errMsg || '')
    scheduleReconnect()
  }

  const task = wx.request({
    // ...
    fail: (err) => {
      // 忽略主动 abort 导致的错误
      if (err.errMsg?.includes('abort')) return
      handleDisconnect('连接失败', err.errMsg)
    },
    complete: (res) => {
      // complete 在 fail 之后也会触发,需要防重
      if (res.errMsg?.includes('abort')) return
      handleDisconnect('连接结束', res.errMsg)
    },
  })
}

11.2 内存泄漏防护

  • 监听器清理:每次注册前先移除旧监听器,onUnload 时确保全部清理
  • Buffer 清理:断开连接时清空 sseBuffer,避免残留数据
  • 定时器清理:重连定时器在 stopSSE() 时清除
  • 请求任务清理:调用 abort() 释放网络资源

11.3 认证与安全

// SSE 连接携带 Bearer Token
header: {
  Accept: 'text/event-stream',
  Authorization: `Bearer ${getApp().globalData.token}`,
}

// 房间 join/leave 同样需要认证
wx.request({
  url: `${BASE_URL}/api/auth/message/join`,
  header: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${token}`,
  },
})

⚠️ Token 过期怎么办?

SSE 是长连接,Token 可能在连接期间过期。建议:

  • 服务端在 Token 过期时发送控制消息,客户端收到后刷新 Token 并重连
  • 客户端定期检查 Token 有效期,主动重连
  • 使用 Refresh Token 机制自动续期

11.4 监控与调试

// 在关键节点打日志
console.log('[SSE] 连接建立:', url)
console.log('[SSE] 收到消息:', eventName, data)
console.log('[SSE] 连接断开,原因:', reason)
console.log('[SSE] 重连中... (第 N 次)')
console.log('[SSE] 加入房间:', roomName)
console.log('[SSE] 离开房间:', roomName)

// 可选:上报连接状态到监控平台
function reportSSEStatus(status: 'connect' | 'disconnect' | 'reconnect') {
  // 上报到你的监控系统
}

十二、SSE vs WebSocket:如何选择?

维度SSE(本方案)WebSocket
通信方向单向(服务端 → 客户端)双向全双工
协议HTTP独立协议 (ws://)
小程序支持wx.request + enableChunkedwx.connectSocket
自动重连需手动实现需手动实现
认证HTTP Header(原生支持)需在 URL 或首条消息中传递
负载均衡标准 HTTP LB需要 Sticky Session
适用场景通知、聊天、AI 流式输出游戏、协同编辑、高频交互
实现复杂度🟢 中等🟡 较高

经验法则:如果你的场景是"服务端推、客户端收"(如聊天、通知、AI 输出),SSE 是更简单的选择。如果需要高频双向通信(如实时游戏、协同编辑),选 WebSocket。


十三、完整架构总览

┌─────────────────────────────────────────────────────────┐
                    微信小程序客户端                        
                                                         
  ┌──────────────┐    ┌───────────────────────────────┐  
     页面层                  sse-client.ts            
                                                     
    onShow()    │───▶│  startSSE()                     
    onHide()    │───▶│  stopSSE()                      
    onUnload()                                       
                      ┌─────────────────────────┐    
    监听事件:              wx.request               
    addListener │◀───│      enableChunked: true      
                          onChunkReceived()        
  └──────────────┘      └───────────┬─────────────┘    
                                                       
                                                       
                        ┌─────────────────────────┐    
                           UTF-8 解码                 
                           Buffer 粘包处理            
                           SSE 帧解析                 
                           事件分发                   
                        └─────────────────────────┘    
                                                       
                        房间管理: join() / leave()      
                        重连: scheduleReconnect()       
                      └───────────────────────────────┘  
                                                         
└──────────────────────────┬──────────────────────────────┘
                           
                     HTTP / SSE
                           
                           
┌─────────────────────────────────────────────────────────┐
                       服务器                              
                                                         
  ┌─────────────┐  ┌──────────────┐  ┌───────────────┐  
   SSE 推送端       房间管理器       消息队列       
                                                 
   text/event      join/leave      按房间投递     
   -stream         订阅关系         消息过滤       
  └─────────────┘  └──────────────┘  └───────────────┘  
                                                         
└─────────────────────────────────────────────────────────┘

十四、总结

微信小程序实现 SSE 的核心要点:

  1. 传输层:使用 wx.request + enableChunked: true + onChunkReceived 模拟流式接收
  2. 解码层:用 ArrayBuffer 接收 + 手动 UTF-8 解码,避免编码问题
  3. 协议层:手动解析 event: / data: 字段,用 Buffer 处理粘包
  4. 事件层:自定义 Event Bus,支持多事件名映射
  5. 房间层:join/leave 实现定向消息投递,乐观更新 + 失败回滚
  6. 生命周期层:与 onShow/onHide/onUnload 同步,Show Token 防竞态
  7. 健壮性:防重入、自动重连、资源清理、错误处理、监控日志

虽然比浏览器的 EventSource 复杂得多,但换来的是 完全可控的连接管理更灵活的消息路由。这套方案已在生产环境稳定运行,支撑实时聊天、AI 流式输出等场景。