Senior Architect Interview Series

LangGraph & Agentic AI
Complete Interview Prep Guide

10 chapters · From ReAct patterns to production agents · 2026 Edition

📅 March 28, 2026⏱ ~90 min read🎯 Senior Engineer / Architect level
Chapter 10 of 10Production Agents: Streaming, Tracing & Scaling

LangGraph Chapter 10 — Production Agents: Streaming, Tracing & Scaling

Senior Architect Interview Series — LangGraph & Agentic AI


Navigation

Chapter 9 — Error Handling | ← Back to Chapter 1


10.0 What This Chapter Covers

Chapters 1–9 covered how to build agents that work. This chapter covers how to run them at production scale. The gap between a working demo and a reliable production system is filled by:

  1. Streaming — how to deliver tokens as they're generated
  2. Async design — handling many concurrent requests
  3. LangSmith tracing — observability for debugging agent behavior
  4. Cost management — controlling token spend at scale
  5. Scaling patterns — horizontal scaling and load balancing
  6. Deployment considerations — containers, health checks, graceful shutdown
  7. Monitoring and alerting — what to measure

10.1 Why Production Agents Are Hard

┌──────────────────────────────────────────────────────┐
│            PRODUCTION COMPLEXITY LAYERS              │
│                                                      │
│  Demo:        Single user, no errors, local           │
│  Dev:         Multiple users, occasional errors       │
│  Production:  Hundreds of concurrent users,           │
│               persistent failures, cost pressure,     │
│               latency SLAs, compliance, observability │
└──────────────────────────────────────────────────────┘

Your current project runs correctly in development. The production journey requires addressing each layer:

ChallengeChapter ReferenceProduction Solution
Agent loops infinitelyCh. 9Iteration limits
Tool failures crash agentCh. 9Error ToolMessages
No audit trailThis chapterLangSmith tracing
Slow response (user waiting)This chapterStreaming
Can't scale horizontallyThis chapterStateless + Redis
High costThis chapterToken optimization

10.2 Streaming — Delivering Tokens as Generated

The Problem Without Streaming

Without streaming, the user experience is:

  1. User sends question
  2. Wait… (agent runs, LLM generates, total 5-15 seconds)
  3. Full answer appears at once

This feels laggy even if total time is acceptable. The perceived latency is the full time-to-completion.

With Streaming

With streaming, the user experience is:

  1. User sends question
  2. Tokens appear within ~1 second, word by word
  3. Full answer arrives over 2-4 seconds

This feels responsive — same total time, but better UX through incremental delivery.


10.3 LangGraph Streaming Modes

LangGraph supports multiple streaming modes:

Mode 1 — stream() — Node-Level Updates

# Streams state updates as each NODE completes
for chunk in agent.stream({"messages": [HumanMessage("hello")]}):
    print(chunk)
    # Output (one dict per completed node):
    # {'call_llm': {'messages': [AIMessage(tool_calls=[...])]}}
    # {'call_tools': {'messages': [ToolMessage('Agent Factory is...')]}}
    # {'call_llm': {'messages': [AIMessage('Based on the search...')]}}

Use for: Showing progress ("Agent is searching...", "Agent is thinking..."), not individual tokens.

Mode 2 — astream_events() — Token-Level Streaming (Production Standard)

async def stream_tokens(question: str, session_id: str):
    """Yield tokens as they're generated by the LLM."""
    config = {"configurable": {"thread_id": session_id}}
    
    async for event in agent.astream_events(
        {"messages": [HumanMessage(content=question)]},
        config=config,
        version="v2"    # use v2 for the modern events format
    ):
        if event["event"] == "on_chat_model_stream":
            # LLM is generating tokens
            chunk = event["data"]["chunk"]
            if chunk.content:   # text token (not tool call)
                yield chunk.content   # single token or small chunk
        
        elif event["event"] == "on_tool_start":
            # Tool is being called
            yield f"\n[Searching: {event['name']}...]\n"
        
        elif event["event"] == "on_tool_end":
            # Tool returned
            yield "\n[Search complete]\n"

Mode 3 — astream() — Async Node Updates

async for chunk in agent.astream({"messages": history}):
    # chunk is a dict: {node_name: state_update}
    node_name = list(chunk.keys())[0]
    state_update = chunk[node_name]
    print(f"Node '{node_name}' completed")

