Audience: SDK users (TypeScript, Go, anything that speaks HTTP)
who need live token-by-token output from an agent. If you only
need a single full reply, the blocking JSON variants (no Accept: text/event-stream) are simpler — start with
Calling Agents.
This page is the canonical reference for every SSE surface
openapi.beeos.ai exposes. All three endpoints follow the same
Content-Type: text/event-stream framing but use slightly different
event-naming conventions; this guide lays them side-by-side so you
can pick the right one and write a correct reconnect loop.
1. The three SSE surfaces
| Endpoint | Use when | Auto-close? | Event names |
|---|---|---|---|
| POST /api/v1/agents/{agentId}/invoke with Accept: text/event-stream | One-shot: stream the single agent reply token-by-token | Yes, closes after one terminal done frame | Unnamed message (default); type lives in JSON type field |
| GET /api/v1/agents/{agentId}/tasks/{taskId}/events | Asynchronous task: observe progress, paused state, and the eventual terminal reply | Yes, closes on terminal envelope OR upstream EOF | Named message for content, end for shutdown |
| GET /api/v1/agents/{agentId}/conversations/{convId}/events | Multi-turn dialog: observe every turn live | No, stays open across turns until DELETE or client disconnect | Named message for content, end for shutdown |
All three require Authorization: Bearer <JWT or oag_...>. oag_
keys are user-scoped — any key whose owner owns the underlying task /
conversation can invoke or stream. See
Authentication & API Keys for the
owner-ACL authorization model (per-route scopes were removed in v1.1.0).
2. The invoke SSE flow (one-shot)
POST /api/v1/agents/agent_abc/invoke
Authorization: Bearer oag_...
Accept: text/event-stream
Content-Type: application/json
{ "message": "Tell me a haiku" }
Frames (no event: name — clients dispatch on the JSON type):
data: {"type":"delta","text":"Quiet "}
data: {"type":"delta","text":"morning "}
data: {"type":"delta","text":"breeze..."}
data: {"type":"done","text":"Quiet morning breeze...","context_id":"ch-..."}
If the agent emitted an in-band agent_reply_error, you get the
same done frame with extra fields:
data: {"type":"done","is_error":true,
"code":"agent_reply_error","error":"index out of range",
"text":"index out of range","context_id":"ch-..."}
If a transport-level error (agent_offline, service_timeout,
agent_rejected, etc.) occurs, you get an error frame first,
then done. The done frame is the stream’s “single shutdown signal”;
both frames carry the same code, so callers that only key on done
still dispatch correctly (audit-v4 P1-1):
data: {"type":"error","code":"agent_offline","status_code":503,
"message":"agent is offline"}
data: {"type":"done","is_error":true,"code":"agent_offline",
"error":"agent is offline"}
No event: lines on this endpoint. Standard
EventSource-style clients receive all frames as the default
"message" event; dispatch on data.type.
Required client logic (invoke)
- Append delta.text to a running buffer as deltas arrive.
- Treat the first done frame as terminal and close the connection.
- If done.is_error === true, surface done.code + done.error to
  the caller. Don’t retry blindly on agent_reply_error (in-band),
  but DO retry on service_timeout / agent_offline (transport).
The done.text field is the full assembled reply — chunks are a
UX nicety, not the truth. SDKs that don’t need streaming UX can
ignore deltas entirely and just consume done.
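The client rules above can be sketched as a small reducer over the raw data: payloads. This is a minimal sketch, not SDK code: the names InvokeFrame, InvokeResult, RETRYABLE_CODES and reduceInvoke are hypothetical, and the retryable set contains only the two transport codes this guide names explicitly.

```typescript
// Frame shapes follow the invoke examples in this section. All names here
// are illustrative and not part of any official SDK.
type InvokeFrame =
  | { type: "delta"; text: string }
  | { type: "error"; code: string; status_code?: number; message?: string }
  | {
      type: "done";
      text?: string;
      is_error?: boolean;
      code?: string;
      error?: string;
      context_id?: string;
    };

interface InvokeResult {
  ok: boolean;
  text: string;
  code?: string;
  error?: string;
  retryable: boolean;
}

// Assumption: only the transport codes this guide marks as retryable.
const RETRYABLE_CODES = new Set(["service_timeout", "agent_offline"]);

// Consumes the JSON payload of each `data:` line in arrival order and
// returns the outcome at the first `done` frame (undefined if none arrived).
function reduceInvoke(dataLines: string[]): InvokeResult | undefined {
  let buffer = "";
  for (const raw of dataLines) {
    const frame = JSON.parse(raw) as InvokeFrame;
    if (frame.type === "delta") {
      buffer += frame.text; // streaming UX only
    } else if (frame.type === "done") {
      const failed = frame.is_error === true;
      return {
        ok: !failed,
        text: frame.text ?? buffer, // done.text is the full reply when present
        code: frame.code,
        error: frame.error,
        retryable:
          failed && frame.code !== undefined && RETRYABLE_CODES.has(frame.code),
      };
    }
    // An `error` frame is always followed by a `done` with the same code,
    // so it needs no separate handling here.
  }
  return undefined;
}
```

A caller would retry the whole invoke when retryable is true and surface code + error otherwise.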
3. The task / conversation SSE flow (named events)
Both tasks/{id}/events and conversations/{id}/events use the
same framing:
event: message
data: { …per-frame JSON envelope… }
event: message
data: { …another frame… }
event: end
data: {"reason":"task_terminal"}
The JSON envelope schema is shared (SSEStreamMessage):
| Field | Type | Notes |
|---|---|---|
| type | string | E.g. chat_message, agent_message_chunk, agent_reply, agent_reply_error, agent.input_required |
| state | string | Envelope v3 lifecycle marker: streaming, completed, failed, cancelled. Single-shot frames (e.g. chat_message) carry completed from the start; agent_reply arrives as a sequence of state="streaming" frames culminating in one state="completed". |
| stop_reason | string | Set only on the terminal completed/failed frame for agent_reply (end_turn, error, cancelled, length). |
| body | string | Envelope v3 cumulative text snapshot for agent_reply streaming. Every PATCH writes the full accumulated reply so far; clients SHOULD render body directly and IGNORE the legacy per-token agent_message_chunk frames if both are present. |
| parts | array | Envelope v3 structured parts (tool calls, attachments, etc.) accumulated alongside body. Same cumulative snapshot semantic. |
| message_id | string | UUID; unique per message in the channel. One v3 agent_reply envelope = one message_id across all streaming frames + the terminal one. |
| offset | int64 | Monotonic position in the channel log (use as since on reconnect). Different per streaming frame even though message_id repeats. |
| in_reply_to | string | message_id of the request this is replying to (set on agent_reply / chunks) |
| publisher_id | string | user:<uid> or agent:<id> (who emitted it) |
| payload | any | Per-type body (e.g. { "text": "..." } for chat_message). For v3 agent_reply, prefer top-level body over payload.text. |
| created_at | RFC3339 string | Server timestamp |
Envelope v3 (ADR-0022 + ADR-0023, GA in v1.1): Agent replies are
“one row, mutating in place”. The same message_id appears across N
streaming frames (each a cumulative snapshot of body / parts)
followed by exactly one state="completed" (or failed / cancelled)
terminal frame. Legacy per-token agent_reply_delta rows are no
longer emitted by beeos-claw and will not appear on new channels,
but historical rows remain readable via ?include_deltas=true on the
GET /messages polling path.
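Because every v3 frame is a cumulative snapshot, a client can keep one rendered entry per message_id and overwrite it as frames arrive. The following is a minimal sketch under that reading of the table: Envelope, RenderedMessage and applyFrame are hypothetical names, and only the fields needed for rendering are modelled.

```typescript
// Field names follow the SSEStreamMessage table; the type and function
// names are illustrative only.
interface Envelope {
  type: string;
  message_id: string;
  offset: number;
  state?: "streaming" | "completed" | "failed" | "cancelled";
  body?: string;
  stop_reason?: string;
}

interface RenderedMessage {
  text: string;
  state: string;
  offset: number;
}

// Applies one frame to the view. Returns false when the frame is a stale
// replay (its offset is not newer than what is already rendered).
function applyFrame(view: Map<string, RenderedMessage>, frame: Envelope): boolean {
  const current = view.get(frame.message_id);
  if (current && frame.offset <= current.offset) return false;
  view.set(frame.message_id, {
    // body is the full reply so far, so it replaces the old text outright
    text: frame.body ?? current?.text ?? "",
    state: frame.state ?? current?.state ?? "unknown",
    offset: frame.offset,
  });
  return true;
}
```

Replacing rather than appending means a reconnect that replays earlier snapshots cannot duplicate text in the UI.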
Terminal event: end
Emitted exactly once before the connection closes. The reason
distinguishes the cause:
| reason | Source | Meaning |
|---|---|---|
| task_terminal | tasks/events only | An agent_reply (or _error / agent.refuse / agent_busy) was just forwarded; task is complete |
| channel_closed | conversations/events (or tasks/events when DELETE-d) | The channel was explicitly closed (user DELETE, peer with same auth, or task cancel) |
| stream_closed | both | Upstream Message Service stream ended, typically MS restart or TTL eviction |
The end frame is the only signal — there are no in-band
keepalive comments and no Connection: close semantics. If you see
end, DON’T reconnect with since=lastOffset — the channel is gone
or terminal; reconnecting won’t replay anything new.
Required client logic (task / conversation)
let lastOffset = startingSince ?? 0;

function connect(since) {
  const es = new EventSource(`${url}?since=${since}`);

  es.addEventListener("message", (evt) => {
    const frame = JSON.parse(evt.data);
    lastOffset = frame.offset ?? lastOffset;
    handle(frame); // UI render, etc.
  });

  es.addEventListener("end", (evt) => {
    const { reason } = JSON.parse(evt.data);
    console.log("stream ended:", reason);
    es.close();
    // Do NOT reconnect: terminal.
  });

  es.onerror = () => {
    // Connection dropped before `end`. Close first so EventSource's built-in
    // auto-reconnect (which would reuse the stale URL) does not race the
    // manual one, then resume with since=lastOffset.
    es.close();
    setTimeout(() => connect(lastOffset), backoffMs);
  };
}

connect(lastOffset);
4. The since cursor (drop / reconnect compensation)
since is an integer offset on the channel’s monotonic log.
Both /tasks/{id}/events and /conversations/{id}/events accept it
as a query parameter. The semantics:
since=0 (or omitted) — replay the full history of the channel,
then keep streaming new events. Useful for late attachers who
want the entire transcript.
since=N (N > 0) — replay every event at offset > N, then keep
streaming. Use this on reconnect: pass the last observed
frame.offset so you don’t see duplicates or miss any frames.
Offsets are strictly monotonic but NOT guaranteed contiguous
(ADR-0022 §1.2). A producer-side storage failure may leave a small
hole — you may observe … 40, 42, … with 41 missing. Always
treat offset > since as the resume invariant; never assume
offset == since + 1. Code like if (newOffset !== lastOffset + 1) throw is a bug.
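The resume invariant can be captured in a few lines. This is a sketch only: Cursor, observe and resumeUrl are hypothetical helper names, and "nothing observed yet" is modelled as undefined so that the first frame is always accepted.

```typescript
// Illustrative helpers; none of these names are SDK exports.
interface Cursor {
  lastOffset: number | undefined; // undefined = nothing observed yet
}

// Returns true when the frame is new and should be rendered.
function observe(cursor: Cursor, offset: number | undefined): boolean {
  if (offset === undefined) return true; // frames without an offset pass through
  if (cursor.lastOffset !== undefined && offset <= cursor.lastOffset) {
    return false; // replayed or duplicate frame
  }
  cursor.lastOffset = offset; // gaps such as 40 -> 42 are fine; only order matters
  return true;
}

// Builds the reconnect URL; since=0 requests the full history.
function resumeUrl(eventsUrl: string, cursor: Cursor): string {
  const sep = eventsUrl.includes("?") ? "&" : "?";
  return `${eventsUrl}${sep}since=${cursor.lastOffset ?? 0}`;
}
```

Note that the guard compares with greater-than only, so a hole in the offset sequence never raises an error or stalls the stream.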
The same cursor doubles as the pagination key on the
non-streaming GET /messages endpoint:
?since=<lastOffset>&limit=200 returns up to 200 frames at
offset > lastOffset. Mixing streaming + polling is fine.
OpenAPI v1.1 (ADR-0022 + ADR-0023): GET /messages default-filters
out ephemeral streaming chunks (agent_reply_delta,
agent_thought_chunk, agent_message_chunk). With v3 envelopes
(ADR-0023) live agent_reply rows carry the full cumulative reply
in body — you no longer need include_deltas=true to reconstruct
the text, and new beeos-claw channels won’t have any
agent_reply_delta rows at all. The flag remains supported for
reading historical pre-v3 channels. latest_offset still
reflects the full server-side max so since=<latest_offset>
resumes from the right place either way.
Edge cases
- Connection dropped before any frame. lastOffset is still 0, so
  reconnect with since=0, which replays from the beginning.
  On invoke SSE (no offsets) you have to redrive the invoke
  altogether (the underlying chat_message wasn’t durable because
  you never got a message_id to retry idempotently against).
- Last frame was agent_message_chunk then connection dropped.
  Reconnect with since=<that chunk's offset>. You will get every
  remaining chunk PLUS the final agent_reply. There is no risk of
  duplicating already-rendered text because each chunk has a distinct
  message_id.
- Same client opens two SSE connections to the same task. Both
  get the full live stream. Message Service is fan-out; there’s no
  “you already have a subscription” semantic on the gateway.
- backfill_truncated frame on reconnect (since OpenAPI v1.1 /
  ADR-0022). When the channel has been idle long enough that the
  ephemeral stream has aged out before your Last-Event-ID, the
  server emits a single backfill_truncated event before the
  normal replay_complete. Shape:
event: backfill_truncated
data: { "oldest_redis_offset": 4711, "since": 3120,
"hint": "ephemeral chunks before oldest_redis_offset have aged out of the Redis stream" }
Recovery options, in order of decreasing fidelity:
- Replay surviving durable rows: fetch GET /messages?since=<since>
  (durable rows only); you’ll miss the per-token chunks but recover
  the final reply / non-chunk state. The server only emits this frame
  for ephemeral types, so chat_message / agent_reply /
  agent.input_required etc. are still in the durable log.
- Fast-forward: resume with since=replay_complete.latest_offset
  and accept that the intermediate token chunks are gone. This is
  what the official SDK does by default since the chunks are
  rendering UX, not data of record.
SDKs that don’t recognise backfill_truncated SHOULD treat the
frame as a no-op message — it’s purely informational and doesn’t
change the rest of the protocol.
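The two recovery options can be expressed as a small decision function. This is a hedged sketch: the BackfillTruncated fields are taken from the frame shape shown above, the ReplayComplete shape is inferred from the replay_complete.latest_offset reference, and planRecovery and its preferDurableReplay flag are hypothetical names rather than SDK API.

```typescript
// Payload of the backfill_truncated event, as shown in this guide.
interface BackfillTruncated {
  oldest_redis_offset: number;
  since: number;
  hint?: string;
}

// Assumption: replay_complete carries at least latest_offset.
interface ReplayComplete {
  latest_offset: number;
}

interface Recovery {
  kind: "poll_durable" | "fast_forward";
  since: number; // value to pass as ?since= on the follow-up request
}

// Picks a recovery path. Fast-forward is the default, mirroring what the
// guide says the official SDK does.
function planRecovery(
  truncated: BackfillTruncated,
  replay: ReplayComplete,
  preferDurableReplay = false,
): Recovery {
  if (preferDurableReplay) {
    // Re-read durable rows from the original cursor via GET /messages.
    return { kind: "poll_durable", since: truncated.since };
  }
  // Skip the lost chunks and resume the stream at the server-side max.
  return { kind: "fast_forward", since: replay.latest_offset };
}
```

With the example frame above (since 3120), the durable path would poll from 3120, while the default path resumes the stream from whatever latest_offset the server reports.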
5. Keepalive
None of the SSE handlers emits explicit keepalive comments today.
Strategies for coping without them:
Browser / EventSource
EventSource automatically reconnects on socket close. Use the
since=<lastOffset> query-string parameter to compensate for the
gap. Note: EventSource cannot set headers, so for oag_ keys you
either:
- Hand-roll the request via fetch + manual SSE parsing (recommended
  for production; gives you precise control over headers, retries,
  and reconnect timing); or
- Embed the token in the URL (?access_token=...). Avoid this:
  the URL ends up in CDN logs / proxy access logs / browser history.
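For the fetch route, the manual parsing step might look like the incremental parser below. It is a sketch following the general text/event-stream rules (blank-line delimited blocks, ":" comment lines, multi-line data joined with newlines); SseEvent, SseParser and parseBlock are hypothetical names, and lone-CR line endings are deliberately not handled.

```typescript
interface SseEvent {
  event: string; // defaults to "message" when no event: line is present
  data: string;
  id?: string;
}

// Parses one blank-line-delimited block; returns undefined for blocks that
// carry no data (for example comment-only blocks).
function parseBlock(block: string): SseEvent | undefined {
  let event = "message";
  let id: string | undefined;
  const data: string[] = [];
  for (const line of block.split("\n")) {
    if (line === "" || line.startsWith(":")) continue; // blank or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "event") event = value;
    else if (field === "data") data.push(value);
    else if (field === "id") id = value;
  }
  return data.length > 0 ? { event, data: data.join("\n"), id } : undefined;
}

class SseParser {
  private buffer = "";

  // Feed one decoded text chunk; returns every event completed by it.
  push(chunk: string): SseEvent[] {
    // A trailing "\r" stays in the buffer until its "\n" arrives.
    this.buffer = (this.buffer + chunk).replace(/\r\n/g, "\n");
    const events: SseEvent[] = [];
    let end: number;
    while ((end = this.buffer.indexOf("\n\n")) >= 0) {
      const parsed = parseBlock(this.buffer.slice(0, end));
      this.buffer = this.buffer.slice(end + 2);
      if (parsed) events.push(parsed);
    }
    return events;
  }
}
```

Feed it the output of a TextDecoder running in streaming mode, then dispatch on event for task / conversation streams or on the parsed JSON type for invoke.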
Node.js (server-to-server)
Use fetch + response.body as an async iterable, or the
eventsource package
(which supports custom headers). Implement an exponential backoff on
onerror and pass the last observed offset:
let attempt = 0;
let lastOffset = 0;
let stop = false;
while (!stop) {
try {
const url = `${BASE}/api/v1/agents/${agentId}/tasks/${taskId}/events?since=${lastOffset}`;
const res = await fetch(url, { headers: { Authorization: `Bearer ${TOKEN}` } });
if (!res.body) throw new Error("no body");
const reader = res.body.getReader();
// ...parse SSE frames, update lastOffset on each `message`...
// ...break out on `event: end`...
attempt = 0; // reset on a clean run
} catch (e) {
attempt++;
await new Promise((r) => setTimeout(r, Math.min(2 ** attempt * 250, 30_000)));
}
}
Go
github.com/r3labs/sse/v2 handles reconnect + offset compensation
out of the box. Pass since=<offset> and let it resume on its own.
Server-side considerations
If you run your client behind a corporate proxy / NLB / CDN, the
idle connection cap might be lower than your turn cadence. Tested
limits on common infra:
| Proxy | Idle SSE cap |
|---|---|
| Browser default | ~60s before retry |
| AWS NLB | 350s (TCP idle timeout) |
| Cloudflare Free | 100s |
| Cloudflare Pro+ | 100s to 6h (configurable) |
Your client MUST handle reconnect regardless. The since= cursor
exists precisely so reconnects are loss-free.
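The reconnect delay used in this guide's examples is a capped exponential. A minimal sketch of it as a reusable function follows; backoffDelay is a hypothetical name, and the optional jitter argument is an addition that this guide does not prescribe (it spreads out clients that all lost the same proxy connection at once).

```typescript
// Capped exponential backoff: 500ms, 1s, 2s, ... up to 30s.
// `random` is optional; when given (e.g. Math.random) the delay is drawn
// uniformly from [0, cap) instead of being the cap itself.
function backoffDelay(attempt: number, random?: () => number): number {
  const capped = Math.min(2 ** attempt * 250, 30_000);
  return random ? Math.floor(random() * capped) : capped;
}
```

Reset attempt to 0 once a frame has been received, so a long-lived stream that drops occasionally does not inherit a 30-second delay.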
6. Error frames vs done (invoke) / end (task & conversation)
Different endpoints have different shutdown grammars; misreading them
is the most common SDK bug we see.
| Endpoint | Error grammar |
|---|---|
| invoke SSE | error frame (optional) then always a done frame. Both carry code; done.is_error flips on. Stream closes after done. |
| task SSE | No error frame. Errors arrive as regular message frames with type agent_reply_error / agent.refuse / agent_busy, then end reason=task_terminal. |
| conversation SSE | No error frame and no auto-end on terminal reply. The error envelope arrives as a regular message frame; the stream stays open for the next turn. |
This is why the matching rule is:
- invoke: branch on data.type === "done" to decide “stop”.
- task: branch on event === "end" (the named SSE event) to
  decide “stop”; everything before is a message.
- conversation: same as task, but end fires only on explicit
  delete / upstream close, not on any per-turn terminal.
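The matching rule can be written as one predicate shared by all three surfaces. This is an illustrative sketch: Surface, isTerminal and isErrorEnvelope are hypothetical names, and the error types listed are only the ones this section mentions.

```typescript
type Surface = "invoke" | "task" | "conversation";

// True when the frame means "stop reading this stream".
function isTerminal(surface: Surface, eventName: string, data: string): boolean {
  if (surface === "invoke") {
    // Invoke frames are unnamed, so the signal lives inside the JSON.
    return JSON.parse(data).type === "done";
  }
  // Task and conversation streams stop only on the named `end` event.
  return eventName === "end";
}

// On task / conversation streams, errors are ordinary message frames.
const ERROR_TYPES = new Set(["agent_reply_error", "agent.refuse", "agent_busy"]);

function isErrorEnvelope(data: string): boolean {
  return ERROR_TYPES.has(JSON.parse(data).type);
}
```

Keeping the two questions separate ("is this an error?" and "is the stream over?") avoids closing a conversation stream just because one turn failed.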
7. Worked example — robust task watcher (TypeScript)
This snippet shows the full reconnect-on-error + offset-resume loop
for the task SSE surface. It’s intentionally written without
the SDK so the wire mechanics are visible.
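// render() is caller-supplied (UI update, logging, etc.); declared here so
// the snippet type-checks on its own.
declare function render(frame: Frame): void;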
interface Frame {
type: string;
message_id?: string;
offset?: number;
in_reply_to?: string;
payload?: any;
created_at?: string;
}
async function watchTask(taskId: string, agentId: string, token: string) {
let lastOffset = 0;
let attempt = 0;
loop: while (true) {
const url =
`https://openapi.beeos.ai/api/v1/agents/${agentId}/tasks/${taskId}/events` +
`?since=${lastOffset}`;
try {
const res = await fetch(url, {
headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
});
if (!res.ok || !res.body) throw new Error(`http ${res.status}`);
const reader = res.body.getReader();
const dec = new TextDecoder();
let buf = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += dec.decode(value, { stream: true });
// SSE: frames separated by blank lines
let idx: number;
while ((idx = buf.indexOf("\n\n")) >= 0) {
const block = buf.slice(0, idx);
buf = buf.slice(idx + 2);
let evName = "message";
let data = "";
for (const line of block.split("\n")) {
if (line.startsWith("event:")) evName = line.slice(6).trim();
else if (line.startsWith("data:")) data += line.slice(5).trim();
}
if (evName === "end") {
console.log("task ended:", JSON.parse(data).reason);
break loop; // <- terminal; do NOT retry
}
const f: Frame = JSON.parse(data);
if (f.offset != null) lastOffset = f.offset;
attempt = 0; // a frame arrived, so the connection is healthy
render(f);
}
}
// EOF without `end`: atypical, so treat it as transient and back off
// instead of reconnecting in a tight loop.
throw new Error("stream closed before end");
} catch (e) {
attempt++;
const wait = Math.min(2 ** attempt * 250, 30_000);
await new Promise((r) => setTimeout(r, wait));
continue;
}
}
}
The same shape works for conversations: keep the evName === "end"
break (it still fires on explicit delete or upstream close) and add a
“stop on explicit unsubscribe” flag, since conversations don’t
terminate on a per-turn reply.
8. Frame replay & idempotency
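Every message’s message_id is unique per channel, but a v3
agent_reply reuses one message_id across all of its streaming frames
and its terminal frame, each at a new offset. Replays caused by
reconnects MAY redeliver frames you’ve already seen if your
lastOffset tracking lost an update, so UI code SHOULD key rendered
messages on message_id and ignore any frame whose offset is not
greater than the one already applied for that message_id.
Internally Message Service uses an offset-only ordering invariant
(monotonic per channel), so outside of v3 agent_reply streaming,
seeing two frames with the same message_id but different offset
indicates a bug worth reporting.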
9. Common mistakes
- Passing since=0 on every reconnect. Re-replays the entire
  channel history, costs you bandwidth, and floods your UI with
  duplicate frames you’ve already rendered. Always track the latest
  observed frame.offset and pass it as since=.
- Treating event: end as a recoverable error. It’s not: it
  means there will be NO more frames. Reconnecting just yields
  another end (or 404 if the channel was evicted). Close the
  connection and move on.
- Branching on event name for invoke SSE. Invoke SSE emits
  unnamed frames; clients keyed on event === "message" will treat
  every frame as ignorable. Use JSON.parse(data).type === "done"
  for invoke; use event === "end" for task / conversation.
- Mixing Accept: text/event-stream with POST /tasks/.... The
  task SSE surface is the separate GET /events endpoint, not
  the create call. POST /tasks always returns a JSON 200 with the
  task ID; you then open SSE on GET /tasks/{id}/events.
- Embedding the token in the URL. Set Authorization: via a
  custom-header SSE client (or fetch + manual parsing). URLs end
  up in proxy / CDN access logs.
10. See also