LangGraph Chapter 10 — Production Agents: Streaming, Tracing & Scaling
Senior Architect Interview Series — LangGraph & Agentic AI
10.0 What This Chapter Covers
Chapters 1–9 covered how to build agents that work. This chapter covers how to run them at production scale. The gap between a working demo and a reliable production system is filled by:
- Streaming — how to deliver tokens as they're generated
- Async design — handling many concurrent requests
- LangSmith tracing — observability for debugging agent behavior
- Cost management — controlling token spend at scale
- Scaling patterns — horizontal scaling and load balancing
- Deployment considerations — containers, health checks, graceful shutdown
- Monitoring and alerting — what to measure
10.1 Why Production Agents Are Hard
┌──────────────────────────────────────────────────────┐
│ PRODUCTION COMPLEXITY LAYERS                         │
│                                                      │
│ Demo:       Single user, no errors, local            │
│ Dev:        Multiple users, occasional errors        │
│ Production: Hundreds of concurrent users,            │
│             persistent failures, cost pressure,      │
│             latency SLAs, compliance, observability  │
└──────────────────────────────────────────────────────┘
Your current project runs correctly in development. The production journey requires addressing each layer:
| Challenge | Chapter Reference | Production Solution |
|---|---|---|
| Agent loops infinitely | Ch. 9 | Iteration limits |
| Tool failures crash agent | Ch. 9 | Error ToolMessages |
| No audit trail | This chapter | LangSmith tracing |
| Slow response (user waiting) | This chapter | Streaming |
| Can't scale horizontally | This chapter | Stateless + Redis |
| High cost | This chapter | Token optimization |
10.2 Streaming — Delivering Tokens as Generated
The Problem Without Streaming
Without streaming, the user experience is:
- User sends question
- Wait… (agent runs, LLM generates, total 5-15 seconds)
- Full answer appears at once
This feels laggy even if total time is acceptable. The perceived latency is the full time-to-completion.
With Streaming
With streaming, the user experience is:
- User sends question
- Tokens appear within ~1 second, word by word
- Full answer arrives over 2-4 seconds
This feels responsive: total time is roughly the same, but the user sees output almost immediately, so perceived latency drops from time-to-completion to time-to-first-token.
10.3 LangGraph Streaming Modes
LangGraph supports multiple streaming modes:
Mode 1 — stream() — Node-Level Updates
# Streams state updates as each NODE completes
# (stream_mode="updates" is spelled out so the output shape below is unambiguous)
for chunk in agent.stream({"messages": [HumanMessage("hello")]}, stream_mode="updates"):
    print(chunk)
# Output (one dict per completed node):
# {'call_llm': {'messages': [AIMessage(tool_calls=[...])]}}
# {'call_tools': {'messages': [ToolMessage('Agent Factory is...')]}}
# {'call_llm': {'messages': [AIMessage('Based on the search...')]}}
Use for: Showing progress ("Agent is searching...", "Agent is thinking..."), not individual tokens.
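As a rough sketch of the progress use case, node-level updates can be mapped to short status lines for the UI. The node names call_llm and call_tools come from the example output above; the label wording and the progress_messages helper are illustrative assumptions, and plain dicts stand in for real stream chunks. Because an update arrives only after its node has finished, the labels describe completed steps.

```python
from typing import Iterable, Iterator

# Hypothetical status labels keyed by node name (node names are taken from the
# example output above; the wording is an assumption).
STATUS_LABELS = {
    "call_llm": "Model step complete",
    "call_tools": "Tool results received",
}


def progress_messages(chunks: Iterable[dict]) -> Iterator[str]:
    """Yield one status line per node update found in each streamed chunk."""
    for chunk in chunks:
        # A chunk maps node name -> state update; parallel nodes may share a chunk.
        for node_name in chunk:
            yield STATUS_LABELS.get(node_name, f"Finished {node_name}")


# Plain dicts stand in for what agent.stream(...) would yield.
fake_chunks = [
    {"call_llm": {"messages": ["<AIMessage with tool_calls>"]}},
    {"call_tools": {"messages": ["<ToolMessage>"]}},
    {"call_llm": {"messages": ["<final AIMessage>"]}},
]
statuses = list(progress_messages(fake_chunks))
```

Keeping the mapping in a plain function makes it easy to unit test without running the agent.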
Mode 2 — astream_events() — Token-Level Streaming (Production Standard)
async def stream_tokens(question: str, session_id: str):
    """Yield tokens as they're generated by the LLM."""
    config = {"configurable": {"thread_id": session_id}}
    async for event in agent.astream_events(
        {"messages": [HumanMessage(content=question)]},
        config=config,
        version="v2"  # use v2 for the modern events format
    ):
        if event["event"] == "on_chat_model_stream":
            # LLM is generating tokens
            chunk = event["data"]["chunk"]
            if chunk.content:  # text token (not tool call)
                yield chunk.content  # single token or small chunk
        elif event["event"] == "on_tool_start":
            # Tool is being called
            yield f"\n[Searching: {event['name']}...]\n"
        elif event["event"] == "on_tool_end":
            # Tool returned
            yield "\n[Search complete]\n"
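One way to make the event filtering testable is to pull it out of the async generator into a pure function. The sketch below assumes events shaped like the ones handled above (an "event" key, a "name" key for tools, and a chunk object with a .content attribute); event_to_text is a hypothetical helper, SimpleNamespace objects stand in for real message chunks, and non-string content is simply skipped here.

```python
from types import SimpleNamespace
from typing import Optional


def event_to_text(event: dict) -> Optional[str]:
    """Map one astream_events-style event dict to client text, or None to skip it."""
    kind = event.get("event")
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        # Tool-call chunks carry empty content; only forward non-empty strings.
        return content if isinstance(content, str) and content else None
    if kind == "on_tool_start":
        return f"\n[Searching: {event['name']}...]\n"
    if kind == "on_tool_end":
        return "\n[Search complete]\n"
    return None


# Hand-built events that mimic the shapes used in the handler above.
fake_events = [
    {"event": "on_chain_start", "name": "agent", "data": {}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="")}},
    {"event": "on_tool_start", "name": "rag_search", "data": {}},
    {"event": "on_tool_end", "name": "rag_search", "data": {}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="Hello")}},
]
rendered = [t for e in fake_events if (t := event_to_text(e)) is not None]
```

The streaming generator then reduces to calling this function on each event and yielding any non-None result.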
Mode 3 — astream() — Async Node Updates
async for chunk in agent.astream({"messages": history}):
    # chunk is a dict: {node_name: state_update}
    # Iterate over items: nodes that ran in parallel can arrive in the same chunk
    for node_name, state_update in chunk.items():
        print(f"Node '{node_name}' completed")
10.4 Streaming with FastAPI + Server-Sent Events
The canonical production pattern is SSE (Server-Sent Events) for HTTP streaming:
# main.py: streaming endpoint
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from sqlalchemy.orm import Session

# agent, get_db, check_input, load_history and save_history come from the project

app = FastAPI()

@app.get("/chat/stream")
async def chat_stream(
    question: str,
    session_id: str,
    db: Session = Depends(get_db)
):
    """Stream tokens to the client via Server-Sent Events."""
    # Guardrail check (synchronous, fast)
    if not check_input(question):
        async def blocked():
            yield "data: I'm unable to assist with that request.\n\n"
            # Send the sentinel here too, so the client closes instead of reconnecting
            yield "data: [DONE]\n\n"
        return StreamingResponse(blocked(), media_type="text/event-stream")
    # Load history
    history = load_history(session_id, db)
    history.append(HumanMessage(content=question))
    config = {"configurable": {"thread_id": session_id}}
    accumulated_response = []  # collect for saving to DB

    async def generate():
        async for event in agent.astream_events(
            {"messages": history},
            config=config,
            version="v2"
        ):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    token = chunk.content
                    accumulated_response.append(token)
                    # SSE format: one "data:" line per line of text, then a blank line.
                    # A raw newline inside a single data line would split the event.
                    for line in token.split("\n"):
                        yield f"data: {line}\n"
                    yield "\n"
        # Save full response to DB after streaming completes
        full_response = "".join(accumulated_response)
        save_history(session_id, question, full_response, db)
        # Signal completion
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
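One detail of the SSE wire format is worth isolating: a payload that contains newlines has to be sent as several data: lines, which the client joins back together with newlines. The sketch below shows a framing helper and a test-only parser that mimics how a client rebuilds the payload. Both function names are illustrative and not part of FastAPI, and only "\n" line breaks are handled.

```python
def sse_event(data: str) -> str:
    """Frame text as one SSE event, emitting one 'data:' line per line of text."""
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


def parse_sse_event(frame: str) -> str:
    """Test helper: rebuild the payload of a single event the way a client would."""
    payload = []
    for line in frame.split("\n"):
        if line.startswith("data:"):
            value = line[len("data:"):]
            # The format drops a single leading space after the colon.
            payload.append(value[1:] if value.startswith(" ") else value)
    return "\n".join(payload)
```

Because exactly one space after the colon is stripped, tokens that begin with a space (common in LLM output) survive the round trip.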
Client-Side (JavaScript)
const source = new EventSource(`/chat/stream?question=${encodeURIComponent(q)}&session_id=${encodeURIComponent(sid)}`);
let fullResponse = '';
source.onmessage = (event) => {
  if (event.data === '[DONE]') {
    source.close();
    return;
  }
  fullResponse += event.data;
  document.getElementById('response').textContent = fullResponse;
};
// EventSource reconnects automatically on errors, which would re-send the question
source.onerror = () => source.close();
10.5 Async Design for Concurrency
Your project's run_agent() is synchronous. For production, use async:
Current (Synchronous)
def run_agent(question: str, session_id: str, db: Session) -> str:
    # Blocks the thread for the full duration
    final_state = agent.invoke({"messages": history})
    return final_state["messages"][-1].content
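Problem: With 100 concurrent requests, 100 threads sit blocked waiting for OpenAI. Threads waiting on network I/O release the GIL, so the main cost is per-thread memory and a worker pool that fills up; any CPU-bound work in those threads additionally contends for the GIL.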
Production (Async)
async def run_agent_async(question: str, session_id: str, db: Session) -> str:
    # Sync DB call runs in a worker thread so it does not block the event loop
    history = await asyncio.to_thread(load_history, session_id, db)
    history.append(HumanMessage(content=question))
    config = {"configurable": {"thread_id": session_id}}
    # ainvoke is non-blocking: it frees the event loop while waiting for OpenAI
    final_state = await agent.ainvoke({"messages": history}, config=config)
    answer = final_state["messages"][-1].content
    await asyncio.to_thread(save_history, session_id, question, answer, db)
    return answer

@app.post("/chat")
async def chat_endpoint(question: str, session_id: str, db: Session = Depends(get_db)):
    answer = await run_agent_async(question, session_id, db)
    return {"answer": answer}
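With async, the 100 in-flight LLM calls share a single event-loop thread. Only the short DB calls use the default thread pool (at most min(32, cpu_count + 4) workers), instead of one blocked thread per request.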
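The concurrency claim can be checked with a small simulation. In this sketch, fake_llm_call is a stand-in for await agent.ainvoke(...), with asyncio.sleep playing the role of the network wait; the function names and the stats dictionary are illustrative assumptions.

```python
import asyncio
import threading


async def fake_llm_call(stats: dict, delay: float = 0.01) -> str:
    """Stand-in for `await agent.ainvoke(...)`: waits without blocking the thread."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    stats["threads"].add(threading.get_ident())
    await asyncio.sleep(delay)  # simulated network wait
    stats["in_flight"] -= 1
    return "ok"


async def run_concurrently(n: int) -> dict:
    """Run n simulated requests at once and report concurrency statistics."""
    stats = {"in_flight": 0, "peak": 0, "threads": set()}
    stats["results"] = await asyncio.gather(*(fake_llm_call(stats) for _ in range(n)))
    return stats


stats = asyncio.run(run_concurrently(100))
```

All 100 simulated calls are in flight at the same moment, yet the set of thread identifiers contains a single entry: the event-loop thread.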
10.6 LangSmith Tracing — Observability
LangSmith is LangChain's observability platform — it records every LLM call, tool execution, and state transition in your agent.
Setup
pip install langsmith
export LANGCHAIN_API_KEY="your-langsmith-api-key"
export LANGCHAIN_PROJECT="agent-factory-prod"
export LANGCHAIN_TRACING_V2=true
That's it. With these environment variables set, every agent.invoke() and agent.ainvoke() call is automatically traced — no code changes needed.
What LangSmith Records
For each agent run:
Run: "How does Agent Factory work?"
├── call_llm (450ms)
│ ├── Input: [HumanMessage, HumanMessage, ...] (3 messages)
│ ├── Output: AIMessage(tool_calls=[{"name": "rag_search", ...}])
│ ├── Model: gpt-4o-mini
│ ├── Tokens: 847 prompt + 45 completion
│ └── Cost: $0.0002
├── call_tools (230ms)
│ ├── Tool: rag_search
│ │ ├── Input: {"query": "Agent Factory architecture"}
│ │ └── Output: "Agent Factory is PepsiCo's..."
│ └── ToolMessage appended to state
└── call_llm (890ms)
├── Input: [all messages including ToolMessage]
├── Output: AIMessage("Based on the documentation...")
├── Tokens: 1240 prompt + 167 completion
└── Cost: $0.0004
Total: 1570ms, $0.0006
Custom Annotations
Add metadata to traces for filtering and analysis:
from langsmith import traceable

@traceable(name="agent-run", metadata={"env": "production"})
async def run_agent_async(question: str, session_id: str, db: Session) -> str:
    # Entire function is traced with the metadata
    ...
Feedback API
Capture user feedback tied to specific traces:
from langsmith import Client

ls_client = Client()

@app.post("/feedback")
async def submit_feedback(
    run_id: str,       # LangSmith trace ID
    score: int,        # 1 (positive) or 0 (negative)
    comment: str = ""
):
    ls_client.create_feedback(
        run_id=run_id,
        key="user_rating",
        score=score,
        comment=comment
    )
    return {"status": "ok"}
10.7 Cost Management
LLM costs grow at least linearly with usage, and the cost of each request rises as conversation history accumulates. Key control levers:
Token Optimization
# 1. Trim history to last N messages
history = load_history(session_id, db, max_turns=5)  # not 100

# 2. Use cheaper model for routing
supervisor_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # not gpt-4o

# 3. Cache common questions
# No lru_cache wrapper here: it would also memoize misses (None),
# so an answer cached later would never be seen by this process.
def get_cached_answer(question_hash: str) -> str | None:
    return cache.get(question_hash)

# 4. Compress tool results before sending to LLM
def call_tools_compressed(state):
    ...
    # Truncate tool output to 2000 chars
    content = str(output)[:2000]
    results.append(ToolMessage(content=content, tool_call_id=...))
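For the caching lever, the snippet above looks answers up by a question hash without showing how that hash is built. A minimal sketch, assuming that lowercasing and collapsing whitespace is an acceptable normalization: question_key and AnswerCache are hypothetical names, and the in-process dict stands in for a shared store such as Redis.

```python
import hashlib
import re
from typing import Dict, Optional


def question_key(question: str) -> str:
    """Normalize case and whitespace, then hash, so trivial variants share a key."""
    normalized = re.sub(r"\s+", " ", question.strip().lower())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


class AnswerCache:
    """Tiny in-process cache; a shared store would replace the dict in production."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def get(self, question: str) -> Optional[str]:
        """Return the cached answer, or None on a miss."""
        return self._store.get(question_key(question))

    def put(self, question: str, answer: str) -> None:
        """Store an answer under the normalized question key."""
        self._store[question_key(question)] = answer
```

A cache like this ignores conversation history, so it is only safe for questions whose answer does not depend on earlier turns.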
Cost Monitoring
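# LangSmith exposes token counts; query the API for cost reports
from datetime import datetime, timedelta, timezone
from langsmith import Client

client = Client()
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
# Count only LLM runs: parent runs can report their children's tokens too,
# so summing every run would double count.
runs = client.list_runs(project_name="agent-factory-prod", run_type="llm", start_time=yesterday)
total_tokens = sum(
    (r.prompt_tokens or 0) + (r.completion_tokens or 0)
    for r in runs
)
# Rough estimate: applies a single rate to all tokens; output tokens are typically billed higher
estimated_cost = total_tokens / 1_000_000 * 0.15  # gpt-4o-mini rate
print(f"Yesterday: {total_tokens:,} tokens, ~${estimated_cost:.2f}")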
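Since prompt and completion tokens are usually priced differently, a slightly more careful estimate keeps the two apart. This is a sketch of the arithmetic only: estimate_cost_usd is a hypothetical helper, and the rates passed in the example are placeholders that should be replaced with the provider's current price list.

```python
def estimate_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    input_rate_per_million: float,
    output_rate_per_million: float,
) -> float:
    """Estimate spend from token counts and per-million-token rates (USD)."""
    input_cost = prompt_tokens / 1_000_000 * input_rate_per_million
    output_cost = completion_tokens / 1_000_000 * output_rate_per_million
    return input_cost + output_cost


# Placeholder rates for illustration only; check current pricing before relying on them.
example_cost = estimate_cost_usd(
    prompt_tokens=2_000_000,
    completion_tokens=500_000,
    input_rate_per_million=0.15,
    output_rate_per_million=0.60,
)
```

With these placeholder rates the example works out to 2 × 0.15 + 0.5 × 0.60 = 0.60 USD.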
Token Budget Guards
import tiktoken
MAX_CONTEXT_TOKENS = 8000  # leave room for completion

def run_agent_with_budget(question: str, session_id: str, db: Session) -> str:
    history = load_history(session_id, db)
    # Count tokens (message content only; per-message overhead is ignored)
    enc = tiktoken.encoding_for_model("gpt-4o-mini")
    token_count = sum(len(enc.encode(str(m.content))) for m in history)
    # Trim until within budget
    # Caveat: dropping an AIMessage with tool_calls while keeping its ToolMessage
    # leaves an orphaned tool result; trim such pairs together.
    while token_count > MAX_CONTEXT_TOKENS and history:
        removed = history.pop(0)  # remove oldest message
        token_count -= len(enc.encode(str(removed.content)))
    history.append(HumanMessage(content=question))
    ...
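The same trimming idea can be written as a pure function that walks from the newest message backwards, which avoids repeated pops from the front of the list. In this sketch, plain strings stand in for messages, and rough_token_count is a deliberately crude word counter used in place of a real tokenizer; both are assumptions for illustration.

```python
from typing import Callable, List


def rough_token_count(text: str) -> int:
    """Crude stand-in for a real tokenizer: counts whitespace-separated words."""
    return len(text.split())


def trim_to_budget(
    messages: List[str],
    max_tokens: int,
    count: Callable[[str], int] = rough_token_count,
) -> List[str]:
    """Keep the newest messages whose combined token count fits within max_tokens."""
    kept: List[str] = []
    used = 0
    for message in reversed(messages):  # walk from newest to oldest
        cost = count(message)
        if used + cost > max_tokens:
            break  # stop at the first message that no longer fits
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept
```

Passing a tokenizer-backed counter as the count argument turns this into a budget guard for real message content.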
10.8 Horizontal Scaling
FastAPI + LangGraph agents are stateless (if using a shared database checkpointer), so they scale horizontally:
                  ┌───────────────┐
User Requests ──► │ Load Balancer │
                  └───────┬───────┘
                          │
          ┌───────────────┼───────────────┐
          ▼               ▼               ▼
    ┌────────────┐  ┌────────────┐  ┌────────────┐
    │   Agent    │  │   Agent    │  │   Agent    │
    │  Instance  │  │  Instance  │  │  Instance  │
    │  (Pod 1)   │  │  (Pod 2)   │  │  (Pod 3)   │
    └─────┬──────┘  └─────┬──────┘  └─────┬──────┘
          │               │               │
          └───────────────┼───────────────┘
                          │
                 ┌─────────────────┐
                 │   PostgreSQL    │
                 │ (Shared State:  │
                 │  ChatHistory +  │
                 │  Checkpoints)   │
                 └─────────────────┘
Requirements for Horizontal Scaling
- No local state: all state in PostgreSQL/Redis, not in-process
- Shared checkpointer: PostgresSaver (not SqliteSaver, which is file-based)
- Shared session data: ChatHistory in the shared database
- Stateless auth: JWT tokens, not server-side sessions
Docker Configuration
# docker-compose.yml
version: "3.8"
services:
  agent:
    image: agent-factory:latest
    environment:
      # password must match POSTGRES_PASSWORD below
      - DATABASE_URL=postgresql://user:${DB_PASSWORD}@postgres/agentdb
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
    deploy:
      replicas: 3  # 3 identical instances
      resources:
        limits:
          memory: 512M  # agents are CPU/network-bound, not memory-bound
    depends_on:
      - postgres
  postgres:
    image: postgres:15
    volumes:
      - pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: agentdb
      POSTGRES_USER: user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    # load balance across agent instances
volumes:
  pgdata:  # named volume referenced by the postgres service
10.9 Health Checks and Graceful Shutdown
# main.py
import asyncio
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session

app = FastAPI()

# Health check endpoint for load balancer
@app.get("/health")
async def health(db: Session = Depends(get_db)):
    """Returns 200 if the service is healthy. Load balancer polls this."""
    try:
        # Check critical dependencies
        db.execute(text("SELECT 1"))      # database responsive (SQLAlchemy 2.x needs text())
        await test_openai_connectivity()  # OpenAI reachable (project helper; keep it cheap)
        return {"status": "healthy", "version": "1.0.0"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

@app.get("/ready")
async def readiness():
    """Readiness probe: is this instance ready to serve traffic?"""
    return {"status": "ready"}

# Graceful shutdown
shutdown_event = asyncio.Event()

# Note: on_event is deprecated in recent FastAPI releases in favor of a lifespan handler
@app.on_event("shutdown")
async def shutdown():
    """Signal shutdown and allow a short grace period before stopping."""
    shutdown_event.set()
    # Fixed grace period only; it does not track requests. The ASGI server is
    # expected to stop accepting connections and drain open ones on SIGTERM.
    await asyncio.sleep(5)
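A fixed sleep does not know whether any requests are still running. One possible refinement, sketched below with only the standard library, is to count in-flight requests and wait for the count to reach zero, with a timeout as a backstop. InFlightTracker and its method names are hypothetical; in a FastAPI app the track() context manager would typically wrap request handling in a middleware.

```python
import asyncio
from contextlib import asynccontextmanager


class InFlightTracker:
    """Counts active requests so shutdown can wait for them to finish."""

    def __init__(self) -> None:
        self.active = 0

    @asynccontextmanager
    async def track(self):
        """Mark one request as active for the duration of the with-block."""
        self.active += 1
        try:
            yield
        finally:
            self.active -= 1

    async def drain(self, timeout: float, poll: float = 0.01) -> bool:
        """Wait until no requests are active; return False if the timeout expires."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while self.active > 0:
            if loop.time() >= deadline:
                return False
            await asyncio.sleep(poll)
        return True


async def demo() -> tuple:
    """Start one simulated request, then drain before 'shutting down'."""
    tracker = InFlightTracker()

    async def handle_request() -> None:
        async with tracker.track():
            await asyncio.sleep(0.05)  # simulated agent work

    task = asyncio.create_task(handle_request())
    await asyncio.sleep(0)  # let the request start
    active_before = tracker.active
    drained = await tracker.drain(timeout=2.0)
    await task
    return active_before, drained, tracker.active
```

The shutdown handler would then call drain() with the grace period instead of sleeping unconditionally.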
10.10 Production Monitoring Checklist
Metrics to Track
# What to instrument and alert on:
"""
LATENCY:
- p50, p95, p99 agent turn duration
- per-node latency (how long does call_llm take vs call_tools?)
- LangSmith provides these automatically
COST:
- tokens per request (prompt + completion)
- daily/monthly spend
- cost per session, cost per user
QUALITY:
- user feedback scores (thumbs up/down)
- error rates by type
- routing distribution (% RAG vs DATA vs GENERAL)
RELIABILITY:
- 5xx error rate
- tool failure rate by tool name
- LLM rate limit frequency
- iteration limit hit rate (agent runaway indicator)
CAPACITY:
- concurrent request count
- queue depth
- DB connection pool utilization
"""
Alert Thresholds
ALERT_THRESHOLDS = {
    "p99_latency_seconds": 30,     # page if p99 latency exceeds 30s
    "error_rate_5xx": 0.05,        # alert if >5% requests fail
    "cost_per_hour_usd": 50,       # alert if spending >$50/hour
    "tool_failure_rate": 0.10,     # alert if >10% tool calls fail
    "iteration_limit_rate": 0.01,  # alert if >1% hits iteration limit
}
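A thresholds table like this only helps if something compares it against live metrics. The sketch below shows one simple way to do that; breached_alerts is a hypothetical helper, the threshold values are copied from the table above, and the sample metric values are made up for illustration.

```python
from typing import Dict, List

# Threshold values copied from the table above.
THRESHOLDS: Dict[str, float] = {
    "p99_latency_seconds": 30,
    "error_rate_5xx": 0.05,
    "cost_per_hour_usd": 50,
    "tool_failure_rate": 0.10,
    "iteration_limit_rate": 0.01,
}


def breached_alerts(metrics: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """Return the sorted names of metrics whose value exceeds its threshold."""
    return sorted(
        name
        for name, limit in thresholds.items()
        if name in metrics and metrics[name] > limit
    )


# Made-up sample values; a real system would read these from its metrics backend.
sample_metrics = {
    "p99_latency_seconds": 42.0,
    "error_rate_5xx": 0.01,
    "cost_per_hour_usd": 50,
    "tool_failure_rate": 0.20,
}
alerts = breached_alerts(sample_metrics, THRESHOLDS)
```

Metrics that are missing from the sample are skipped rather than treated as breaches, and a value exactly at the limit does not fire.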
10.11 The Production Readiness Checklist
Before going to production with your agent:
Infrastructure:
✓ PostgreSQL checkpointer (not SQLite)
✓ Environment variables for all secrets (no hardcoded keys)
✓ Docker container with resource limits
✓ Health check endpoints (/health, /ready)
✓ Graceful shutdown handling
✓ Horizontal scaling tested
Reliability:
✓ Tool error handling (return ToolMessage, not raise)
✓ Iteration limit in AgentState
✓ Request timeout (asyncio.wait_for)
✓ Retry with exponential backoff for rate limits
✓ Rate limiting per user
Observability:
✓ LangSmith tracing enabled
✓ Structured logging (JSON) with session_id in every log line
✓ Custom metrics exported to monitoring
✓ Alerts configured for critical thresholds
Cost:
✓ History windowing (last N turns only)
✓ Tool output truncation
✓ Cheap model for routing (not GPT-4 for classification)
✓ Token budget guards
Security:
✓ Guardrails on input (your guardrails.py)
✓ Parameterized DB queries (no SQL injection)
✓ Authentication on all endpoints (your auth.py)
✓ No sensitive data logged or traced
✓ API key rotation strategy
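Two of the reliability items above, the request timeout via asyncio.wait_for and retry with exponential backoff, combine naturally into one wrapper. The following is a minimal sketch: RateLimitError is a hypothetical stand-in for whatever exception the LLM client raises on rate limiting, and call_with_retry and its defaults are illustrative choices rather than part of any library.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's rate-limit exception."""


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    *,
    attempts: int = 4,
    base_delay: float = 0.5,
    timeout: float = 30.0,
) -> T:
    """Run make_call with a per-attempt timeout, retrying rate limits with backoff."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except RateLimitError:
            if attempt == attempts - 1:
                raise  # out of retries
            # Exponential backoff with jitter: base, 2x base, 4x base, ...
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise RuntimeError("attempts must be at least 1")
```

Timeouts are deliberately not retried in this sketch, since a slow request is often a sign that repeating it will only add load; a caller would invoke it as, for example, await call_with_retry(lambda: agent.ainvoke(state, config=config)).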
10.12 Interview Q&A
Q: How do you implement token-level streaming in a LangGraph agent?
I use agent.astream_events(initial_state, config, version="v2"), which emits fine-grained events for every part of the execution. For token streaming, I filter for event["event"] == "on_chat_model_stream"; these events carry individual token chunks as they're generated by the LLM. The tokens are immediately yielded back to the client via a StreamingResponse in FastAPI using Server-Sent Events format (data: {token}\n\n). Tool execution events (on_tool_start, on_tool_end) are also streamed as status messages so the user knows when the agent is searching. After streaming completes, the full response is assembled and saved to the chat history table.
Q: How does your agent scale horizontally?
The agent is stateless at the instance level: all persistent state lives in PostgreSQL (the ChatHistory table and LangGraph checkpoints via PostgresSaver). Any request can be handled by any instance without session affinity. Scaling is a matter of adding more container replicas behind a load balancer. The shared PostgreSQL database is the only stateful component, and each instance talks to it through a SQLAlchemy connection pool. For very high scale, Redis would be added as a session cache to reduce DB load. LangSmith traces are sent directly to the LangSmith API, so they don't create inter-service dependencies.
Q: What does LangSmith give you that standard logging doesn't?
LangSmith provides structured, hierarchical traces of the entire agent execution — not just flat log lines. For each run, it shows the exact messages sent to and received from the LLM, which tools were called with which arguments, token counts and costs, per-node latency, and the complete state at each step. This level of detail is critical for debugging subtle agent issues: why did it choose tool A over tool B? Why did it loop three times? What exact prompt triggered a hallucination? Standard logging can capture inputs and outputs, but the hierarchical trace reveals the reasoning chain. LangSmith also provides a UI for browsing and filtering traces, human feedback collection, and dataset creation for evaluation.
Q: How do you control OpenAI costs in a production agent?
Five levers. First, model selection: use gpt-4o-mini for routing and tool-calling (cheap), and reserve gpt-4o only for tasks that demonstrably need it. Second, context windowing: load only the last N=5 turns from history rather than the full conversation. Third, tool output truncation: ChromaDB results and SQL outputs are truncated to 2000 chars before appending to state. Fourth, response caching: frequently asked identical questions are served from a Redis cache. Fifth, token budget guards: count tokens before invoking and trim if above a threshold. LangSmith's cost reporting allows monitoring daily spend and identifying expensive sessions or queries for further optimization.
Q: What are Server-Sent Events and why use them for agent streaming instead of WebSockets?
SSE is a one-directional HTTP streaming protocol: the server pushes events to the client over a persistent HTTP connection. WebSockets are bidirectional. For agent streaming, SSE is preferred because: (1) it works over standard HTTP/2 without upgrades, (2) it's naturally unidirectional (the agent generates, the client displays), (3) it automatically reconnects on network interruption, (4) it integrates seamlessly with FastAPI's StreamingResponse, and (5) it's simpler to proxy through load balancers and CDNs than WebSocket upgrades. WebSockets are better for interactive real-time features (collaborative editing, live updates) where the client also needs to send data over the same connection.
10.13 Key One-Liners to Memorize
"astream_events version v2 is the production standard for token-level agent streaming."
"Agents scale horizontally when ALL state (history, checkpoints) is in shared PostgreSQL."
"LangSmith = automatic structured tracing for every LLM call and tool execution."
"Cost control: model selection, context windowing, tool truncation, caching, token budgets."
"SSE for streaming: one-directional, HTTP/2 compatible, auto-reconnect, simpler than WebSockets."
"Production readiness: health checks, graceful shutdown, rate limiting, alerting, LangSmith."
10.14 Full Series Summary
You now have a complete mental model of production LangGraph agents:
CH01  Why Agents?     ReAct loop, agents vs. chains, your project overview
CH02  StateGraph      Nodes, edges, compile, invoke: the graph primitives
CH03  AgentState      TypedDict, reducers, add_messages: how state flows
CH04  Tool Calling    @tool, bind_tools, AIMessage/ToolMessage cycle
CH05  Routing         add_conditional_edges, supervisor routing, patterns
CH06  Memory          4 levels: in-context, session, long-term, semantic
CH07  Multi-Agent     Supervisor pattern, worker handoffs, data_agent.py
CH08  HITL            Interrupts, checkpointer, approve/reject/resume
CH09  Error Handling  ToolMessage errors, retries, iteration limits, fallbacks
CH10  Production      Streaming, async, LangSmith, cost control, scaling
Your project implements:
- ReAct loop via StateGraph ✓
- Tool calling via @tool + bind_tools ✓
- Session memory via PostgreSQL ✓
- Semantic memory via ChromaDB ✓
- Guardrails via LLM classification ✓
- Supervisor routing via route_question() ✓
- Multi-agent via RAG + Data agents ✓
- Auth via auth.py ✓
Ready to add:
- Streaming via astream_events
- LangSmith tracing (just set env vars)
- PostgresSaver checkpointer
- HITL for high-stakes actions
- Async refactor of run_agent()
This is the final chapter. → Return to Chapter 1 for a full review.