10.4 Streaming with FastAPI + Server-Sent Events

The canonical production pattern is SSE (Server-Sent Events) for HTTP streaming:

# main.py — streaming endpoint
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
import asyncio

app = FastAPI()

@app.get("/chat/stream")
async def chat_stream(
    question: str,
    session_id: str,
    db: Session = Depends(get_db)
):
    """Stream tokens to the client via Server-Sent Events."""
    
    # Guardrail check (synchronous — fast)
    if not check_input(question):
        async def blocked():
            yield "data: I'm unable to assist with that request.\n\n"
        return StreamingResponse(blocked(), media_type="text/event-stream")
    
    # Load history
    history = load_history(session_id, db)
    history.append(HumanMessage(content=question))
    
    config = {"configurable": {"thread_id": session_id}}
    
    accumulated_response = []  # collect for saving to DB
    
    async def generate():
        nonlocal accumulated_response
        
        async for event in agent.astream_events(
            {"messages": history},
            config=config,
            version="v2"
        ):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    token = chunk.content
                    accumulated_response.append(token)
                    # SSE format: "data: {token}\n\n"
                    yield f"data: {token}\n\n"
        
        # Save full response to DB after streaming completes
        full_response = "".join(accumulated_response)
        save_history(session_id, question, full_response, db)
        
        # Signal completion
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

Client-Side (JavaScript)

const source = new EventSource(`/chat/stream?question=${encodeURIComponent(q)}&session_id=${sid}`);

let fullResponse = '';
source.onmessage = (event) => {
    if (event.data === '[DONE]') {
        source.close();
        return;
    }
    fullResponse += event.data;
    document.getElementById('response').textContent = fullResponse;
};

10.5 Async Design for Concurrency

Your project's run_agent() is synchronous. For production, use async:

Current (Synchronous)

def run_agent(question: str, session_id: str, db: Session) -> str:
    # Blocks the thread for the full duration
    final_state = agent.invoke({"messages": history})
    return final_state["messages"][-1].content

Problem: With 100 concurrent requests, 100 threads are blocked waiting for OpenAI. Python's GIL makes this worse for CPU work.

Production (Async)

async def run_agent_async(question: str, session_id: str, db: Session) -> str:
    history = await asyncio.to_thread(load_history, session_id, db)  # async DB call
    history.append(HumanMessage(content=question))
    
    config = {"configurable": {"thread_id": session_id}}
    
    # ainvoke is non-blocking — frees the event loop while waiting for OpenAI
    final_state = await agent.ainvoke({"messages": history}, config=config)
    
    answer = final_state["messages"][-1].content
    await asyncio.to_thread(save_history, session_id, question, answer, db)
    return answer

@app.post("/chat")
async def chat_endpoint(question: str, session_id: str, db: Session = Depends(get_db)):
    answer = await run_agent_async(question, session_id, db)
    return {"answer": answer}

With async, 100 concurrent requests use ~4 threads (event loop + thread pool), not 100.


10.6 LangSmith Tracing — Observability

LangSmith is LangChain's observability platform — it records every LLM call, tool execution, and state transition in your agent.

Setup

pip install langsmith
export LANGCHAIN_API_KEY="your-langsmith-api-key"
export LANGCHAIN_PROJECT="agent-factory-prod"
export LANGCHAIN_TRACING_V2=true

That's it. With these environment variables set, every agent.invoke() and agent.ainvoke() call is automatically traced — no code changes needed.

What LangSmith Records

For each agent run:

Run: "How does Agent Factory work?"
├── call_llm (450ms)
│   ├── Input: [HumanMessage, HumanMessage, ...] (3 messages)
│   ├── Output: AIMessage(tool_calls=[{"name": "rag_search", ...}])
│   ├── Model: gpt-4o-mini
│   ├── Tokens: 847 prompt + 45 completion
│   └── Cost: $0.0002
├── call_tools (230ms)
│   ├── Tool: rag_search
│   │   ├── Input: {"query": "Agent Factory architecture"}
│   │   └── Output: "Agent Factory is PepsiCo's..."
│   └── ToolMessage appended to state
└── call_llm (890ms)
    ├── Input: [all messages including ToolMessage]
    ├── Output: AIMessage("Based on the documentation...")
    ├── Tokens: 1240 prompt + 167 completion
    └── Cost: $0.0004
