Senior Architect Interview Series

LangGraph & Agentic AI
Complete Interview Prep Guide

10 chapters · From ReAct patterns to production agents · 2026 Edition

📅 March 28, 2026⏱ ~90 min read🎯 Senior Engineer / Architect level
Chapter 9 of 10Error Handling, Retries & Fallback Agents

LangGraph Chapter 9 — Error Handling, Retries & Fallback Agents

Senior Architect Interview Series — LangGraph & Agentic AI


Navigation

Chapter 8 — Human-in-the-Loop | Chapter 10 — Production Agents →


9.0 What This Chapter Covers

Production agents fail. APIs time out, databases go down, LLMs return garbage, tools throw exceptions. This chapter covers how to build resilient agents that degrade gracefully:

  1. Error categories in LLM agent systems
  2. Tool-level error handling (return errors as ToolMessages)
  3. Node-level error handling (try/except in nodes)
  4. Retry strategies within the ReAct loop
  5. Fallback agent patterns
  6. Rate limit and timeout handling
  7. Dead-letter and escalation patterns
  8. Error observability

9.1 Error Categories in Agent Systems

┌──────────────────────────────────────────────────────────┐
│                    ERROR TAXONOMY                        │
│                                                          │
│  L1 — Tool Errors                                        │
│      ChromaDB unavailable, SQL syntax error              │
│      → Handle in call_tools: return ToolMessage(error)   │
│                                                          │
│  L2 — LLM Errors                                         │
│      Rate limit (429), API timeout, context too long     │
│      → Handle in call_llm: retry with backoff            │
│                                                          │
│  L3 — State Errors                                       │
│      Missing field, wrong type, reducer conflict          │
│      → Handle at graph level: validate before invoke     │
│                                                          │
│  L4 — Logic Errors                                       │
│      Infinite loop, wrong routing, hallucinated tool call│
│      → Handle with iteration limits, guardrails          │
│                                                          │
│  L5 — Infrastructure Errors                              │
│      DB down, OOM killed, network partition              │
│      → Handle with checkpointer recovery, circuit breaker│
└──────────────────────────────────────────────────────────┘

9.2 L1 — Tool Error Handling

The golden rule: never raise from a tool execution loop. Return errors as ToolMessage objects so the LLM can recover.

Current Implementation in Your Project

# agent/agent.py — basic version without error handling
def call_tools(state: AgentState) -> AgentState:
    last_message = state["messages"][-1]
    results = []
    for tool_call in last_message.tool_calls:
        tool_fn = tool_map[tool_call["name"]]
        output = tool_fn.invoke(tool_call["args"])
        results.append(ToolMessage(content=str(output), tool_call_id=tool_call["id"]))
    return {"messages": results}

Hardened Version

def call_tools(state: AgentState) -> AgentState:
    last_message = state["messages"][-1]
    results = []
    
    for tool_call in last_message.tool_calls:
        tool_name    = tool_call["name"]
        tool_args    = tool_call["args"]
        tool_call_id = tool_call["id"]
        
        try:
            # Check if tool exists
            if tool_name not in tool_map:
                raise ValueError(f"Unknown tool: {tool_name}. Available: {list(tool_map.keys())}")
            
            tool_fn = tool_map[tool_name]
            output  = tool_fn.invoke(tool_args)
            
            results.append(ToolMessage(
                content=str(output),
                tool_call_id=tool_call_id
            ))
            
        except Exception as e:
            # Error → ToolMessage with error description
            # LLM sees this and can retry with different args or acknowledge failure
            error_content = f"Tool '{tool_name}' failed: {type(e).__name__}: {str(e)}"
            logger.error("tool_execution_error", extra={
                "tool_name": tool_name,
                "error": str(e),
                "args": tool_args
            })
            results.append(ToolMessage(
                content=error_content,
                tool_call_id=tool_call_id
            ))
    
    return {"messages": results}

Why this works: The LLM receives "Tool 'rag_search' failed: ConnectionError: ChromaDB is unavailable" as context. It can then:

  • Retry with different parameters
  • Answer from its parametric knowledge (training data)
  • Tell the user it couldn't complete the search

9.3 L2 — LLM Error Handling

OpenAI API errors you'll encounter in production:

