跳转到主要内容
受众:需要从智能体获得实时 token 级输出的 SDK 用户 (TypeScript、Go、任何会说 HTTP 的)。如果只需要单条完整回复, 阻塞 JSON 变体(不带 Accept: text/event-stream)更简单 —— 从 调用智能体 开始。
本页是 openapi.beeos.ai 暴露的每个 SSE 接入面的规范参考。 三个端点都遵循同一 Content-Type: text/event-stream 框架,但事件 命名约定略有不同;本指南并排呈现,帮你挑对端点 + 写对重连循环。

1. 三个 SSE 接入面

端点何时用自动关闭?事件名
POST /api/v1/agents/{agentId}/invokeAccept: text/event-stream一次性:token 级流式接收单条智能体回复是 —— 一个终止 done frame 后关闭无名 message(默认);type 在 JSON type 字段
GET /api/v1/agents/{agentId}/tasks/{taskId}/events异步任务 —— 观察进度、暂停状态、最终终止回复是 —— 终止信封或上游 EOF 触发关闭命名 message 装内容、end 装关闭
GET /api/v1/agents/{agentId}/conversations/{convId}/events多轮对话 —— 实时观察每一回合否 —— 多回合保持开放,直到 DELETE 或客户端断开命名 message 装内容、end 装关闭
三者都要 Authorization: Bearer <JWT or oag_...>oag_ key 是 user-scoped 的 —— 任何 key 只要 owner 拥有底层任务 / 会话,就能 invoke 或开流。鉴权模型见 认证与 API Key § 鉴权(v1.1.0 起 per-route scope 已下线)。

2. invoke SSE 流(一次性)

POST /api/v1/agents/agent_abc/invoke
Authorization: Bearer oag_...
Accept: text/event-stream
Content-Type: application/json

{ "message": "Tell me a haiku" }
Frame( event: 名 —— 客户端按 JSON type 分派):
data: {"type":"delta","text":"Quiet "}

data: {"type":"delta","text":"morning "}

data: {"type":"delta","text":"breeze..."}

data: {"type":"done","text":"Quiet morning breeze...","context_id":"ch-..."}
如果智能体发出 in-band agent_reply_error,你拿到的是同一个 done frame 但带额外字段:
data: {"type":"done","is_error":true,
       "code":"agent_reply_error","error":"index out of range",
       "text":"index out of range","context_id":"ch-..."}
如果发生传输层错误(agent_offlineservice_timeoutagent_rejected 等),你先拿到 error frame、再拿到 done (流的”单一关闭信号”—— 两个 frame 携带相同 code,只 key 在 done 的调用方仍能正确分派,audit-v4 P1-1):
data: {"type":"error","code":"agent_offline","status_code":503,
       "message":"agent is offline"}

data: {"type":"done","is_error":true,"code":"agent_offline",
       "error":"agent is offline"}
此端点上没有 event:。标准 EventSource 风格客户端收到的 所有 frame 都是默认 "message" 事件;按 data.type 分派。

客户端必备逻辑(invoke)

  1. 随 delta 到达,把 delta.text 追加到运行 buffer。
  2. 把第一个 done frame 当终止 —— 关闭连接。
  3. done.is_error === true 时把 done.code + done.error 暴露给 调用方。不要agent_reply_error(in-band)盲目重试;但对 service_timeout / agent_offline(传输层)重试。
done.text 字段是完整拼接后的回复 —— 增量是 UX 锦上添花,不是 真相。不需要流式 UX 的 SDK 可以完全忽略 delta,只消费 done

3. 任务 / 会话 SSE 流(命名事件)

tasks/{id}/eventsconversations/{id}/events同一种 framing:
event: message
data: { …每 frame JSON 信封… }

event: message
data: { …另一个 frame… }