Total: 1570ms, $0.0006

Custom Annotations

Add metadata to traces for filtering and analysis:

from langsmith import traceable

@traceable(name="agent-run", metadata={"env": "production"})
async def run_agent_async(question: str, session_id: str, db: Session) -> str:
    # Entire function is traced with the metadata
    ...

Feedback API

Capture user feedback tied to specific traces:

from langsmith import Client

ls_client = Client()

@app.post("/feedback")
async def submit_feedback(
    run_id: str,       # LangSmith trace ID
    score: int,        # 1 (positive) or 0 (negative)
    comment: str = ""
):
    ls_client.create_feedback(
        run_id=run_id,
        key="user_rating",
        score=score,
        comment=comment
    )
    return {"status": "ok"}

10.7 Cost Management

LLM costs grow linearly with usage. Key control levers:

Token Optimization

# 1. Trim history to last N messages
history = load_history(session_id, db, max_turns=5)   # not 100

# 2. Use cheaper model for routing
supervisor_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # not gpt-4o

# 3. Cache common questions
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_cached_answer(question_hash: str) -> str | None:
    return cache.get(question_hash)

# 4. Compress tool results before sending to LLM
def call_tools_compressed(state):
    ...
    # Truncate tool output to 2000 chars
    content = str(output)[:2000]
    results.append(ToolMessage(content=content, tool_call_id=...))

Cost Monitoring

# LangSmith exposes token counts — query the API for cost reports
from langsmith import Client

client = Client()
runs = client.list_runs(project_name="agent-factory-prod", start_time=yesterday)

total_tokens = sum(
    (r.prompt_tokens or 0) + (r.completion_tokens or 0)
    for r in runs
)
estimated_cost = total_tokens / 1_000_000 * 0.15   # gpt-4o-mini rate

print(f"Yesterday: {total_tokens:,} tokens, ~${estimated_cost:.2f}")

Token Budget Guards

import tiktoken

MAX_CONTEXT_TOKENS = 8000  # leave room for completion

def run_agent_with_budget(question: str, session_id: str, db: Session) -> str:
    history = load_history(session_id, db)
    
    # Count tokens
    enc = tiktoken.encoding_for_model("gpt-4o-mini")
    token_count = sum(len(enc.encode(str(m.content))) for m in history)
    
    # Trim until within budget
    while token_count > MAX_CONTEXT_TOKENS and len(history) > 0:
        removed = history.pop(0)   # remove oldest message
        token_count -= len(enc.encode(str(removed.content)))
    
    history.append(HumanMessage(content=question))
    ...

10.8 Horizontal Scaling

FastAPI + LangGraph agents are stateless (if using a shared database checkpointer), so they scale horizontally:

                    ┌─────────────┐
     User Requests ─► Load Balancer│
                    └──────┬──────┘
                           │
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
    ┌────────────┐  ┌────────────┐  ┌────────────┐
    │  Agent     │  │  Agent     │  │  Agent     │
    │  Instance  │  │  Instance  │  │  Instance  │
    │  (Pod 1)   │  │  (Pod 2)   │  │  (Pod 3)   │
    └─────┬──────┘  └─────┬──────┘  └─────┬──────┘
          │               │               │
          └───────────────┴───────────────┘
                          │
                 ┌─────────────────┐
                 │   PostgreSQL    │
                 │ (Shared State:  │
                 │ ChatHistory +   │
                 │ Checkpoints)    │
                 └─────────────────┘

Requirements for Horizontal Scaling

  1. No local state — all state in PostgreSQL/Redis, not in-process
  2. Shared checkpointerPostgresSaver (not SqliteSaver which is file-based)
  3. Shared session dataChatHistory in the shared database
  4. Stateless auth — JWT tokens, not server-side sessions

Docker Configuration

# docker-compose.yml
version: "3.8"
services:
  agent:
    image: agent-factory:latest
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres/agentdb
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
    deploy:
      replicas: 3          # 3 identical instances
      resources:
        limits:
          memory: 512M     # agents are CPU/network-bound, not memory-bound
    depends_on:
      - postgres
  
  postgres:
    image: postgres:15
    volumes:
      - pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: agentdb
      POSTGRES_USER: user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
  
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    # load balance across agent instances

10.9 Health Checks and Graceful Shutdown

