skillby Lobbi-Docs

streaming

Server-Sent Events (SSE) streaming for Claude API with support for text, tool use, and extended thinking. Activate for real-time responses, stream handling, and progressive output.

Installs: 0
Used in: 1 repos
Updated: 1d ago
$npx ai-builder add skill Lobbi-Docs/streaming

Installs to .claude/skills/streaming/

# Streaming Skill

Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).

## When to Use This Skill

- Real-time user interfaces
- Long-running generations
- Progressive output display
- Tool use with streaming
- Extended thinking visualization

## SSE Event Flow

```
message_start
    → content_block_start
        → content_block_delta (repeated)
    → content_block_stop
    → (more blocks...)
→ message_delta
→ message_stop
```

## Core Implementation

### Basic Text Streaming (Python)

```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Event-Based Streaming

```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
            elif event.delta.type == "input_json_delta":
                # Tool input (accumulate, don't parse yet!)
                tool_input_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Now safe to parse tool input
            if tool_input_buffer:
                tool_input = json.loads(tool_input_buffer)
```

### TypeScript Streaming

```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a story.' }]
});

for await (const event of stream) {
    if (event.type === 'content_block_delta' &&
        event.delta.type === 'text_delta') {
        process.stdout.write(event.delta.text);
    }
}

const finalMessage = await stream.finalMessage();
```

## Event Types Reference

| Event | When | Data |
|-------|------|------|
| `message_start` | Beginning | Message metadata |
| `content_block_start` | Block begins | Block type, index |
| `content_block_delta` | Content chunk | Delta content |
| `content_block_stop` | Block ends | - |
| `message_delta` | Message update | Stop reason, usage |
| `message_stop` | Complete | - |

## Delta Types

| Delta Type | Content | When |
|-----------|---------|------|
| `text_delta` | `.text` | Text content |
| `input_json_delta` | `.partial_json` | Tool input |
| `thinking_delta` | `.thinking` | Extended thinking |
| `signature_delta` | `.signature` | Thinking signature |

## Tool Use Streaming

### Critical Rule: Never Parse JSON Mid-Stream!

```python
# WRONG - Will fail on partial JSON!
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!

# CORRECT - Accumulate then parse
tool_json_buffer = ""
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
            tool_json_buffer = ""
```

### Complete Tool Streaming Pattern

```python
def stream_with_tools(client, messages, tools):
    current_block = None
    tool_input_buffer = ""

    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""

            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    tool_input_buffer += event.delta.partial_json

            elif event.type == "content_block_stop":
                if current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer)
                    }
```

## Extended Thinking Streaming

```python
thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")
```

## Error Handling

### Retriable Errors

```python
import time

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs):
    max_retries = 3
    base_delay = 1

    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
            else:
                raise
```

### Silent Overloaded Errors

```python
# CRITICAL: Check for error events even at HTTP 200!
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass
```

## Connection Management

### Keep-Alive Configuration

```python
import httpx

# Proper timeout configuration
http_client = httpx.Client(
    timeout=httpx.Timeout(
        connect=10.0,    # Connection timeout
        read=120.0,      # Read timeout (long for streaming!)
        write=30.0,      # Write timeout
        pool=30.0        # Pool timeout
    )
)

client = anthropic.Anthropic(http_client=http_client)
```

### Connection Pooling

```python
http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    )
)
```

## Best Practices

### DO:
- Set read timeout >= 60 seconds
- Accumulate tool JSON, parse after block_stop
- Handle error events even at HTTP 200
- Use exponential backoff for retries

### DON'T:
- Parse partial JSON during streaming
- Use short timeouts
- Ignore overloaded_error events
- Leave connections idle >5 minutes

## UI Integration Pattern

```python
async def stream_to_ui(websocket, prompt):
    """Stream Claude response to WebSocket client"""
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            await websocket.send_json({
                "type": "chunk",
                "content": text
            })

        await websocket.send_json({
            "type": "complete",
            "usage": stream.get_final_message().usage
        })
```

## See Also

- [[llm-integration]] - API basics
- [[tool-use]] - Tool calling
- [[extended-thinking]] - Deep reasoning

Quick Install

$npx ai-builder add skill Lobbi-Docs/streaming

Details

Type
skill
Slug
Lobbi-Docs/streaming
Created
4d ago