event: end
data: {"reason":"task_terminal"}
JSON 信封 schema 共享(SSEStreamMessage):
字段类型备注
typestring例如 chat_messageagent_message_chunkagent_replyagent_reply_erroragent.input_required
statestringEnvelope v3 生命周期标记:streamingcompletedfailedcancelled。单次类型(如 chat_message)一开始就是 completedagent_reply 会以一组 state="streaming" 帧落地,最后以唯一一帧 state="completed" 收口。
stop_reasonstring仅在 agent_reply 的终态帧(completed/failed)上设:end_turnerrorcancelledlength
bodystringEnvelope v3 累积文本快照,用于 agent_reply 的流式渲染。每次 PATCH 都写入截止当前为止的完整正文 —— 客户端直接渲染 body忽略遗留的 agent_message_chunk
partsarrayEnvelope v3 结构化 parts(工具调用、附件等),与 body 同样按累积快照语义。
message_idstringUUID;每条消息在该通道唯一。一条 v3 agent_reply = 一个 message_id,跨所有 streaming 帧 + 终态帧。
offsetint64通道日志中的单调位置(重连时作为 since)。同一 message_id 的多帧 offset 不同。
in_reply_tostring它回复的请求的 message_id(在 agent_reply / chunks 上设)
publisher_idstringuser:<uid>agent:<id> —— 谁发的
payloadany按 type 的 body(如 chat_message{ "text": "..." })。v3 agent_reply 优先读顶层 body 而非 payload.text
created_atRFC3339 string服务端时间戳
Envelope v3(ADR-0022 + ADR-0023,v1.1 GA):Agent 回复采用”单行原位 变更”模型 —— 同一 message_id 跨 N 个 streaming 帧(每帧都是 body / parts 的累积快照),最后恰好一帧 state="completed"(或 failed / cancelled)作为终态。beeos-claw 不再写 per-token 的 agent_reply_delta 行,新建通道上根本不会出现该类型;为读取历史 通道,GET /messages?include_deltas=true 仍然有效。

终止 event: end

连接关闭前恰好发一次reason 区分原因:
reason来源含义
task_terminal仅 tasks/events刚转发了一个 agent_reply(或 _error / agent.refuse / agent_busy)—— 任务完成
channel_closedconversations/events(或 tasks/events 被 DELETE 时)通道被显式关闭(用户 DELETE、同认证的对端、或任务 cancel)
stream_closed两者上游 Message Service 流结束 —— 通常是 MS 重启或 TTL 驱逐
end frame 是唯一信号 —— 没有 in-band keepalive 注释,也没有 Connection: close 语义。看到 end 不要since=lastOffset 重连 —— 通道已消失或终止;重连不会回放任何新东西。

客户端必备逻辑(任务 / 会话)

const es = new EventSource(url);
let lastOffset = startingSince ?? 0;

es.addEventListener("message", (evt) => {
  const frame = JSON.parse(evt.data);
  lastOffset = frame.offset ?? lastOffset;
  handle(frame);  // UI render, etc.
});

es.addEventListener("end", (evt) => {
  const { reason } = JSON.parse(evt.data);
  console.log("stream ended:", reason);
  es.close();
  // Do NOT reconnect — terminal.
});

es.onerror = () => {
  // Connection dropped before `end`. Reconnect with since=lastOffset.
  setTimeout(() => connect(`${url}?since=${lastOffset}`), backoffMs);
};

4. since cursor(断线 / 重连补偿)

since 是通道单调日志上的整数 offset /tasks/{id}/events/conversations/{id}/events 都接受它作为 query 参数。语义:
  • since=0(或省略)—— 回放完整历史,然后继续流式发新事件。 适合晚到附加方想要完整 transcript。
  • since=N(N > 0)—— 回放所有 offset > N 的事件,然后继续流式。 在重连时用:传入最后观察到的 frame.offset,避免重复或漏掉。
Offset 严格单调但不保证连续(ADR-0022 §1.2)。Producer 端写存储 失败可能留下小空洞 —— 例如可能观察到 … 40, 42, …,其中 41 缺失。 总是用 offset > since 做续传不变量;不要假定 offset == since + 1。形如 if (newOffset !== lastOffset + 1) throw 的代码是 bug。
同样的 cursor 还充当非流式 GET /messages 端点上的分页 key: ?since=<lastOffset>&limit=200 返回 offset > lastOffset 的最多 200 frame。流式 + 轮询混用没问题。OpenAPI v1.1(ADR-0022 + ADR-0023)GET /messages 默认过滤掉 临时流式 chunk(agent_reply_deltaagent_thought_chunkagent_message_chunk)。v3 envelope(ADR-0023)落地后,live 的 agent_reply 行会在 body 里直接携带完整累积文本,不再需要 include_deltas=true 来重建文本;beeos-claw 通道根本不 会写 agent_reply_delta 行。该 flag 仅为读取历史通道(pre-v3) 保留。latest_offset 仍然反映完整的服务端最大值,所以 since=<latest_offset> 不管怎样都能续到正确位置。

