Audience: SDK users (TypeScript, Go, anything that speaks HTTP) who need live token-by-token output from an agent. If you only need a single full reply, the blocking JSON variants (no Accept: text/event-stream) are simpler — start with Calling Agents.
This page is the canonical reference for every SSE surface openapi.beeos.ai exposes. All three endpoints follow the same Content-Type: text/event-stream framing but use slightly different event-naming conventions; this guide lays them side-by-side so you can pick the right one and write a correct reconnect loop.

1. The three SSE surfaces

| Endpoint | Use when | Auto-close? | Event names |
| --- | --- | --- | --- |
| POST /api/v1/agents/{agentId}/invoke with Accept: text/event-stream | One-shot: stream the single agent reply token-by-token | Yes: closes after one terminal done frame | Unnamed message (default); type lives in JSON type field |
| GET /api/v1/agents/{agentId}/tasks/{taskId}/events | Asynchronous task: observe progress, paused state, and the eventual terminal reply | Yes: closes on terminal envelope OR upstream EOF | Named message for content, end for shutdown |
| GET /api/v1/agents/{agentId}/conversations/{convId}/events | Multi-turn dialog: observe every turn live | No: stays open across turns until DELETE or client disconnect | Named message for content, end for shutdown |
All three require Authorization: Bearer <JWT or oag_...>. oag_ keys are user-scoped — any key whose owner owns the underlying task / conversation can invoke or stream. See Authentication & API Keys for the owner-ACL authorization model (per-route scopes were removed in v1.1.0).

2. The invoke SSE flow (one-shot)

POST /api/v1/agents/agent_abc/invoke
Authorization: Bearer oag_...
Accept: text/event-stream
Content-Type: application/json

{ "message": "Tell me a haiku" }
Frames (no event: name — clients dispatch on the JSON type):
data: {"type":"delta","text":"Quiet "}

data: {"type":"delta","text":"morning "}

data: {"type":"delta","text":"breeze..."}

data: {"type":"done","text":"Quiet morning breeze...","context_id":"ch-..."}
If the agent emitted an in-band agent_reply_error, you get the same done frame with extra fields:
data: {"type":"done","is_error":true,
       "code":"agent_reply_error","error":"index out of range",
       "text":"index out of range","context_id":"ch-..."}
If a transport-level error (agent_offline, service_timeout, agent_rejected, etc.) occurs, you get an error frame first, then done. The done frame is the stream’s “single shutdown signal”, and both frames carry the same code, so callers that key only on done still dispatch correctly (audit-v4 P1-1):
data: {"type":"error","code":"agent_offline","status_code":503,
       "message":"agent is offline"}

data: {"type":"done","is_error":true,"code":"agent_offline",
       "error":"agent is offline"}
No event: lines on this endpoint. Standard EventSource-style clients receive all frames as the default "message" event; dispatch on data.type.

Required client logic (invoke)

  1. Append delta.text to a running buffer as deltas arrive.
  2. Treat the first done frame as terminal — close the connection.
  3. If done.is_error === true, surface done.code + done.error to the caller. Don’t retry blindly on agent_reply_error (in-band) but DO retry on service_timeout / agent_offline (transport).
The done.text field is the full assembled reply — chunks are a UX nicety, not the truth. SDKs that don’t need streaming UX can ignore deltas entirely and just consume done.
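The three rules above can be sketched as a small reducer over already-parsed invoke frames. This is an illustrative sketch, not SDK code: the `InvokeFrame` and `InvokeResult` shapes, the `reduceInvoke` name, and the `TRANSPORT_CODES` set are assumptions made for the example, and the retryable set only contains the two codes this section names explicitly.

```typescript
// Hypothetical frame shape, modelled on the invoke examples in this section.
interface InvokeFrame {
  type: "delta" | "error" | "done";
  text?: string;
  code?: string;
  error?: string;
  is_error?: boolean;
}

interface InvokeResult {
  text: string;        // streamed buffer, replaced by done.text when present
  finished: boolean;   // true once the first done frame has been seen
  errorCode?: string;
  errorMessage?: string;
  retryable: boolean;  // true only for transport-level failures
}

// Assumption: only the transport codes this section says to retry on.
const TRANSPORT_CODES = new Set(["service_timeout", "agent_offline"]);

function reduceInvoke(frames: InvokeFrame[]): InvokeResult {
  const result: InvokeResult = { text: "", finished: false, retryable: false };
  for (const f of frames) {
    if (f.type === "delta") {
      result.text += f.text ?? "";                      // rule 1: append deltas
    } else if (f.type === "done") {
      if (f.text !== undefined) result.text = f.text;   // done.text is authoritative
      result.finished = true;
      if (f.is_error) {                                 // rule 3: surface code + error
        result.errorCode = f.code;
        result.errorMessage = f.error;
        result.retryable = f.code !== undefined && TRANSPORT_CODES.has(f.code);
      }
      break;                                            // rule 2: first done is terminal
    }
    // error frames are skipped: the done frame that follows carries the same code
  }
  return result;
}
```

A caller that does not need streaming UX can pass the full frame list and read only `text`, `errorCode`, and `retryable` from the result.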

3. The task / conversation SSE flow (named events)

Both tasks/{id}/events and conversations/{id}/events use the same framing:
event: message
data: { …per-frame JSON envelope… }

event: message
data: { …another frame… }

event: end
data: {"reason":"task_terminal"}
The JSON envelope schema is shared (SSEStreamMessage):
| Field | Type | Notes |
| --- | --- | --- |
| type | string | E.g. chat_message, agent_message_chunk, agent_reply, agent_reply_error, agent.input_required |
| state | string | Envelope v3 lifecycle marker: streaming, completed, failed, cancelled. Single-shot frames (e.g. chat_message) carry completed from the start; agent_reply arrives as a sequence of state="streaming" frames culminating in one state="completed". |
| stop_reason | string | Set only on the terminal completed/failed frame for agent_reply (end_turn, error, cancelled, length). |
| body | string | Envelope v3 cumulative text snapshot for agent_reply streaming. Every PATCH writes the full accumulated reply so far; clients SHOULD render body directly and IGNORE the legacy per-token agent_message_chunk frames if both are present. |
| parts | array | Envelope v3 structured parts (tool calls, attachments, etc.) accumulated alongside body. Same cumulative snapshot semantic. |
| message_id | string | UUID; unique per message in the channel. One v3 agent_reply envelope = one message_id across all streaming frames + the terminal one. |
| offset | int64 | Monotonic position in the channel log (use as since on reconnect). Different per streaming frame even though message_id repeats. |
| in_reply_to | string | message_id of the request this is replying to (set on agent_reply / chunks) |
| publisher_id | string | user:<uid> or agent:<id> (who emitted it) |
| payload | any | Per-type body (e.g. { "text": "..." } for chat_message). For v3 agent_reply, prefer top-level body over payload.text. |
| created_at | RFC3339 string | Server timestamp |
Envelope v3 (ADR-0022 + ADR-0023, GA in v1.1): Agent replies are “one row, mutating in place”. The same message_id appears across N streaming frames (each a cumulative snapshot of body / parts) followed by exactly one state="completed" (or failed / cancelled) terminal frame. Legacy per-token agent_reply_delta rows are no longer emitted by beeos-claw and will not appear on new channels, but historical rows remain readable via ?include_deltas=true on the GET /messages polling path.
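To make the “one row, mutating in place” model concrete, here is a minimal sketch of a client-side view that keeps the latest snapshot per message_id. The `Envelope` subset, the `View` alias, and the `applyEnvelope` / `isSettled` helpers are hypothetical names for this example; the sketch also assumes a v3 channel, where legacy agent_message_chunk frames can be skipped because body already carries the cumulative text.

```typescript
// Subset of the SSEStreamMessage fields needed for rendering (illustrative).
interface Envelope {
  type: string;
  message_id: string;
  offset: number;
  state?: string;
  body?: string;
}

// Latest snapshot per message_id. A Map keeps first-insertion order, so a
// reply stays where its first streaming frame arrived.
type View = Map<string, Envelope>;

function applyEnvelope(view: View, frame: Envelope): void {
  // Assumption: v3 channel, so per-token chunks are redundant with body.
  if (frame.type === "agent_message_chunk") return;
  const prev = view.get(frame.message_id);
  // Drop replayed or out-of-date snapshots; offsets are monotonic per channel.
  if (prev && frame.offset <= prev.offset) return;
  view.set(frame.message_id, frame);
}

// True once a message has reached one of the terminal lifecycle states.
function isSettled(e: Envelope): boolean {
  return e.state === "completed" || e.state === "failed" || e.state === "cancelled";
}
```

Because every streaming frame is a full snapshot, replacing the stored envelope is enough; there is no need to concatenate text on the client.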

Terminal event: end

Emitted exactly once before the connection closes. The reason distinguishes the cause:
| reason | Source | Meaning |
| --- | --- | --- |
| task_terminal | tasks/events only | An agent_reply (or _error / agent.refuse / agent_busy) was just forwarded; task is complete |
| channel_closed | conversations/events (or tasks/events when DELETE-d) | The channel was explicitly closed (user DELETE, peer with same auth, or task cancel) |
| stream_closed | both | Upstream Message Service stream ended, typically MS restart or TTL eviction |
The end frame is the only signal — there are no in-band keepalive comments and no Connection: close semantics. If you see end, DON’T reconnect with since=lastOffset — the channel is gone or terminal; reconnecting won’t replay anything new.

Required client logic (task / conversation)

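// `url`, `startingSince`, and `handle` are supplied by the caller.
let lastOffset = startingSince ?? 0;
let attempt = 0;

function connect() {
  // Rebuild the URL on every attempt so `since` reflects the latest offset.
  const u = new URL(url);
  u.searchParams.set("since", String(lastOffset));
  const es = new EventSource(u.toString());

  es.addEventListener("message", (evt) => {
    const frame = JSON.parse(evt.data);
    lastOffset = frame.offset ?? lastOffset;
    attempt = 0;    // a delivered frame means the connection is healthy
    handle(frame);  // UI render, etc.
  });

  es.addEventListener("end", (evt) => {
    const { reason } = JSON.parse(evt.data);
    console.log("stream ended:", reason);
    es.close();
    // Do NOT reconnect: terminal.
  });

  es.onerror = () => {
    // Connection dropped before `end`. Close first so EventSource's built-in
    // retry does not reopen the stale URL, then reconnect with since=lastOffset.
    es.close();
    const backoffMs = Math.min(2 ** ++attempt * 250, 30_000);
    setTimeout(connect, backoffMs);
  };
}

connect();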

4. The since cursor (drop / reconnect compensation)

since is an integer offset on the channel’s monotonic log. Both /tasks/{id}/events and /conversations/{id}/events accept it as a query parameter. The semantics:
  • since=0 (or omitted) — replay the full history of the channel, then keep streaming new events. Useful for late attachers who want the entire transcript.
  • since=N (N > 0) — replay every event at offset > N, then keep streaming. Use this on reconnect: pass the last observed frame.offset so you don’t see duplicates or miss any frames.
Offsets are strictly monotonic but NOT guaranteed contiguous (ADR-0022 §1.2). A producer-side storage failure may leave a small hole — you may observe … 40, 42, … with 41 missing. Always treat offset > since as the resume invariant; never assume offset == since + 1. Code like if (newOffset !== lastOffset + 1) throw is a bug.
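The resume invariant can be sketched as two small helpers: one that advances the cursor while tolerating holes, and one that rebuilds the reconnect URL. Both names (`advanceCursor`, `withSince`) are hypothetical, and the agent and task IDs in the usage line are placeholders.

```typescript
// Advance the cursor for one incoming frame. `fresh` is false when the frame
// is a replayed duplicate (offset <= since) and should not be rendered again.
function advanceCursor(
  since: number,
  offset: number | undefined,
): { since: number; fresh: boolean } {
  // Frames without an offset cannot be compared; pass them through unchanged.
  if (offset === undefined) return { since, fresh: true };
  if (offset <= since) return { since, fresh: false };
  // Any offset > since is valid, including one that skips numbers.
  return { since: offset, fresh: true };
}

// Set (or overwrite) the since query parameter without string concatenation,
// so a URL that already carries a query string stays well-formed.
function withSince(url: string, since: number): string {
  const u = new URL(url);
  u.searchParams.set("since", String(since));
  return u.toString();
}

// Usage: feed observed offsets 40, 42 (41 is a hole), then a replayed 42.
let cursor = 0;
for (const off of [40, 42, 42]) cursor = advanceCursor(cursor, off).since;
const resumeUrl = withSince(
  "https://openapi.beeos.ai/api/v1/agents/agent_abc/tasks/task_123/events?since=0",
  cursor,
);
```

Using `URLSearchParams` rather than appending `?since=` avoids producing a second `?` when the base URL already has a query string.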
The same cursor doubles as the pagination key on the non-streaming GET /messages endpoint: ?since=<lastOffset>&limit=200 returns up to 200 frames at offset > lastOffset. Mixing streaming + polling is fine.

OpenAPI v1.1 (ADR-0022 + ADR-0023): GET /messages default-filters out ephemeral streaming chunks (agent_reply_delta, agent_thought_chunk, agent_message_chunk). With v3 envelopes (ADR-0023) live agent_reply rows carry the full cumulative reply in body, so you no longer need include_deltas=true to reconstruct the text, and new beeos-claw channels won’t have any agent_reply_delta rows at all. The flag remains supported for reading historical pre-v3 channels. latest_offset still reflects the full server-side max so since=<latest_offset> resumes from the right place either way.

Edge cases

  1. Connection dropped before any frame. lastOffset is still 0 — reconnect with since=0, which replays from the beginning. On invoke SSE (no offsets) you have to redrive the invoke altogether (the underlying chat_message wasn’t durable because you never got a message_id to retry idempotently against).
  2. Last frame was agent_message_chunk then connection dropped. Reconnect with since=<that chunk's offset>. You will get every remaining chunk PLUS the final agent_reply. There is no risk of duplicating already-rendered text — each chunk has a distinct message_id.
  3. Same client opens two SSE connections to the same task. Both get the full live stream. Message Service is fan-out — there’s no “you already have a subscription” semantic on the gateway.
  4. backfill_truncated frame on reconnect (since OpenAPI v1.1 / ADR-0022). When the channel has been idle long enough that the ephemeral stream has aged out before your Last-Event-ID, the server emits a single backfill_truncated event before the normal replay_complete. Shape:
    event: backfill_truncated
    data: { "oldest_redis_offset": 4711, "since": 3120,
            "hint": "ephemeral chunks before oldest_redis_offset have aged out of the Redis stream" }
    
    Recovery options, in order of decreasing fidelity:
    • Replay surviving durable rows — fetch GET /messages?since=<since> (durable rows only); you’ll miss the per-token chunks but recover the final reply / non-chunk state. The server only emits this frame for ephemeral types, so chat_message / agent_reply / agent.input_required etc. are still in the durable log.
    • Fast-forward — resume with since=replay_complete.latest_offset and accept that the intermediate token chunks are gone. This is what the official SDK does by default since the chunks are rendering UX, not data of record.
    SDKs that don’t recognise backfill_truncated SHOULD treat the frame as a no-op message — it’s purely informational and doesn’t change the rest of the protocol.
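As a rough illustration of the two recovery options, the sketch below maps an incoming SSE event to a recovery decision. The `Recovery` type and `planRecovery` function are hypothetical names; only the `backfill_truncated` event name and its `since` field come from the frame shape shown above.

```typescript
// What the client should do after seeing a given SSE event (illustrative).
type Recovery =
  | { kind: "none" }                          // not a truncation notice
  | { kind: "poll_durable"; since: number }   // fetch GET /messages?since=<since>
  | { kind: "fast_forward" };                 // wait for replay_complete.latest_offset

function planRecovery(
  eventName: string,
  data: string,
  wantDurableRows: boolean,
): Recovery {
  // Every other event is left to the normal message / end handling.
  if (eventName !== "backfill_truncated") return { kind: "none" };
  const info = JSON.parse(data) as { oldest_redis_offset: number; since: number };
  return wantDurableRows
    ? { kind: "poll_durable", since: info.since }
    : { kind: "fast_forward" };
}
```

Passing `wantDurableRows = false` mirrors the fast-forward behaviour described above; a client that needs the final replies from the gap would pass `true` and poll the durable log before resuming.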

5. Keepalive

Neither SSE handler emits explicit keepalive comments today. Strategies:

Browser / EventSource

EventSource automatically reconnects on socket close, but it reuses the original URL. To compensate for the gap, close it and open a new EventSource with the since=<lastOffset> query-string parameter. Note: EventSource cannot set headers, so for oag_ keys you either:
  • Hand-roll the request via fetch + manual SSE parsing (recommended for production — gives you precise control over headers, retries, and reconnect timing); or
  • Embed the token in the URL (?access_token=...) — avoid this: the URL ends up in CDN logs / proxy access logs / browser history.
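For the fetch + manual parsing route, the parsing half can be sketched as a small incremental parser that is fed decoded text chunks. `SseEvent` and `SseParser` are hypothetical names for this example. The sketch handles `event:` and `data:` fields, comment lines, and CRLF line endings; it deliberately ignores `id:` and `retry:` and does not handle bare CR line endings.

```typescript
interface SseEvent {
  event: string; // "message" when the frame has no event: line
  data: string;  // data: lines joined with "\n"
}

class SseParser {
  private buf = "";

  // Feed one decoded text chunk; returns every event completed by it.
  feed(chunk: string): SseEvent[] {
    // Normalise on the whole buffer so a CRLF split across chunks is handled.
    this.buf = (this.buf + chunk).replace(/\r\n/g, "\n");
    const out: SseEvent[] = [];
    let idx: number;
    while ((idx = this.buf.indexOf("\n\n")) >= 0) {
      const block = this.buf.slice(0, idx);
      this.buf = this.buf.slice(idx + 2);
      let event = "message";
      const data: string[] = [];
      for (const line of block.split("\n")) {
        if (line.startsWith(":")) continue; // comment line
        const colon = line.indexOf(":");
        const field = colon < 0 ? line : line.slice(0, colon);
        let value = colon < 0 ? "" : line.slice(colon + 1);
        if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
        if (field === "event") event = value;
        else if (field === "data") data.push(value);
      }
      // Blocks without any data: line dispatch nothing.
      if (data.length > 0) out.push({ event, data: data.join("\n") });
    }
    return out;
  }
}
```

In a fetch loop you would call `parser.feed(decoder.decode(value, { stream: true }))` for each chunk read from `response.body` and dispatch on the returned `event` names.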

Node.js (server-to-server)

Use fetch + response.body as an async iterable, or the eventsource package (which supports custom headers). Implement an exponential backoff on onerror and pass the last observed offset:
let attempt = 0;
let lastOffset = 0;
let stop = false;

while (!stop) {
  try {
    const url = `${BASE}/api/v1/agents/${agentId}/tasks/${taskId}/events?since=${lastOffset}`;
    const res = await fetch(url, {
      headers: { Authorization: `Bearer ${TOKEN}`, Accept: "text/event-stream" },
    });
    if (!res.ok || !res.body) throw new Error(`http ${res.status}`);
    const reader = res.body.getReader();
    // ...parse SSE frames, update lastOffset on each `message`...
    // ...set stop = true on `event: end` so the outer loop exits...
    attempt = 0;  // reset on a clean run
  } catch (e) {
    attempt++;
    await new Promise((r) => setTimeout(r, Math.min(2 ** attempt * 250, 30_000)));
  }
}
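For reference, the delay formula used in the loop above works out as follows. The `backoffDelay` helper simply names that formula; `jitteredDelay` is an optional addition that is not part of the original loop, included as a sketch of how to spread out reconnects from many clients.

```typescript
// Same formula as the loop above: 250 ms doubled per attempt, capped at 30 s.
function backoffDelay(attempt: number, baseMs = 250, capMs = 30_000): number {
  return Math.min(2 ** attempt * baseMs, capMs);
}

// Optional "full jitter" variant (an assumption, not in the original loop):
// pick a random delay between 0 and the capped backoff.
function jitteredDelay(attempt: number, rand: () => number = Math.random): number {
  return Math.floor(rand() * backoffDelay(attempt));
}

// Delays for attempts 1 through 8, in milliseconds:
// 500, 1000, 2000, 4000, 8000, 16000, 30000, 30000
const schedule = [1, 2, 3, 4, 5, 6, 7, 8].map((a) => backoffDelay(a));
```

The cap is reached on the seventh consecutive failure, so a client that resets `attempt` after each successful frame rarely waits the full 30 seconds.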

Go

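github.com/r3labs/sse/v2 handles reconnect with backoff out of the box. It resumes via the standard Last-Event-ID header rather than by rewriting the query string, so if you rely on since=<offset>, track frame.offset yourself and rebuild the URL before resubscribing.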

Server-side considerations

If you run your client behind a corporate proxy / NLB / CDN, the idle connection cap might be lower than your turn cadence. Tested limits on common infra:
| Proxy | Idle SSE cap |
| --- | --- |
| Browser default | ~60s before retry |
| AWS NLB | 350s (TCP idle timeout) |
| Cloudflare Free | 100s |
| Cloudflare Pro+ | 100s to 6h (configurable) |
Your client MUST handle reconnect regardless. The since= cursor exists precisely so reconnects are loss-free.

6. Error frames vs done (invoke) / end (task & conversation)

Different endpoints have different shutdown grammars; misreading them is the most common SDK bug we see.
| Endpoint | Error grammar |
| --- | --- |
| invoke SSE | error frame (optional) then always a done frame. Both carry code; done.is_error flips on. Stream closes after done. |
| task SSE | No error frame. Errors arrive as regular message frames with type agent_reply_error / agent.refuse / agent_busy, then end reason=task_terminal. |
| conversation SSE | No error frame and no auto-end on terminal reply. The error envelope arrives as a regular message frame; the stream stays open for the next turn. |
This is why the matching rule is:
  • invoke: branch on data.type === "done" to decide “stop”.
  • task: branch on event === "end" (the named SSE event) to decide “stop”; everything before is a message.
  • conversation: same as task, but end fires only on explicit delete / upstream close, not on any per-turn terminal.

7. Worked example — robust task watcher (TypeScript)

This snippet shows the full reconnect-on-error + offset-resume loop for the task SSE surface. It’s intentionally written without the SDK so the wire mechanics are visible.
interface Frame {
  type: string;
  state?: string;        // Envelope v3: streaming | completed | failed | cancelled
  body?: string;         // Envelope v3 cumulative text snapshot
  message_id?: string;
  offset?: number;
  in_reply_to?: string;
  payload?: any;
  created_at?: string;
}

async function watchTask(
  taskId: string,
  agentId: string,
  token: string,
  render: (f: Frame) => void = (f) => console.log(f), // caller-supplied renderer
) {
  let lastOffset = 0;
  let attempt = 0;

  loop: while (true) {
    const url =
      `https://openapi.beeos.ai/api/v1/agents/${agentId}/tasks/${taskId}/events` +
      `?since=${lastOffset}`;
    try {
      const res = await fetch(url, {
        headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
      });
      if (!res.ok || !res.body) throw new Error(`http ${res.status}`);
      const reader = res.body.getReader();
      const dec = new TextDecoder();
      let buf = "";

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        buf += dec.decode(value, { stream: true });
        // SSE: frames separated by blank lines
        let idx: number;
        while ((idx = buf.indexOf("\n\n")) >= 0) {
          const block = buf.slice(0, idx);
          buf = buf.slice(idx + 2);
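          let evName = "message";
          const dataLines: string[] = [];
          for (const line of block.split("\n")) {
            if (line.startsWith("event:")) evName = line.slice(6).trim();
            else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
          }
          const data = dataLines.join("\n"); // multi-line data joins with "\n"
          if (!data) continue;               // comment-only or empty block
          if (evName === "end") {
            console.log("task ended:", JSON.parse(data).reason);
            break loop;          // <- terminal; do NOT retry
          }
          if (evName !== "message") continue; // e.g. backfill_truncated: informational
          const f: Frame = JSON.parse(data);
          if (f.offset != null) lastOffset = f.offset;
          render(f);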
        }
      }
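      // EOF without `end`: reconnect (atypical; treat as transient).
      // Pause briefly so a server that closes immediately cannot cause a hot loop.
      await new Promise((r) => setTimeout(r, 250));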
    } catch (e) {
      attempt++;
      const wait = Math.min(2 ** attempt * 250, 30_000);
      await new Promise((r) => setTimeout(r, wait));
      continue;
    }
    attempt = 0;
  }
}
The same shape works for conversations: keep the evName === "end" break (on that surface it fires only on explicit delete or upstream close) and add a “stop on explicit unsubscribe” flag, since conversations don’t terminate on a per-turn reply.

8. Frame replay & idempotency

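A message_id identifies one message in the channel. Single-shot frames each carry their own message_id, while an Envelope v3 agent_reply reuses one message_id across all of its streaming frames and its terminal frame, each at a different offset (see section 3). Replays caused by reconnects MAY redeliver frames you’ve already seen if your lastOffset tracking lost an update, so UI code SHOULD key rendered messages on message_id and discard any frame whose offset is not greater than the one already applied for that message_id. Internally Message Service uses an offset-only ordering invariant (monotonic per channel), so seeing two single-shot frames (for example chat_message) with the same message_id but different offset indicates a bug worth reporting.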

9. Common mistakes

  • Passing since=0 on every reconnect. This replays the entire channel history, costs you bandwidth, and floods your UI with duplicate frames you’ve already rendered. Always track the latest observed frame.offset and pass it as since=.
  • Treating event: end as a recoverable error. It’s not — it means there will be NO more frames. Reconnecting just yields another end (or 404 if the channel was evicted). Close the connection and move on.
  • Branching on event name for invoke SSE. Invoke SSE emits unnamed frames, so every frame arrives as the default message event and a client waiting for a named done or end event never sees one. Use JSON.parse(data).type === "done" for invoke; use event === "end" for task / conversation.
  • Mixing Accept: text/event-stream with POST /tasks/.... The task SSE surface is the separate GET /events endpoint, not the create call. POST /tasks always returns a JSON 200 with the task ID; you then open SSE on GET /tasks/{id}/events.
  • Embedding the token in the URL. Set Authorization: via a custom-header SSE client (or fetch + manual parsing). URLs end up in proxy / CDN access logs.

10. See also