Error CodeMeaningHandling Strategy
429Rate limit exceededExponential backoff + retry
500/503OpenAI server errorRetry with backoff
408Request timeoutRetry once, then fail gracefully
400Bad request (context too long)Trim messages and retry
401Invalid API keyAlert operator, fail immediately

Retry with Exponential Backoff

import time
import random
from openai import RateLimitError, APITimeoutError, APIError

def call_llm_with_retry(state: AgentState, max_retries: int = 3) -> AgentState:
    """call_llm node with OpenAI error handling and exponential backoff."""
    
    for attempt in range(max_retries):
        try:
            response = llm_with_tools.invoke(state["messages"])
            return {"messages": [response]}
        
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise   # re-raise on final attempt
            wait = (2 ** attempt) + random.uniform(0, 1)   # exponential + jitter
            logger.warning(f"Rate limit hit, waiting {wait:.1f}s (attempt {attempt+1})")
            time.sleep(wait)
        
        except APITimeoutError as e:
            if attempt == max_retries - 1:
                # Final attempt failed — return graceful error message
                return {"messages": [AIMessage(
                    content="I'm having trouble connecting to the AI service. Please try again."
                )]}
            time.sleep(2 ** attempt)
        
        except Exception as e:
            # Unexpected error — log and fail fast
            logger.error(f"Unexpected LLM error: {e}")
            raise

Context Too Long (400 Error)

from openai import BadRequestError

def call_llm_with_trimming(state: AgentState) -> AgentState:
    """call_llm with automatic context trimming on context length errors."""
    messages = state["messages"]
    
    while messages:
        try:
            response = llm_with_tools.invoke(messages)
            return {"messages": [response]}
        
        except BadRequestError as e:
            if "context_length_exceeded" in str(e).lower():
                # Remove oldest 2 messages (one exchange) and retry
                if len(messages) > 2:
                    messages = messages[2:]   # drop oldest pair
                    logger.warning("Context too long, trimmed 2 messages")
                else:
                    # Only 1-2 messages and still too long — truncate content
                    return {"messages": [AIMessage(
                        content="Your message is too long for me to process. Please shorten it."
                    )]}
            else:
                raise

9.4 Iteration Limit — Preventing Infinite Loops

Add a safety counter to the state to prevent runaway agent loops:

import operator

class AgentState(TypedDict):
    messages:       Annotated[list[BaseMessage], add_messages]
    iteration_count: Annotated[int, operator.add]   # accumulates on each call_llm

def call_llm(state: AgentState) -> AgentState:
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iteration_count": 1   # operator.add: increments by 1 each call
    }

def should_call_tools(state: AgentState) -> str:
    last_message = state["messages"][-1]
    
    # Safety: hard limit of 10 LLM calls per invocation
    if state.get("iteration_count", 0) >= 10:
        logger.warning("Agent hit iteration limit — forcing END")
        return END
    
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "call_tools"
    return END

9.5 Fallback Agent Patterns

When the primary agent or tool fails, a fallback provides a degraded-but-functional response.

Pattern 1 — Fallback Tool Response

If rag_search fails (ChromaDB unavailable), fall back to LLM parametric knowledge:

@tool
def rag_search(query: str) -> str:
    """Search the knowledge base for Agent Factory information."""
    try:
        results = retrieve(query)
        if not results:
            return f"No results found for '{query}'. Using general knowledge."
        return build_prompt(query, results)
    except Exception as e:
        # Graceful fallback — tell LLM to use its knowledge
        return f"Knowledge base unavailable ({type(e).__name__}). Please answer from general knowledge."

The LLM receives the fallback message and can still provide an answer from its training data with a note that the knowledge base was unavailable.

Pattern 2 — Agent-Level Fallback Node

def primary_agent_node(state: SupervisorState) -> dict:
    try:
        result = rag_agent.invoke({"messages": state["messages"]})
        return {"messages": [AIMessage(content=result["messages"][-1].content)]}
    except Exception as e:
        logger.error(f"RAG agent failed: {e}")
        return {"messages": [AIMessage(content="__FALLBACK__")]}   # signal fallback

def should_fallback(state: SupervisorState) -> str:
    last = state["messages"][-1].content
    if last == "__FALLBACK__":
        return "fallback_agent"
    return END