边界情况

  1. 连接在收到任何 frame 之前掉了。 lastOffset 仍为 0 —— 用 since=0 重连即从头开始回放。invoke SSE 上(无 offset)必须 整体重发 invoke(底层 chat_message 没持久化,因为你没拿到 message_id 做幂等重试)。
  2. 最后一个 frame 是 agent_message_chunk 然后连接掉了。since=<该 chunk 的 offset> 重连。你会拿到所有剩余 chunk 加上最终 agent_reply。已渲染的文本没有重复风险 —— 每个 chunk 有不同的 message_id
  3. 同一客户端对同一任务开两个 SSE 连接。 两者都拿到完整实时流。 Message Service 是扇出 —— 网关上没有”你已订阅”的语义。
  4. 重连时收到 backfill_truncated(自 OpenAPI v1.1 / ADR-0022)。当通道闲置足够久使临时 stream 在你的 Last-Event-ID 之前老化失踪时,服务端会在正常 replay_complete 之前发一个 backfill_truncated 事件。形状:
    event: backfill_truncated
    data: { "oldest_redis_offset": 4711, "since": 3120,
            "hint": "ephemeral chunks before oldest_redis_offset have aged out of the Redis stream" }
    
    恢复策略,按保真度从高到低:
    • 重放幸存的 durable 行 —— 拉 GET /messages?since=<since>(仅 durable 行);你会丢掉逐 token chunk,但能恢复终态回复 / 非 chunk 状态。服务端仅对临时类型发此 帧,所以 chat_message / agent_reply / agent.input_required 等仍然在持久化日志里。
    • 快进 —— 直接用 since=replay_complete.latest_offset 续传, 接受中间 token chunk 已经丢失。官方 SDK 默认就这么做,因为 chunk 是渲染 UX 不是 system-of-record 数据。
    不识别 backfill_truncated 的 SDK 应当把该帧当作 no-op 消息处理 —— 它是纯信息性的,不改变协议的其它部分。

5. Keepalive

两个 SSE handler 目前都不发显式 keepalive 注释。策略:

浏览器 / EventSource

EventSource 在 socket close 时自动重连。用 since=<lastOffset> query 参数补偿间隙。注意:EventSource 不能设 header,所以 oag_ key 要么:
  • 手写 fetch + 手动 SSE 解析(推荐做生产 —— 让你对 header、重试、 重连时机有精确控制);或
  • 把 token 嵌入 URL(?access_token=...)—— 避免:URL 会出现在 CDN 日志 / 代理 access log / 浏览器历史里。

Node.js(server-to-server)

fetch + 把 response.body 当 async iterable,或 eventsource 包(支持 自定义 header)。在 onerror 上做指数退避,传入最后观察到的 offset:
let attempt = 0;
let lastOffset = 0;
let stop = false;

while (!stop) {
  try {
    const url = `${BASE}/api/v1/agents/${agentId}/tasks/${taskId}/events?since=${lastOffset}`;
    const res = await fetch(url, { headers: { Authorization: `Bearer ${TOKEN}` } });
    if (!res.body) throw new Error("no body");
    const reader = res.body.getReader();
    // ...parse SSE frames, update lastOffset on each `message`...
    // ...break out on `event: end`...
    attempt = 0;  // reset on a clean run
  } catch (e) {
    attempt++;
    await new Promise((r) => setTimeout(r, Math.min(2 ** attempt * 250, 30_000)));
  }
}

Go

github.com/r3labs/sse/v2 开箱处理重连 + offset 补偿。传 since=<offset> 让它自动续传。

服务端考量

如果客户端在企业代理 / NLB / CDN 后面,idle 连接上限可能比你的 回合节奏短。常见基础设施上的实测上限:
代理Idle SSE 上限
浏览器默认~60s 后重试
AWS NLB350s(HTTP keepalive timeout)
Cloudflare Free100s
Cloudflare Pro+100s — 6h(可配)
无论如何客户端都必须处理重连。since= cursor 就是为此而存在, 让重连无损。

6. error frame 对 done(invoke)/ end(task & conversation)

