Skip to Content
APIStreaming (SSE)

Streaming (SSE)

The Computer Agents API uses Server-Sent Events (SSE) to stream real-time responses from agent executions. This enables you to show progress, tool calls, and results as they happen.

Overview

When you create a thread with messages or send a follow-up message, the API returns an SSE stream with events describing the execution progress.

curl -X POST https://api.computer-agents.com/v1/threads \ -H "Authorization: Bearer $API_KEY" \ -H "Content-Type: application/json" \ -d '{ "messages": [{ "role": "user", "content": "Create hello.py" }], "stream": true }'

Event Format

Each event follows this format:

data: {"type":"event_type","...payload"}

Events are newline-separated. Parse each line starting with data: as JSON.


Execution Paths

The API uses an intelligent Router Agent that determines the optimal execution path for each request:

Direct Response Path (No Computer)

For questions the LLM can answer directly:

data: {"type":"thread.created","thread":{"id":"thread_xxx",...}} data: {"type":"direct.response","message":"React is a JavaScript library...","threadId":"thread_xxx"} data: {"type":"response.completed","response":{"status":"completed",...}} data: {"type":"stream.completed"}

Computer Execution Path

For tasks requiring file access or commands:

data: {"type":"thread.created","thread":{"id":"thread_xxx",...}} data: {"type":"computer.starting","threadId":"thread_xxx"} data: {"type":"response.item.completed","item":{"type":"planning","content":{"text":"I'm starting my computer to..."}}} data: {"type":"response.started","response":{...}} ... execution events ... data: {"type":"response.completed","response":{...}} data: {"type":"stream.completed"}

Event Types

thread.created

Emitted when a new thread is created (first event for new threads).

{ "type": "thread.created", "thread": { "id": "thread_xxx", "status": "running", "environmentId": "env_xxx" } }

direct.response

Emitted when the Router Agent responds directly without using the computer. This means the LLM answered the question from its knowledge.

{ "type": "direct.response", "message": "React is a JavaScript library for building user interfaces...", "threadId": "thread_xxx", "userId": "user_xxx" }

When you receive direct.response, no container was started. The response is complete and fast (~1-2s).

computer.starting

Emitted when the Router Agent decides to use the computer. This triggers container initialization.

{ "type": "computer.starting", "threadId": "thread_xxx", "userId": "user_xxx" }

response.item.completed (planning)

Emitted after computer.starting with the Router Agent’s planning message explaining what it will do.

{ "type": "response.item.completed", "item": { "type": "planning", "id": "planning_xxx", "role": "assistant", "status": "completed", "content": { "text": "I'm starting my computer to create a hello.py file and run it for you." } } }

response.started

Emitted when container execution begins (after planning message for computer tasks).

{ "type": "response.started", "response": { "id": "run_xxx", "thread_id": "thread_xxx", "user_id": "user_xxx", "workspace_id": "workspace_xxx", "model": "codex", "execution_mode": "containerized", "created_at": 1705312200, "status": "in_progress" } }

response.item.completed

Emitted when an output item is fully completed. Item types include planning, reasoning, message, tool_call, file_change, setup, and error.

Message item:

{ "type": "response.item.completed", "item": { "type": "message", "id": "msg_xxx", "role": "assistant", "status": "completed", "content": [{ "type": "text", "text": "I've created hello.py..." }] } }

Tool call item:

{ "type": "response.item.completed", "item": { "type": "tool_call", "id": "tool_xxx", "status": "completed", "tool": { "type": "command_execution", "command": "python hello.py", "output": "Hello, World!", "exit_code": 0 }, "metadata": { "duration_ms": 150 } } }

Reasoning item:

{ "type": "response.item.completed", "item": { "type": "reasoning", "id": "reasoning_xxx", "status": "completed", "content": { "text": "I should create a simple Python file..." } } }

response.completed

Emitted when the full response is complete.

{ "type": "response.completed", "response": { "id": "run_xxx", "thread_id": "thread_xxx", "status": "completed", "duration_ms": 5234, "usage": { "input_tokens": 500, "output_tokens": 250, "cached_tokens": 0, "total_tokens": 750 }, "completed_at": 1705312205 } }

response.failed

Emitted when the response fails (replaces response.completed on error).

{ "type": "response.failed", "response": { "id": "run_xxx", "thread_id": "thread_xxx", "status": "failed", "error": { "message": "Execution timeout", "code": "execution_timeout" }, "duration_ms": 60000, "failed_at": 1705312260 } }

stream.completed

Final event indicating the stream has ended successfully.

{ "type": "stream.completed" }

Token usage and cost data are provided in the response.completed event, not in stream.completed.

stream.error

Emitted when the stream encounters an error.

{ "type": "stream.error", "error": { "message": "Connection lost", "code": "stream_error" } }

Client Implementation

JavaScript/TypeScript