def fallback_agent_node(state: SupervisorState) -> dict:
    """Simple LLM response when all else fails."""
    question = state["messages"][-2].content  # user's question
    response = llm.invoke([
        SystemMessage("You're a helpful assistant. The knowledge base is unavailable."),
        HumanMessage(content=question)
    ])
    return {"messages": [response]}

Pattern 3 — Model Fallback

Primary: expensive model. Fallback: cheaper/faster model.

class ModelFallback:
    def __init__(self):
        self.primary = ChatOpenAI(model="gpt-4o", temperature=0)
        self.fallback = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    def invoke(self, messages):
        try:
            return self.primary.invoke(messages)
        except Exception as e:
            logger.warning(f"Primary model failed ({e}), using fallback model")
            return self.fallback.invoke(messages)

llm = ModelFallback()

9.6 Structured Error State

For production agents, track errors in state for observability and recovery decisions:

class RobustAgentState(TypedDict):
    messages:       Annotated[list[BaseMessage], add_messages]
    errors:         Annotated[list[str], operator.add]   # accumulate all errors
    retry_count:    Annotated[int, operator.add]
    last_error:     str | None

def call_tools(state: RobustAgentState) -> dict:
    last_message = state["messages"][-1]
    results = []
    new_errors = []
    
    for tool_call in last_message.tool_calls:
        try:
            output = tool_map[tool_call["name"]].invoke(tool_call["args"])
            results.append(ToolMessage(content=str(output), tool_call_id=tool_call["id"]))
        except Exception as e:
            error_msg = f"{tool_call['name']}: {str(e)}"
            new_errors.append(error_msg)
            results.append(ToolMessage(
                content=f"Error: {str(e)}",
                tool_call_id=tool_call["id"]
            ))
    
    update = {"messages": results}
    if new_errors:
        update["errors"] = new_errors
        update["last_error"] = new_errors[-1]
    return update

def should_call_tools(state: RobustAgentState) -> str:
    # Abort if too many errors accumulated
    if len(state.get("errors", [])) >= 5:
        return END
    # ... normal routing

9.7 Rate Limiting and Throttling

In production, you'll need to protect against:

  1. Your own users sending too many requests
  2. OpenAI rate limits affecting all users

Request-Level Rate Limiting (FastAPI)

from fastapi import HTTPException
from datetime import datetime, timedelta
import asyncio

class RateLimiter:
    def __init__(self, max_requests: int, window: timedelta):
        self.max_requests = max_requests
        self.window = window
        self.requests: dict[str, list[datetime]] = {}
    
    def check(self, user_id: str) -> bool:
        now = datetime.utcnow()
        window_start = now - self.window
        
        # Clean old requests
        self.requests.setdefault(user_id, [])
        self.requests[user_id] = [
            t for t in self.requests[user_id] if t > window_start
        ]
        
        # Check limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False
        
        self.requests[user_id].append(now)
        return True

rate_limiter = RateLimiter(max_requests=10, window=timedelta(minutes=1))

@app.post("/chat")
async def chat(question: str, session_id: str, user_id: str = "default"):
    if not rate_limiter.check(user_id):
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Please wait before sending more."
        )
    # ... rest of handler

9.8 Timeout Handling

Long-running agent turns must have timeouts:

import asyncio

async def run_agent_with_timeout(
    question: str,
    session_id: str,
    db: Session,
    timeout_seconds: float = 30.0
) -> str:
    """Run the agent with a hard timeout."""
    try:
        result = await asyncio.wait_for(
            agent.ainvoke(
                {"messages": history},
                config={"configurable": {"thread_id": session_id}}
            ),
            timeout=timeout_seconds
        )
        return result["messages"][-1].content
    
    except asyncio.TimeoutError:
        logger.warning(f"Agent timeout for session {session_id}")
        return "I'm taking too long to process this request. Please try again with a simpler question."
    
    except Exception as e:
        logger.error(f"Agent error: {e}", exc_info=True)
        return "I encountered an error. Please try again."

9.9 Dead-Letter Queue Pattern

For requests that consistently fail, don't keep retrying — escalate to a dead-letter queue:

class DeadLetterQueue:
    def __init__(self, db: Session):
        self.db = db
    
    def push(self, session_id: str, question: str, error: str, state: dict | None = None):
        """Store a failed request for later investigation."""
        record = FailedRequest(
            session_id=session_id,
            question=question,
            error=error,
            state_snapshot=json.dumps(state) if state else None,
            created_at=datetime.utcnow()
        )
        self.db.add(record)
        self.db.commit()
        
        # Alert the team (PagerDuty, Slack, etc.)
        if alert_enabled:
            send_alert(f"Dead-lettered request from {session_id}: {error[:100]}")

dlq = DeadLetterQueue(db)

async def run_agent_robust(question: str, session_id: str, db: Session) -> str:
    max_retries = 3
    last_error = None
    
    for attempt in range(max_retries):
        try:
            return await run_agent_with_timeout(question, session_id, db)
        except Exception as e:
            last_error = e
            await asyncio.sleep(2 ** attempt)   # backoff between retries
    
    # All retries exhausted — dead-letter it
    dlq.push(session_id, question, str(last_error))
    return "I'm unable to process your request at this time. Our team has been notified."

9.10 Interview Q&A

Q: How do you handle tool failures in a LangGraph agent?

The key principle is: return errors as ToolMessage objects rather than raising exceptions. If rag_search fails because ChromaDB is down, the call_tools node catches the exception, creates a ToolMessage with content="Tool failed: ChromaDB unavailable" and the correct tool_call_id, and returns it to state. On the next call_llm invocation, the LLM sees this error in its context and can either retry with different parameters, fall back to its training knowledge, or politely explain to the user that the service is unavailable. This keeps the agent loop running gracefully instead of crashing mid-turn.


Q: How do you prevent an agent from entering an infinite loop?

I add an iteration_count: Annotated[int, operator.add] field to AgentState. The call_llm node returns {"iteration_count": 1} as part of its update — the operator.add reducer accumulates it across calls. The should_call_tools routing function checks this: if state["iteration_count"] >= MAX_ITERATIONS (e.g., 10), it returns END regardless of whether tool_calls are present. This creates a hard ceiling on the number of LLM calls per invocation. For production, I also log a warning when the limit is hit — it usually indicates a tool that's returning unhelpful results that cause the LLM to keep retrying.


Q: How do you handle OpenAI rate limits in a production agent?

Rate limits need to be handled at two levels. At the application level: implement per-user request rate limiting with a sliding window counter (10 requests per minute per user) using Redis for distributed deployment. At the OpenAI API level: wrap llm.invoke() with exponential backoff — catch RateLimitError (429), wait (2^attempt + random_jitter) seconds, and retry up to 3 times. For sustained high traffic, OpenAI Batch API and request queuing with priority lanes separate interactive (low-latency) from background (high-throughput) workloads. LangSmith's ratelimit monitoring can proactively alert you to threshold breaches.


Q: What is a dead-letter queue in the context of an agent system?

A dead-letter queue (DLQ) is a persistent store for requests that exhausted all retry attempts and still failed. Instead of returning a generic error, the system saves the full context (question, session_id, error, state snapshot) to a FailedRequest table and alerts the engineering team. This ensures no request silently disappears and provides all the information needed to diagnose the root cause. For an agent system, the DLQ is especially valuable because agent failures are often non-deterministic — seeing the exact state at failure time is critical for debugging.


Q: How would you implement a fallback from GPT-4o to GPT-4o-mini on failure?

Create a ModelFallback class that wraps both models. invoke() tries the primary model first; on APIError, RateLimitError, or other failures, it logs the fallback event and calls the secondary model. The fallback is transparent to the rest of the agent — it still returns a valid AIMessage. This can also be implemented as a RunnableFallback using LangChain's LCEL: chain = primary_llm.with_fallbacks([fallback_llm]). For production, include a circuit breaker so that if the primary model has been down for > 5 minutes, all traffic goes to the fallback without attempting the primary each time.


9.11 Key One-Liners to Memorize

"Never raise from call_tools — return errors as ToolMessages and let the LLM recover."

"Iteration limit: Annotated[int, operator.add] in state, checked in the routing function."

"Exponential backoff for rate limits: wait 2^attempt + jitter, retry up to 3 times."

"Fallback agents provide degraded-but-functional responses when primary fails."

"Dead-letter queue: when retries are exhausted, persist the failure and alert the team."

"Timeout every agent invocation — an unresponsive agent is worse than a fast error."

Next: Chapter 10 — Production Agents: Streaming, Tracing & Scaling

Header Logo