不同端点关闭语法不同;读错是最常见的 SDK bug。
端点错误语法
invoke SSEerror frame(可选)然后始终一个 done frame。两者都带 codedone.is_error 翻 true。done 后流关闭。
task SSE没有 error frame。错误作为普通 message frame 到达,type 为 agent_reply_error / agent.refuse / agent_busy,然后 end reason=task_terminal
conversation SSE没有 error frame,终止回复也不会 auto-end。错误信封作为普通 message frame 到达;流保持开放等下一回合。
所以匹配规则是:
  • invoke:按 data.type === "done" 决定 “停”。
  • task:按 event === "end"(命名 SSE 事件)决定 “停”;之前的 都是 message
  • conversation:和 task 一样,但 end 仅在显式删除 / 上游关闭 时触发,不在任何每回合终止时触发。

7. 实例 —— 健壮的任务监视器(TypeScript)

下面这段展示任务 SSE 接入面完整的”出错重连 + offset 续传”循环。 有意没用 SDK,让 wire 机制看得见。
import { Buffer } from "node:buffer";

interface Frame {
  type: string;
  message_id?: string;
  offset?: number;
  in_reply_to?: string;
  payload?: any;
  created_at?: string;
}

async function watchTask(taskId: string, agentId: string, token: string) {
  let lastOffset = 0;
  let attempt = 0;

  loop: while (true) {
    const url =
      `https://openapi.beeos.ai/api/v1/agents/${agentId}/tasks/${taskId}/events` +
      `?since=${lastOffset}`;
    try {
      const res = await fetch(url, {
        headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
      });
      if (!res.ok || !res.body) throw new Error(`http ${res.status}`);
      const reader = res.body.getReader();
      const dec = new TextDecoder();
      let buf = "";

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        buf += dec.decode(value, { stream: true });
        // SSE: frames separated by blank lines
        let idx: number;
        while ((idx = buf.indexOf("\n\n")) >= 0) {
          const block = buf.slice(0, idx);
          buf = buf.slice(idx + 2);
          let evName = "message";
          let data = "";
          for (const line of block.split("\n")) {
            if (line.startsWith("event:")) evName = line.slice(6).trim();
            else if (line.startsWith("data:")) data += line.slice(5).trim();
          }
          if (evName === "end") {
            console.log("task ended:", JSON.parse(data).reason);
            break loop;          // <- terminal; do NOT retry
          }
          const f: Frame = JSON.parse(data);
          if (f.offset != null) lastOffset = f.offset;
          render(f);
        }
      }
      // EOF without `end` — reconnect (atypical; treat as transient)
    } catch (e) {
      attempt++;
      const wait = Math.min(2 ** attempt * 250, 30_000);
      await new Promise((r) => setTimeout(r, wait));
      continue;
    }
    attempt = 0;
  }
}
会话同样形状 —— 把 event === "end" 的 break 换成 “停在显式 unsubscribe 标志”,因为会话不会因每回合回复而终止。

8. Frame 重放与幂等

每个 frame 的 message_id 在通道内唯一。重连引起的回放在你 lastOffset 追踪丢失更新时可能重发已见过的 frame —— UI 代码 应当message_id 去重。 内部 Message Service 用 offset-only 排序不变量(每通道单调),所以 看到两个 message_id 相同但 offset 不同的 frame 是值得报告的 bug。

9. 常见错误

  • 重连时 lastOffset = 0 重放整个通道历史,浪费带宽,把已 渲染的重复 frame 灌进 UI。永远追踪最新观察到的 frame.offset 并作为 since= 传入。
  • event: end 当可恢复错误。 不是 —— 它意味着不会有更多 frame。重连只是再得到一次 end(或如果通道被驱逐,得到 404)。 关闭连接走人。
  • 对 invoke SSE 按 event 名分支。 invoke SSE 发无名 frame; 按 event === "message" 分发的客户端会把每个 frame 当可忽略。 invoke 用 JSON.parse(data).type === "done";task / conversation 用 event === "end"
  • Accept: text/event-streamPOST /tasks/... 混用。 任务 SSE 接入面是另一个 GET /events 端点,不是 create 调用。 POST /tasks 始终返回 JSON 200 + task ID;然后在 GET /tasks/{id}/events 上开 SSE。
  • 把 token 嵌入 URL。 通过支持自定义 header 的 SSE 客户端设 Authorization:(或 fetch + 手动解析)。URL 会落到代理 / CDN access log 里。

10. 另请参阅