# main.py
import signal
import asyncio
from fastapi import FastAPI

app = FastAPI()

# Health check endpoint for load balancer
@app.get("/health")
async def health():
    """Returns 200 if the service is healthy. Load balancer polls this."""
    try:
        # Check critical dependencies
        db.execute("SELECT 1")            # database responsive
        await test_openai_connectivity()   # OpenAI reachable
        return {"status": "healthy", "version": "1.0.0"}
    except Exception as e:
        from fastapi import HTTPException
        raise HTTPException(status_code=503, detail=str(e))

@app.get("/ready")
async def readiness():
    """Readiness probe — is this instance ready to serve traffic?"""
    return {"status": "ready"}

# Graceful shutdown
shutdown_event = asyncio.Event()

@app.on_event("shutdown")
async def shutdown():
    """Allow in-flight requests to complete before stopping."""
    shutdown_event.set()
    await asyncio.sleep(5)  # wait for in-flight requests

10.10 Production Monitoring Checklist

Metrics to Track

# What to instrument and alert on:
"""
LATENCY:
  - p50, p95, p99 agent turn duration
  - per-node latency (how long does call_llm take vs call_tools?)
  - LangSmith provides these automatically

COST:
  - tokens per request (prompt + completion)
  - daily/monthly spend
  - cost per session, cost per user

QUALITY:
  - user feedback scores (thumbs up/down)
  - error rates by type
  - routing distribution (% RAG vs DATA vs GENERAL)

RELIABILITY:
  - 5xx error rate
  - tool failure rate by tool name
  - LLM rate limit frequency
  - iteration limit hit rate (agent runaway indicator)

CAPACITY:
  - concurrent request count
  - queue depth
  - DB connection pool utilization
"""

Alert Thresholds

ALERT_THRESHOLDS = {
    "p99_latency_seconds":    30,    # page if any request >30s
    "error_rate_5xx":         0.05,  # alert if >5% requests fail
    "cost_per_hour_usd":      50,    # alert if spending >$50/hour
    "tool_failure_rate":      0.10,  # alert if >10% tool calls fail
    "iteration_limit_rate":   0.01,  # alert if >1% hits iteration limit
}

10.11 The Production Readiness Checklist

Before going to production with your agent:

Infrastructure:
  ✓ PostgreSQL checkpointer (not SQLite)
  ✓ Environment variables for all secrets (no hardcoded keys)
  ✓ Docker container with resource limits
  ✓ Health check endpoints (/health, /ready)
  ✓ Graceful shutdown handling
  ✓ Horizontal scaling tested

Reliability:
  ✓ Tool error handling (return ToolMessage, not raise)
  ✓ Iteration limit in AgentState
  ✓ Request timeout (asyncio.wait_for)
  ✓ Retry with exponential backoff for rate limits
  ✓ Rate limiting per user

Observability:
  ✓ LangSmith tracing enabled
  ✓ Structured logging (JSON) with session_id in every log line
  ✓ Custom metrics exported to monitoring
  ✓ Alerts configured for critical thresholds

Cost:
  ✓ History windowing (last N turns only)
  ✓ Tool output truncation
  ✓ Cheap model for routing (not GPT-4 for classification)
  ✓ Token budget guards

Security:
  ✓ Guardrails on input (your guardrails.py)
  ✓ Parameterized DB queries (no SQL injection)
  ✓ Authentication on all endpoints (your auth.py)
  ✓ No sensitive data logged or traced
  ✓ API key rotation strategy

10.12 Interview Q&A

Q: How do you implement token-level streaming in a LangGraph agent?

I use agent.astream_events(initial_state, config, version="v2") which emits fine-grained events for every part of the execution. For token streaming, I filter for event["event"] == "on_chat_model_stream" — these events carry individual token chunks as they're generated by the LLM. The tokens are immediately yielded back to the client via a StreamingResponse in FastAPI using Server-Sent Events format (data: {token}\n\n). Tool execution events (on_tool_start, on_tool_end) are also streamed as status messages so the user knows when the agent is searching. After streaming completes, the full response is assembled and saved to the chat history table.


Q: How does your agent scale horizontally?

