受众:需要从智能体获得实时 token 级输出的 SDK 用户
(TypeScript、Go、任何会说 HTTP 的)。如果只需要单条完整回复,
阻塞 JSON 变体(不带 Accept: text/event-stream)更简单 —— 从
调用智能体 开始。
本页是 openapi.beeos.ai 暴露的每个 SSE 接入面的规范参考。
三个端点都遵循同一 Content-Type: text/event-stream 框架,但事件
命名约定略有不同;本指南并排呈现,帮你挑对端点 + 写对重连循环。
1. 三个 SSE 接入面
| 端点 | 何时用 | 自动关闭? | 事件名 |
|---|
POST /api/v1/agents/{agentId}/invoke 带 Accept: text/event-stream | 一次性:token 级流式接收单条智能体回复 | 是 —— 一个终止 done frame 后关闭 | 无名 message(默认);type 在 JSON type 字段 |
GET /api/v1/agents/{agentId}/tasks/{taskId}/events | 异步任务 —— 观察进度、暂停状态、最终终止回复 | 是 —— 终止信封或上游 EOF 触发关闭 | 命名 message 装内容、end 装关闭 |
GET /api/v1/agents/{agentId}/conversations/{convId}/events | 多轮对话 —— 实时观察每一回合 | 否 —— 多回合保持开放,直到 DELETE 或客户端断开 | 命名 message 装内容、end 装关闭 |
三者都要 Authorization: Bearer <JWT or oag_...>。oag_ key 是
user-scoped 的 —— 任何 key 只要 owner 拥有底层任务 / 会话,就能
invoke 或开流。鉴权模型见
认证与 API Key § 鉴权(v1.1.0 起 per-route
scope 已下线)。
2. invoke SSE 流(一次性)
POST /api/v1/agents/agent_abc/invoke
Authorization: Bearer oag_...
Accept: text/event-stream
Content-Type: application/json
{ "message": "Tell me a haiku" }
Frame(无 event: 名 —— 客户端按 JSON type 分派):
data: {"type":"delta","text":"Quiet "}
data: {"type":"delta","text":"morning "}
data: {"type":"delta","text":"breeze..."}
data: {"type":"done","text":"Quiet morning breeze...","context_id":"ch-..."}
如果智能体发出 in-band agent_reply_error,你拿到的是同一个 done
frame 但带额外字段:
data: {"type":"done","is_error":true,
"code":"agent_reply_error","error":"index out of range",
"text":"index out of range","context_id":"ch-..."}
如果发生传输层错误(agent_offline、service_timeout、
agent_rejected 等),你先拿到 error frame、再拿到 done
(流的”单一关闭信号”—— 两个 frame 携带相同 code,只 key 在
done 的调用方仍能正确分派,audit-v4 P1-1):
data: {"type":"error","code":"agent_offline","status_code":503,
"message":"agent is offline"}
data: {"type":"done","is_error":true,"code":"agent_offline",
"error":"agent is offline"}
此端点上没有 event: 行。标准 EventSource 风格客户端收到的
所有 frame 都是默认 "message" 事件;按 data.type 分派。
客户端必备逻辑(invoke)
- 随 delta 到达,把
delta.text 追加到运行 buffer。
- 把第一个
done frame 当终止 —— 关闭连接。
done.is_error === true 时把 done.code + done.error 暴露给
调用方。不要对 agent_reply_error(in-band)盲目重试;但对
service_timeout / agent_offline(传输层)要重试。
done.text 字段是完整拼接后的回复 —— 增量是 UX 锦上添花,不是
真相。不需要流式 UX 的 SDK 可以完全忽略 delta,只消费 done。
3. 任务 / 会话 SSE 流(命名事件)
tasks/{id}/events 和 conversations/{id}/events 用同一种 framing:
event: message
data: { …每 frame JSON 信封… }
event: message
data: { …另一个 frame… }
event: end
data: {"reason":"task_terminal"}
JSON 信封 schema 共享(SSEStreamMessage):
| 字段 | 类型 | 备注 |
|---|
type | string | 例如 chat_message、agent_message_chunk、agent_reply、agent_reply_error、agent.input_required |
state | string | Envelope v3 生命周期标记:streaming、completed、failed、cancelled。单次类型(如 chat_message)一开始就是 completed;agent_reply 会以一组 state="streaming" 帧落地,最后以唯一一帧 state="completed" 收口。 |
stop_reason | string | 仅在 agent_reply 的终态帧(completed/failed)上设:end_turn、error、cancelled、length。 |
body | string | Envelope v3 累积文本快照,用于 agent_reply 的流式渲染。每次 PATCH 都写入截止当前为止的完整正文 —— 客户端应直接渲染 body 并忽略遗留的 agent_message_chunk。 |
parts | array | Envelope v3 结构化 parts(工具调用、附件等),与 body 同样按累积快照语义。 |
message_id | string | UUID;每条消息在该通道唯一。一条 v3 agent_reply = 一个 message_id,跨所有 streaming 帧 + 终态帧。 |
offset | int64 | 通道日志中的单调位置(重连时作为 since)。同一 message_id 的多帧 offset 不同。 |
in_reply_to | string | 它回复的请求的 message_id(在 agent_reply / chunks 上设) |
publisher_id | string | user:<uid> 或 agent:<id> —— 谁发的 |
payload | any | 按 type 的 body(如 chat_message 为 { "text": "..." })。v3 agent_reply 优先读顶层 body 而非 payload.text。 |
created_at | RFC3339 string | 服务端时间戳 |
Envelope v3(ADR-0022 + ADR-0023,v1.1 GA):Agent 回复采用”单行原位
变更”模型 —— 同一 message_id 跨 N 个 streaming 帧(每帧都是 body /
parts 的累积快照),最后恰好一帧 state="completed"(或
failed / cancelled)作为终态。beeos-claw 不再写 per-token 的
agent_reply_delta 行,新建通道上根本不会出现该类型;为读取历史
通道,GET /messages?include_deltas=true 仍然有效。
终止 event: end
连接关闭前恰好发一次。reason 区分原因:
reason | 来源 | 含义 |
|---|
task_terminal | 仅 tasks/events | 刚转发了一个 agent_reply(或 _error / agent.refuse / agent_busy)—— 任务完成 |
channel_closed | conversations/events(或 tasks/events 被 DELETE 时) | 通道被显式关闭(用户 DELETE、同认证的对端、或任务 cancel) |
stream_closed | 两者 | 上游 Message Service 流结束 —— 通常是 MS 重启或 TTL 驱逐 |
end frame 是唯一信号 —— 没有 in-band keepalive 注释,也没有
Connection: close 语义。看到 end 不要用 since=lastOffset
重连 —— 通道已消失或终止;重连不会回放任何新东西。
客户端必备逻辑(任务 / 会话)
const es = new EventSource(url);
let lastOffset = startingSince ?? 0;
es.addEventListener("message", (evt) => {
const frame = JSON.parse(evt.data);
lastOffset = frame.offset ?? lastOffset;
handle(frame); // UI render, etc.
});
es.addEventListener("end", (evt) => {
const { reason } = JSON.parse(evt.data);
console.log("stream ended:", reason);
es.close();
// Do NOT reconnect — terminal.
});
es.onerror = () => {
// Connection dropped before `end`. Reconnect with since=lastOffset.
setTimeout(() => connect(`${url}?since=${lastOffset}`), backoffMs);
};
4. since cursor(断线 / 重连补偿)
since 是通道单调日志上的整数 offset。
/tasks/{id}/events 和 /conversations/{id}/events 都接受它作为
query 参数。语义:
since=0(或省略)—— 回放完整历史,然后继续流式发新事件。
适合晚到附加方想要完整 transcript。
since=N(N > 0)—— 回放所有 offset > N 的事件,然后继续流式。
在重连时用:传入最后观察到的 frame.offset,避免重复或漏掉。
Offset 严格单调但不保证连续(ADR-0022 §1.2)。Producer 端写存储
失败可能留下小空洞 —— 例如可能观察到 … 40, 42, …,其中 41 缺失。
总是用 offset > since 做续传不变量;不要假定
offset == since + 1。形如 if (newOffset !== lastOffset + 1) throw
的代码是 bug。
同样的 cursor 还充当非流式 GET /messages 端点上的分页 key:
?since=<lastOffset>&limit=200 返回 offset > lastOffset 的最多 200
frame。流式 + 轮询混用没问题。OpenAPI v1.1(ADR-0022 + ADR-0023):GET /messages 默认过滤掉
临时流式 chunk(agent_reply_delta、agent_thought_chunk、
agent_message_chunk)。v3 envelope(ADR-0023)落地后,live 的
agent_reply 行会在 body 里直接携带完整累积文本,不再需要
include_deltas=true 来重建文本;新的 beeos-claw 通道根本不
会写 agent_reply_delta 行。该 flag 仅为读取历史通道(pre-v3)
保留。latest_offset 仍然反映完整的服务端最大值,所以
since=<latest_offset> 不管怎样都能续到正确位置。
边界情况
-
连接在收到任何 frame 之前掉了。
lastOffset 仍为 0 ——
用 since=0 重连即从头开始回放。invoke SSE 上(无 offset)必须
整体重发 invoke(底层 chat_message 没持久化,因为你没拿到
message_id 做幂等重试)。
-
最后一个 frame 是
agent_message_chunk 然后连接掉了。
用 since=<该 chunk 的 offset> 重连。你会拿到所有剩余 chunk
加上最终 agent_reply。已渲染的文本没有重复风险 —— 每个 chunk
有不同的 message_id。
-
同一客户端对同一任务开两个 SSE 连接。 两者都拿到完整实时流。
Message Service 是扇出 —— 网关上没有”你已订阅”的语义。
-
重连时收到
backfill_truncated 帧(自 OpenAPI v1.1 /
ADR-0022)。当通道闲置足够久使临时 stream 在你的 Last-Event-ID
之前老化失踪时,服务端会在正常 replay_complete 之前发一个
backfill_truncated 事件。形状:
event: backfill_truncated
data: { "oldest_redis_offset": 4711, "since": 3120,
"hint": "ephemeral chunks before oldest_redis_offset have aged out of the Redis stream" }
恢复策略,按保真度从高到低:
- 重放幸存的 durable 行 —— 拉
GET /messages?since=<since>(仅 durable 行);你会丢掉逐 token
chunk,但能恢复终态回复 / 非 chunk 状态。服务端仅对临时类型发此
帧,所以 chat_message / agent_reply / agent.input_required
等仍然在持久化日志里。
- 快进 —— 直接用
since=replay_complete.latest_offset 续传,
接受中间 token chunk 已经丢失。官方 SDK 默认就这么做,因为
chunk 是渲染 UX 不是 system-of-record 数据。
不识别 backfill_truncated 的 SDK 应当把该帧当作 no-op
消息处理 —— 它是纯信息性的,不改变协议的其它部分。
5. Keepalive
两个 SSE handler 目前都不发显式 keepalive 注释。策略:
浏览器 / EventSource
EventSource 在 socket close 时自动重连。用 since=<lastOffset>
query 参数补偿间隙。注意:EventSource 不能设 header,所以 oag_
key 要么:
- 手写
fetch + 手动 SSE 解析(推荐做生产 —— 让你对 header、重试、
重连时机有精确控制);或
- 把 token 嵌入 URL(
?access_token=...)—— 避免:URL 会出现在
CDN 日志 / 代理 access log / 浏览器历史里。
Node.js(server-to-server)
用 fetch + 把 response.body 当 async iterable,或
eventsource 包(支持
自定义 header)。在 onerror 上做指数退避,传入最后观察到的 offset:
let attempt = 0;
let lastOffset = 0;
let stop = false;
while (!stop) {
try {
const url = `${BASE}/api/v1/agents/${agentId}/tasks/${taskId}/events?since=${lastOffset}`;
const res = await fetch(url, { headers: { Authorization: `Bearer ${TOKEN}` } });
if (!res.body) throw new Error("no body");
const reader = res.body.getReader();
// ...parse SSE frames, update lastOffset on each `message`...
// ...break out on `event: end`...
attempt = 0; // reset on a clean run
} catch (e) {
attempt++;
await new Promise((r) => setTimeout(r, Math.min(2 ** attempt * 250, 30_000)));
}
}
github.com/r3labs/sse/v2 开箱处理重连 + offset 补偿。传
since=<offset> 让它自动续传。
服务端考量
如果客户端在企业代理 / NLB / CDN 后面,idle 连接上限可能比你的
回合节奏短。常见基础设施上的实测上限:
| 代理 | Idle SSE 上限 |
|---|
| 浏览器默认 | ~60s 后重试 |
| AWS NLB | 350s(HTTP keepalive timeout) |
| Cloudflare Free | 100s |
| Cloudflare Pro+ | 100s — 6h(可配) |
无论如何客户端都必须处理重连。since= cursor 就是为此而存在,
让重连无损。
6. error frame 对 done(invoke)/ end(task & conversation)
不同端点关闭语法不同;读错是最常见的 SDK bug。
| 端点 | 错误语法 |
|---|
| invoke SSE | error frame(可选)然后始终一个 done frame。两者都带 code;done.is_error 翻 true。done 后流关闭。 |
| task SSE | 没有 error frame。错误作为普通 message frame 到达,type 为 agent_reply_error / agent.refuse / agent_busy,然后 end reason=task_terminal。 |
| conversation SSE | 没有 error frame,终止回复也不会 auto-end。错误信封作为普通 message frame 到达;流保持开放等下一回合。 |
所以匹配规则是:
- invoke:按
data.type === "done" 决定 “停”。
- task:按
event === "end"(命名 SSE 事件)决定 “停”;之前的
都是 message。
- conversation:和 task 一样,但
end 仅在显式删除 / 上游关闭
时触发,不在任何每回合终止时触发。
7. 实例 —— 健壮的任务监视器(TypeScript)
下面这段展示任务 SSE 接入面完整的”出错重连 + offset 续传”循环。
有意没用 SDK,让 wire 机制看得见。
import { Buffer } from "node:buffer";
interface Frame {
type: string;
message_id?: string;
offset?: number;
in_reply_to?: string;
payload?: any;
created_at?: string;
}
async function watchTask(taskId: string, agentId: string, token: string) {
let lastOffset = 0;
let attempt = 0;
loop: while (true) {
const url =
`https://openapi.beeos.ai/api/v1/agents/${agentId}/tasks/${taskId}/events` +
`?since=${lastOffset}`;
try {
const res = await fetch(url, {
headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
});
if (!res.ok || !res.body) throw new Error(`http ${res.status}`);
const reader = res.body.getReader();
const dec = new TextDecoder();
let buf = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += dec.decode(value, { stream: true });
// SSE: frames separated by blank lines
let idx: number;
while ((idx = buf.indexOf("\n\n")) >= 0) {
const block = buf.slice(0, idx);
buf = buf.slice(idx + 2);
let evName = "message";
let data = "";
for (const line of block.split("\n")) {
if (line.startsWith("event:")) evName = line.slice(6).trim();
else if (line.startsWith("data:")) data += line.slice(5).trim();
}
if (evName === "end") {
console.log("task ended:", JSON.parse(data).reason);
break loop; // <- terminal; do NOT retry
}
const f: Frame = JSON.parse(data);
if (f.offset != null) lastOffset = f.offset;
render(f);
}
}
// EOF without `end` — reconnect (atypical; treat as transient)
} catch (e) {
attempt++;
const wait = Math.min(2 ** attempt * 250, 30_000);
await new Promise((r) => setTimeout(r, wait));
continue;
}
attempt = 0;
}
}
会话同样形状 —— 把 event === "end" 的 break 换成 “停在显式
unsubscribe 标志”,因为会话不会因每回合回复而终止。
8. Frame 重放与幂等
每个 frame 的 message_id 在通道内唯一。重连引起的回放在你
lastOffset 追踪丢失更新时可能重发已见过的 frame —— UI 代码
应当按 message_id 去重。
内部 Message Service 用 offset-only 排序不变量(每通道单调),所以
看到两个 message_id 相同但 offset 不同的 frame 是值得报告的 bug。
9. 常见错误
- 重连时
lastOffset = 0。 重放整个通道历史,浪费带宽,把已
渲染的重复 frame 灌进 UI。永远追踪最新观察到的 frame.offset
并作为 since= 传入。
- 把
event: end 当可恢复错误。 不是 —— 它意味着不会有更多
frame。重连只是再得到一次 end(或如果通道被驱逐,得到 404)。
关闭连接走人。
- 对 invoke SSE 按
event 名分支。 invoke SSE 发无名 frame;
按 event === "message" 分发的客户端会把每个 frame 当可忽略。
invoke 用 JSON.parse(data).type === "done";task / conversation
用 event === "end"。
- 把
Accept: text/event-stream 和 POST /tasks/... 混用。
任务 SSE 接入面是另一个 GET /events 端点,不是 create 调用。
POST /tasks 始终返回 JSON 200 + task ID;然后在
GET /tasks/{id}/events 上开 SSE。
- 把 token 嵌入 URL。 通过支持自定义 header 的 SSE 客户端设
Authorization:(或 fetch + 手动解析)。URL 会落到代理 / CDN
access log 里。
10. 另请参阅