async function executeWithStream(task: string) { const response = await fetch('https://api.computer-agents.com/v1/threads', { method: 'POST', headers: { 'Authorization': `Bearer ${API_KEY}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ messages: [{ role: 'user', content: task }], stream: true }) }); const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() || ''; // Keep incomplete line in buffer for (const line of lines) { if (line.startsWith('data: ')) { try { const event = JSON.parse(line.slice(6)); handleEvent(event); } catch (e) { console.error('Failed to parse event:', line); } } } } } function handleEvent(event: any) { switch (event.type) { // Thread lifecycle case 'thread.created': console.log('Thread created:', event.thread.id); break; // Router Agent decisions case 'direct.response': // LLM answered directly - no computer needed console.log('Direct response:', event.message); break; case 'computer.starting': console.log('Starting computer...'); break; // Execution events case 'response.item.completed': if (event.item.type === 'planning') { console.log('Planning:', event.item.content.text); } else if (event.item.type === 'message') { console.log('Response:', event.item.content[0]?.text); } else if (event.item.type === 'reasoning') { console.log('Thinking:', event.item.content.text); } break; case 'response.content_part.delta': process.stdout.write(event.delta.text); break; // Completion case 'response.completed': console.log('\nCompleted. Tokens:', event.response.usage); break; case 'stream.completed': console.log('Stream ended.'); break; // Errors case 'stream.error': case 'error': console.error('Error:', event.error.message); break; } }

Python

import requests import json def execute_with_stream(task: str): response = requests.post( 'https://api.computer-agents.com/v1/threads', headers={ 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json' }, json={ 'messages': [{'role': 'user', 'content': task}], 'stream': True }, stream=True ) for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): try: event = json.loads(line[6:]) handle_event(event) except json.JSONDecodeError: pass def handle_event(event): event_type = event.get('type') # Thread lifecycle if event_type == 'thread.created': print(f"Thread created: {event['thread']['id']}") # Router Agent decisions elif event_type == 'direct.response': # LLM answered directly - no computer needed print(f"Response: {event['message']}") elif event_type == 'computer.starting': print("Starting computer...") # Execution events elif event_type == 'response.item.completed': item = event.get('item', {}) if item.get('type') == 'planning': print(f"Planning: {item['content']['text']}") elif item.get('type') == 'message': content = item.get('content', []) if content and content[0].get('text'): print(f"Response: {content[0]['text']}") elif item.get('type') == 'reasoning': print(f"Thinking: {item['content']['text']}") elif event_type == 'response.content_part.delta': print(event['delta']['text'], end='', flush=True) # Completion elif event_type == 'response.completed': usage = event.get('response', {}).get('usage', {}) print(f"\nCompleted. Tokens: {usage}") elif event_type == 'stream.completed': usage = event.get('usage', {}) cost = usage.get('totalCost', 0) print(f"Done! Cost: ${cost:.4f}") # Errors elif event_type in ('stream.error', 'error'): print(f"Error: {event['error']['message']}")

React Hook

import { useState, useCallback } from 'react'; interface StreamState { threadId: string | null; content: string; planning: string | null; // Planning message before computer execution isComputerExecution: boolean; // Whether computer was used status: 'idle' | 'streaming' | 'completed' | 'error'; error: string | null; cost: number | null; } export function useAgentStream() { const [state, setState] = useState<StreamState>({ threadId: null, content: '', planning: null, isComputerExecution: false, status: 'idle', error: null, cost: null }); const execute = useCallback(async (task: string) => { setState(s => ({ ...s, status: 'streaming', content: '', planning: null, isComputerExecution: false, error: null })); try { const response = await fetch('/api/threads', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages: [{ role: 'user', content: task }], stream: true }) }); const reader = response.body!.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() || ''; for (const line of lines) { if (line.startsWith('data: ')) { const event = JSON.parse(line.slice(6)); switch (event.type) { // Thread lifecycle case 'thread.created': setState(s => ({ ...s, threadId: event.thread.id })); break; // Router Agent: Direct LLM response (no computer) case 'direct.response': setState(s => ({ ...s, content: event.message })); break; // Router Agent: Computer starting case 'computer.starting': setState(s => ({ ...s, isComputerExecution: true })); break; // Item completed (planning, message, reasoning) case 'response.item.completed': if (event.item.type === 'planning') { setState(s => ({ ...s, planning: event.item.content.text })); } else if (event.item.type === 'message') { const text = event.item.content?.[0]?.text || ''; setState(s => ({ ...s, content: text })); } break; // Response completed (contains usage data) case 'response.completed': const usage = event.response?.usage; if (usage) { // Calculate cost from tokens (approximate) const inputCost = (usage.input_tokens / 1000) * 0.01; const outputCost = (usage.output_tokens / 1000) * 0.03; setState(s => ({ ...s, cost: inputCost + outputCost })); } break; // Stream completed (final event) case 'stream.completed': setState(s => ({ ...s, status: 'completed' })); break; // Errors case 'error': case 'stream.error': setState(s => ({ ...s, status: 'error', error: event.error.message })); break; } } } } } catch (error) { setState(s => ({ ...s, status: 'error', error: error instanceof Error ? error.message : 'Unknown error' })); } }, []); return { ...state, execute }; }

Non-Streaming Mode

Set stream: false to receive a single JSON response instead:

curl -X POST https://api.computer-agents.com/v1/threads \ -H "Authorization: Bearer $API_KEY" \ -H "Content-Type: application/json" \ -d '{ "messages": [{ "role": "user", "content": "Create hello.py" }], "stream": false }'

Response:

{ "thread": { ... }, "execution": { "success": true, "response": "I've created hello.py...", "actions": ["Created: hello.py"], "durationMs": 5234, "usage": { "inputTokens": 150, "outputTokens": 50 } } }

Non-streaming mode waits for the full execution to complete before responding. Use streaming for better UX with progress updates.


Error Handling

Handle errors from both stream.error (stream-level errors) and response.failed (execution errors):

function handleEvent(event: any) { // Stream-level error (connection issues, etc.) if (event.type === 'stream.error') { const { code, message } = event.error; showErrorMessage(message); return; } // Execution-level error (the agent failed) if (event.type === 'response.failed') { const { code, message } = event.response.error; switch (code) { case 'usage_exhausted': showUpgradePrompt(); break; case 'execution_timeout': showRetryOption(); break; default: showErrorMessage(message); } return; } // Handle other events... }

Common Error Codes

CodeDescription
usage_exhaustedMonthly usage allowance exhausted
execution_timeoutTask took too long
execution_failedAgent execution error
rate_limitedToo many requests
stream_errorStream connection error

Last updated on