# Streaming Skill
Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).
## When to Use This Skill
- Real-time user interfaces
- Long-running generations
- Progressive output display
- Tool use with streaming
- Extended thinking visualization
## SSE Event Flow
```
message_start
→ content_block_start
→ content_block_delta (repeated)
→ content_block_stop
→ (more blocks...)
→ message_delta
→ message_stop
```
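To see this lifecycle concretely, the sketch below just prints each event type as it arrives. Note that the Python SDK may interleave a few convenience events of its own (such as `text`) alongside the raw SSE events.

```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=256,
    messages=[{"role": "user", "content": "Say hello."}]
) as stream:
    for event in stream:
        # Expected order: message_start, content_block_start,
        # content_block_delta (repeated), content_block_stop,
        # message_delta, message_stop
        print(event.type)
```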
## Core Implementation
### Basic Text Streaming (Python)
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
### Event-Based Streaming
```python
import json

tool_input_buffer = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
            elif event.delta.type == "input_json_delta":
                # Tool input (accumulate, don't parse yet!)
                tool_input_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Now safe to parse tool input
            if tool_input_buffer:
                tool_input = json.loads(tool_input_buffer)
                tool_input_buffer = ""
```
### TypeScript Streaming
```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Write a story.' }]
});

for await (const event of stream) {
  if (event.type === 'content_block_delta' &&
      event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text);
  }
}

const finalMessage = await stream.finalMessage();
```
## Event Types Reference
| Event | When | Data |
|-------|------|------|
| `message_start` | Beginning | Message metadata |
| `content_block_start` | Block begins | Block type, index |
| `content_block_delta` | Content chunk | Delta content |
| `content_block_stop` | Block ends | - |
| `message_delta` | Message update | Stop reason, usage |
| `message_stop` | Complete | - |
## Delta Types
| Delta Type | Content | When |
|-----------|---------|------|
| `text_delta` | `.text` | Text content |
| `input_json_delta` | `.partial_json` | Tool input |
| `thinking_delta` | `.thinking` | Extended thinking |
| `signature_delta` | `.signature` | Thinking signature |
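Both tables can be handled in one dispatch loop. A minimal sketch, using a request like the ones above (the accumulator names are illustrative, not part of the SDK):

```python
import anthropic

client = anthropic.Anthropic()
text, thinking, tool_json, signature = "", "", "", ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type != "content_block_delta":
            continue
        delta = event.delta
        if delta.type == "text_delta":
            text += delta.text
        elif delta.type == "input_json_delta":
            tool_json += delta.partial_json  # accumulate; parse only at content_block_stop
        elif delta.type == "thinking_delta":
            thinking += delta.thinking
        elif delta.type == "signature_delta":
            signature = delta.signature
```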
## Tool Use Streaming
### Critical Rule: Never Parse JSON Mid-Stream!
```python
import json

# WRONG - will fail on partial JSON!
for event in stream:
    if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!

# CORRECT - accumulate, then parse
tool_json_buffer = ""
for event in stream:
    if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
            tool_json_buffer = ""
```
### Complete Tool Streaming Pattern
```python
import json

def stream_with_tools(client, messages, tools):
    current_block = None
    tool_input_buffer = ""

    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""
            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    tool_input_buffer += event.delta.partial_json
            elif event.type == "content_block_stop":
                if current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer)
                    }
```
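Consuming the generator might look like this; the tool definition and prompt are hypothetical, for illustration only:

```python
import anthropic

client = anthropic.Anthropic()

# Hypothetical tool for illustration
tools = [{
    "name": "get_weather",
    "description": "Get the current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"]
    }
}]
messages = [{"role": "user", "content": "What's the weather in Paris?"}]

for chunk in stream_with_tools(client, messages, tools):
    if chunk["type"] == "text":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "tool_call":
        print(f"\n[tool call] {chunk['name']}({chunk['input']})")
```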
## Extended Thinking Streaming
```python
thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")
```
## Error Handling
### Retriable Errors
```python
import time
import anthropic

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs):
    max_retries = 3
    base_delay = 1
    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
            else:
                raise
```
### Silent Overloaded Errors
```python
# CRITICAL: Check for error events even at HTTP 200!
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass
```
## Connection Management
### Keep-Alive Configuration
```python
import httpx
import anthropic

# Proper timeout configuration
http_client = httpx.Client(
    timeout=httpx.Timeout(
        connect=10.0,  # Connection timeout
        read=120.0,    # Read timeout (long for streaming!)
        write=30.0,    # Write timeout
        pool=30.0      # Pool timeout
    )
)
client = anthropic.Anthropic(http_client=http_client)
```
### Connection Pooling
```python
import httpx

http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    )
)
```
## Best Practices
### DO:
- Set read timeout >= 60 seconds
- Accumulate tool JSON; parse only after `content_block_stop`
- Handle error events even at HTTP 200
- Use exponential backoff for retries
### DON'T:
- Parse partial JSON during streaming
- Use short timeouts
- Ignore overloaded_error events
- Leave connections idle >5 minutes
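Putting the DOs together, one possible wrapper is sketched below. The timeout values and retry policy are illustrative choices, not SDK defaults:

```python
import time
import httpx
import anthropic

# Long read timeout per the DO list; values are illustrative
http_client = httpx.Client(
    timeout=httpx.Timeout(connect=10.0, read=120.0, write=30.0, pool=30.0)
)
client = anthropic.Anthropic(http_client=http_client)

def robust_stream(prompt, max_retries=3):
    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                for text in stream.text_stream:
                    yield text
            return
        except anthropic.APIStatusError as e:
            if e.status_code in (429, 500, 502, 503, 529) and attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
            else:
                raise
```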
## UI Integration Pattern
```python
import anthropic

client = anthropic.AsyncAnthropic()  # async client required for `async with`

async def stream_to_ui(websocket, prompt):
    """Stream Claude response to WebSocket client"""
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            await websocket.send_json({
                "type": "chunk",
                "content": text
            })
        final = await stream.get_final_message()
        await websocket.send_json({
            "type": "complete",
            "usage": final.usage.model_dump()  # serialize usage for JSON transport
        })
```
## See Also
- [[llm-integration]] - API basics
- [[tool-use]] - Tool calling
- [[extended-thinking]] - Deep reasoning
## Quick Install

```
npx ai-builder add skill Lobbi-Docs/streaming
```

Installs to `.claude/skills/streaming/`.