The agent is stateless at the instance level — all persistent state lives in PostgreSQL (ChatHistory table and LangGraph checkpoints via PostgresSaver). Any request can be handled by any instance without session affinity. Scaling is a matter of adding more container replicas behind a load balancer. The shared PostgreSQL database is the only stateful component, and it handles connection pooling via SQLAlchemy. For very high scale, Redis would be added as a session cache to reduce DB load. LangSmith traces are sent directly to the LangSmith API, so they don't create inter-service dependencies.


Q: What does LangSmith give you that standard logging doesn't?

LangSmith provides structured, hierarchical traces of the entire agent execution — not just flat log lines. For each run, it shows the exact messages sent to and received from the LLM, which tools were called with which arguments, token counts and costs, per-node latency, and the complete state at each step. This level of detail is critical for debugging subtle agent issues: why did it choose tool A over tool B? Why did it loop three times? What exact prompt triggered a hallucination? Standard logging can capture inputs and outputs, but the hierarchical trace reveals the reasoning chain. LangSmith also provides a UI for browsing and filtering traces, human feedback collection, and dataset creation for evaluation.


Q: How do you control OpenAI costs in a production agent?

Five levers. First, model selection — use gpt-4o-mini for routing and tool-calling (cheap), reserve gpt-4o only for tasks that demonstrably need it. Second, context windowing — load only the last N=5 turns from history rather than the full conversation. Third, tool output truncation — ChromaDB results and SQL outputs are truncated to 2000 chars before appending to state. Fourth, response caching — frequently asked identical questions are served from a Redis cache. Fifth, token budget guards — count tokens before invoking and trim if above a threshold. LangSmith's cost reporting allows monitoring daily spend and identifying expensive sessions or queries for further optimization.


Q: What are Server-Sent Events and why use them for agent streaming instead of WebSockets?

SSE is a one-directional HTTP streaming protocol — the server pushes events to the client over a persistent HTTP connection. WebSockets are bidirectional. For agent streaming, SSE is preferred because: (1) it works over standard HTTP/2 without upgrades, (2) it's naturally unidirectional — the agent generates, the client displays, (3) it automatically reconnects on network interruption, (4) it integrates seamlessly with FastAPI's StreamingResponse, and (5) it's simpler to proxy through load balancers and CDNs than WebSocket upgrades. WebSockets are better for interactive real-time features (collaborative editing, live updates) where the client also needs to send data over the same connection.


10.13 Key One-Liners to Memorize

"astream_events version v2 is the production standard for token-level agent streaming."

"Agents scale horizontally when ALL state (history, checkpoints) is in shared PostgreSQL."

"LangSmith = automatic structured tracing for every LLM call and tool execution."

"Cost control: model selection, context windowing, tool truncation, caching, token budgets."

"SSE for streaming: one-directional, HTTP/2 compatible, auto-reconnect, simpler than WebSockets."

"Production readiness: health checks, graceful shutdown, rate limiting, alerting, LangSmith."

10.14 Full Series Summary

You now have a complete mental model of production LangGraph agents:

CH01 — Why Agents?         ReAct loop, agents vs. chains, your project overview
CH02 — StateGraph          Nodes, edges, compile, invoke — the graph primitives
CH03 — AgentState          TypedDict, reducers, add_messages — how state flows
CH04 — Tool Calling        @tool, bind_tools, AIMessage/ToolMessage cycle
CH05 — Routing             add_conditional_edges, supervisor routing, patterns
CH06 — Memory              4 levels: in-context, session, long-term, semantic
CH07 — Multi-Agent         Supervisor pattern, worker handoffs, data_agent.py
CH08 — HITL                Interrupts, checkpointer, approve/reject/resume
CH09 — Error Handling      ToolMessage errors, retries, iteration limits, fallbacks
CH10 — Production          Streaming, async, LangSmith, cost control, scaling

Your project implements:

  • ReAct loop via StateGraph ✓
  • Tool calling via @tool + bind_tools ✓
  • Session memory via PostgreSQL ✓
  • Semantic memory via ChromaDB ✓
  • Guardrails via LLM classification ✓
  • Supervisor routing via route_question() ✓
  • Multi-agent via RAG + Data agents ✓
  • Auth via auth.py ✓

Ready to add:

  • Streaming via astream_events
  • LangSmith tracing (just set env vars)
  • PostgresSaver checkpointer
  • HITL for high-stakes actions
  • Async refactor of run_agent()

This is the final chapter. → Return to Chapter 1 for a full review.

Header Logo