From Prototype to Production - LangGraph Systems

A working prototype is perhaps 20% of the total effort. The remaining 80% goes into error handling, monitoring, deployment, scaling, and building systems that fail gracefully. This post covers the gap between “it works on my machine” and “it handles thousands of users reliably.”

The Production Gap

Prototypes assume happy paths. Production systems face:

graph TD
    subgraph Prototype["Prototype World"]
        A1[Single User] --> B1[Fast Responses]
        B1 --> C1[No Errors]
        C1 --> D1[Unlimited Budget]
    end

    subgraph Production["Production Reality"]
        A2[Concurrent Users] --> B2[Latency Budgets]
        B2 --> C2[Failures Happen]
        C2 --> D2[Cost Constraints]
        D2 --> E2[Observability Required]
    end

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class A1,B1,C1,D1 blueClass
    class A2,B2,C2,D2,E2 orangeClass

Concern       Prototype            Production
Errors        Crash and debug      Graceful recovery
State         In-memory            Persistent, replicated
Scaling       Single instance      Horizontal scaling
Costs         Ignored              Tracked and optimized
Monitoring    Print statements     Structured logging, traces

Error Handling Strategies

Layered Error Handling

Errors can occur at multiple levels. Handle each appropriately:

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict
import logging

logger = logging.getLogger(__name__)

class RobustState(TypedDict):
    query: str
    result: str
    error: str | None
    retries: int
    fallback_used: bool

class LLMError(Exception):
    """Custom exception for LLM-related errors."""
    pass

class ToolError(Exception):
    """Custom exception for tool execution errors."""
    pass

def safe_llm_call(prompt: str, max_retries: int = 3) -> str:
    """LLM call with retry logic and exponential backoff."""
    import time
    from openai import RateLimitError, APIError

    llm = ChatOpenAI(model="gpt-4o")

    for attempt in range(max_retries):
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            return response.content
        except RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            logger.warning(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
            time.sleep(wait_time)
        except APIError as e:
            logger.error(f"API error: {e}")
            if attempt == max_retries - 1:
                raise LLMError(f"API failed after {max_retries} attempts: {e}")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise LLMError(f"Unexpected LLM error: {e}")

    raise LLMError("Max retries exceeded")

def process_with_fallback(state: RobustState) -> dict:
    """Process with fallback to simpler approach on failure."""
    try:
        # Primary approach - complex reasoning
        result = safe_llm_call(f"Analyze in detail: {state['query']}")
        return {"result": result, "fallback_used": False}

    except LLMError as e:
        logger.warning(f"Primary approach failed, using fallback: {e}")

        try:
            # Fallback - simpler approach
            result = safe_llm_call(f"Briefly answer: {state['query']}")
            return {"result": result, "fallback_used": True}

        except LLMError as e2:
            # Both approaches failed
            return {
                "error": f"All approaches failed: {e2}",
                "result": "Unable to process your request. Please try again later."
            }

Circuit Breaker Pattern

Prevent cascading failures by stopping calls to failing services:

from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: int = 60  # seconds
    _failure_count: int = 0
    _last_failure: datetime | None = None
    _state: CircuitState = CircuitState.CLOSED

    def can_execute(self) -> bool:
        """Check if we should attempt the call."""
        if self._state == CircuitState.CLOSED:
            return True

        if self._state == CircuitState.OPEN:
            # Check if recovery timeout passed
            if self._last_failure and \
               datetime.now() - self._last_failure > timedelta(seconds=self.recovery_timeout):
                self._state = CircuitState.HALF_OPEN
                return True
            return False

        # HALF_OPEN - allow one test request
        return True

    def record_success(self):
        """Record successful call."""
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def record_failure(self):
        """Record failed call."""
        self._failure_count += 1
        self._last_failure = datetime.now()

        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

# Usage in agent nodes
llm_circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def protected_llm_call(state: RobustState) -> dict:
    """LLM call protected by circuit breaker."""
    if not llm_circuit.can_execute():
        return {
            "error": "Service temporarily unavailable",
            "result": "System is recovering from errors. Please wait."
        }

    try:
        result = safe_llm_call(state["query"])
        llm_circuit.record_success()
        return {"result": result}
    except LLMError as e:
        llm_circuit.record_failure()
        return {"error": str(e)}

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: Failures >= Threshold
    Open --> HalfOpen: Timeout Expired
    HalfOpen --> Closed: Success
    HalfOpen --> Open: Failure
    Closed --> Closed: Success/Failure < Threshold
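
In a LangGraph graph, the breaker can also gate routing directly. Here is a minimal sketch (node and function names are illustrative) that sends traffic to a static fallback node whenever the circuit is open:

from langgraph.graph import StateGraph, START, END

def route_on_circuit(state: RobustState) -> str:
    # Route to the protected LLM node only when the breaker allows calls.
    return "llm" if llm_circuit.can_execute() else "fallback"

def static_fallback(state: RobustState) -> dict:
    # Degraded path that never touches the failing service.
    return {
        "result": "System is recovering from errors. Please wait.",
        "fallback_used": True
    }

builder = StateGraph(RobustState)
builder.add_node("llm", protected_llm_call)
builder.add_node("fallback", static_fallback)
builder.add_conditional_edges(START, route_on_circuit, {"llm": "llm", "fallback": "fallback"})
builder.add_edge("llm", END)
builder.add_edge("fallback", END)
resilient_graph = builder.compile()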

Structured Logging and Tracing

Structured Logging

Replace print statements with structured logs:

import structlog
import json
from typing import Any
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

def logged_node(state: dict) -> dict:
    """Node with structured logging."""
    log = logger.bind(
        node="processor",
        query_id=state.get("query_id"),
        user_id=state.get("user_id")
    )

    log.info("node_started", input_size=len(str(state.get("query", ""))))

    start_time = datetime.now()
    try:
        # process_query stands in for whatever work this node actually performs
        result = process_query(state)

        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        log.info(
            "node_completed",
            duration_ms=duration_ms,
            output_size=len(str(result.get("result", "")))
        )

        return result

    except Exception as e:
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        log.error(
            "node_failed",
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise

Distributed Tracing

Track requests across agent nodes:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add exporter (e.g., Jaeger, Zipkin, or cloud provider)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

def traced_node(node_name: str):
    """Decorator to add tracing to nodes."""
    def decorator(func):
        def wrapper(state: dict) -> dict:
            with tracer.start_as_current_span(
                node_name,
                attributes={
                    "agent.query_id": state.get("query_id", "unknown"),
                    "agent.node": node_name
                }
            ) as span:
                try:
                    result = func(state)
                    span.set_attribute("agent.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("agent.success", False)
                    span.set_attribute("agent.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

@traced_node("research")
def research_node(state: dict) -> dict:
    """Research node with automatic tracing."""
    # Work happens here
    return {"research": "findings..."}

@traced_node("synthesis")
def synthesis_node(state: dict) -> dict:
    """Synthesis node with automatic tracing."""
    return {"synthesis": "combined results..."}

Cost Management

Token Tracking

Monitor and limit LLM costs:

from dataclasses import dataclass, field
from threading import Lock

@dataclass
class TokenTracker:
    """Track token usage and enforce limits."""
    max_tokens_per_request: int = 10000
    max_tokens_per_day: int = 1000000
    _daily_usage: int = 0
    _lock: Lock = field(default_factory=Lock)

    # Approximate costs per 1K tokens (varies by model)
    costs_per_1k = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    }

    def check_budget(self, estimated_tokens: int) -> bool:
        """Check if request is within budget."""
        with self._lock:
            return (self._daily_usage + estimated_tokens) <= self.max_tokens_per_day

    def record_usage(self, input_tokens: int, output_tokens: int, model: str) -> dict:
        """Record token usage and return cost info."""
        with self._lock:
            total = input_tokens + output_tokens
            self._daily_usage += total

            costs = self.costs_per_1k.get(model, {"input": 0.01, "output": 0.03})
            cost = (input_tokens / 1000 * costs["input"] +
                    output_tokens / 1000 * costs["output"])

            return {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": total,
                "estimated_cost": round(cost, 6),
                "daily_usage": self._daily_usage
            }

tracker = TokenTracker()

def cost_aware_llm_call(prompt: str, model: str = "gpt-4o") -> tuple[str, dict]:
    """LLM call with cost tracking."""
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage

    # Estimate tokens (rough: 4 chars per token)
    estimated_input = len(prompt) // 4

    if not tracker.check_budget(estimated_input * 2):  # Assume similar output
        raise Exception("Daily token budget exceeded")

    llm = ChatOpenAI(model=model)
    response = llm.invoke([HumanMessage(content=prompt)])

    # Get actual usage from response
    usage = tracker.record_usage(
        input_tokens=response.response_metadata.get("token_usage", {}).get("prompt_tokens", estimated_input),
        output_tokens=response.response_metadata.get("token_usage", {}).get("completion_tokens", len(response.content) // 4),
        model=model
    )

    return response.content, usage
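
Note that TokenTracker as written never resets its daily counter. One possible approach, sketched here as an illustrative subclass, is to track the current date and zero the counter when it rolls over:

from dataclasses import dataclass, field
from datetime import date

@dataclass
class DailyResettingTracker(TokenTracker):
    """Illustrative variant: zero the daily counter when the calendar date changes."""
    _usage_date: date = field(default_factory=date.today)

    def _maybe_reset(self) -> None:
        today = date.today()
        if today != self._usage_date:
            self._usage_date = today
            self._daily_usage = 0

    def check_budget(self, estimated_tokens: int) -> bool:
        with self._lock:
            self._maybe_reset()
            return (self._daily_usage + estimated_tokens) <= self.max_tokens_per_day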

Model Selection by Task

Use cheaper models for simple tasks:

from enum import Enum

class TaskComplexity(Enum):
    SIMPLE = "simple"    # Classification, extraction
    MEDIUM = "medium"    # Summarization, Q&A
    COMPLEX = "complex"  # Reasoning, multi-step

def select_model(complexity: TaskComplexity) -> str:
    """Select appropriate model based on task complexity."""
    model_map = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MEDIUM: "gpt-4o",
        TaskComplexity.COMPLEX: "gpt-4o",  # or claude-3-opus for hardest
    }
    return model_map[complexity]

def adaptive_processing(state: dict) -> dict:
    """Process with model selection based on complexity."""
    query = state["query"]

    # Classify complexity (using cheap model)
    complexity_prompt = f"Classify this query as 'simple', 'medium', or 'complex': {query}"
    classification, _ = cost_aware_llm_call(complexity_prompt, "gpt-4o-mini")

    complexity = TaskComplexity(classification.strip().lower())
    model = select_model(complexity)

    # Process with appropriate model
    result, usage = cost_aware_llm_call(
        f"Answer this query thoroughly: {query}",
        model
    )

    return {
        "result": result,
        "model_used": model,
        "complexity": complexity.value,
        "cost_info": usage
    }
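
One caveat: TaskComplexity(classification.strip().lower()) raises ValueError if the classifier replies with anything other than the bare label. A small, illustrative tolerant parse that falls back to MEDIUM:

def parse_complexity(raw: str) -> TaskComplexity:
    """Map a free-form classifier reply onto a TaskComplexity, defaulting to MEDIUM."""
    text = raw.strip().lower()
    for level in TaskComplexity:
        if level.value in text:
            return level
    return TaskComplexity.MEDIUM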

Deployment Architecture

Async Processing with Queues

Handle high concurrency with task queues:

import asyncio
import json
import time
from datetime import datetime
from redis import Redis
from langgraph.graph import StateGraph

class AsyncAgentRunner:
    """Run agents asynchronously with Redis queue."""

    def __init__(self, redis_url: str, agent: StateGraph):
        self.redis = Redis.from_url(redis_url)
        self.agent = agent.compile()
        self.queue_name = "agent_tasks"
        self.results_prefix = "result:"

    def submit_task(self, task_id: str, input_state: dict) -> str:
        """Submit task to queue, return task ID."""
        task = {
            "task_id": task_id,
            "input": input_state,
            "submitted_at": datetime.now().isoformat()
        }
        self.redis.rpush(self.queue_name, json.dumps(task))
        return task_id

    def get_result(self, task_id: str, timeout: int = 300) -> dict | None:
        """Get result for a task, waiting if necessary."""
        result_key = f"{self.results_prefix}{task_id}"

        # Poll for result (synchronous caller, so sleep with time.sleep)
        for _ in range(timeout):
            result = self.redis.get(result_key)
            if result:
                return json.loads(result)
            time.sleep(1)

        return None

    async def worker(self):
        """Process tasks from queue."""
        while True:
            # Blocking pop from queue
            task_data = self.redis.blpop(self.queue_name, timeout=5)

            if task_data:
                _, task_json = task_data
                task = json.loads(task_json)

                try:
                    # Run the agent
                    result = self.agent.invoke(task["input"])

                    # Store result
                    result_key = f"{self.results_prefix}{task['task_id']}"
                    self.redis.setex(
                        result_key,
                        3600,  # 1 hour TTL
                        json.dumps({"status": "success", "result": result})
                    )

                except Exception as e:
                    result_key = f"{self.results_prefix}{task['task_id']}"
                    self.redis.setex(
                        result_key,
                        3600,
                        json.dumps({"status": "error", "error": str(e)})
                    )
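
A minimal usage sketch (the Redis URL and the graph name are illustrative): the API process enqueues work and returns a task ID, while one or more worker processes drain the queue.

import asyncio
import uuid

# Assumes `graph_builder` is an uncompiled StateGraph defined elsewhere.
runner = AsyncAgentRunner("redis://localhost:6379/0", graph_builder)

# API side: enqueue and hand the task ID back to the caller.
task_id = runner.submit_task(str(uuid.uuid4()), {"query": "Summarize the incident report"})

# Worker side (separate process): consume tasks until stopped.
# asyncio.run(runner.worker())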

Health Checks and Readiness

Implement proper health endpoints:

from fastapi import FastAPI, HTTPException
from datetime import datetime
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

app = FastAPI()

class HealthStatus:
    def __init__(self):
        self.llm_healthy = True
        self.db_healthy = True
        self.last_check = datetime.now()

    def check_llm(self) -> bool:
        """Verify LLM service is responding."""
        try:
            # Minimal health check call
            llm = ChatOpenAI(model="gpt-4o-mini")
            response = llm.invoke([HumanMessage(content="ping")])
            self.llm_healthy = len(response.content) > 0
        except Exception:
            self.llm_healthy = False
        return self.llm_healthy

    def check_db(self) -> bool:
        """Verify database connection."""
        try:
            # Test query against your checkpoint store goes here
            self.db_healthy = True
        except Exception:
            self.db_healthy = False
        return self.db_healthy

health = HealthStatus()

@app.get("/health")
def health_check():
    """Liveness probe - is the service running?"""
    return {"status": "ok", "timestamp": datetime.now().isoformat()}

@app.get("/ready")
def readiness_check():
    """Readiness probe - can the service handle requests?"""
    llm_ok = health.check_llm()
    db_ok = health.check_db()

    if not (llm_ok and db_ok):
        raise HTTPException(
            status_code=503,
            detail={
                "status": "not_ready",
                "llm": llm_ok,
                "database": db_ok
            }
        )

    return {
        "status": "ready",
        "llm": llm_ok,
        "database": db_ok,
        "timestamp": datetime.now().isoformat()
    }

# tracker and llm_circuit come from the earlier snippets
@app.get("/metrics")
def metrics():
    """Prometheus-style metrics endpoint (illustrative static values)."""
    return {
        "agent_requests_total": 1000,
        "agent_errors_total": 5,
        "agent_latency_seconds": {"p50": 1.2, "p95": 3.5, "p99": 8.0},
        "token_usage_daily": tracker._daily_usage,
        "circuit_breaker_state": llm_circuit._state.value
    }
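
The /metrics handler above returns hard-coded, illustrative numbers. If you scrape with Prometheus, the prometheus_client library can expose real counters and histograms; a minimal sketch (metric names are illustrative):

from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response

REQUESTS = Counter("agent_requests_total", "Total agent requests")
ERRORS = Counter("agent_errors_total", "Total agent errors")
LATENCY = Histogram("agent_latency_seconds", "End-to-end agent latency in seconds")

@app.get("/prometheus")
def prometheus_metrics():
    # Serve metrics in the Prometheus text exposition format.
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
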
graph TD
    A[Load Balancer] --> B[Agent Instance 1]
    A --> C[Agent Instance 2]
    A --> D[Agent Instance N]

    B --> E[Redis Queue]
    C --> E
    D --> E

    E --> F[Worker Pool]

    F --> G[LLM API]
    F --> H[Database]
    F --> I[Vector Store]

    subgraph Monitoring["Monitoring"]
        J[Prometheus]
        K[Grafana]
        L[Alerting]
    end

    B --> J
    C --> J
    D --> J

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class A,E blueClass
    class B,C,D,F orangeClass
    class G,H,I,J,K,L greenClass

Complete Production Agent

Putting it all together—a production-ready agent service:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from fastapi import FastAPI
from pydantic import BaseModel
from typing import TypedDict, Optional
import structlog
import uuid

# CircuitBreaker, TokenTracker, traced_node, TaskComplexity, select_model,
# and cost_aware_llm_call come from the earlier snippets in this post.

# Structured logging
logger = structlog.get_logger()

# State definition
class ProductionState(TypedDict):
    request_id: str
    user_id: str
    query: str
    result: str
    error: Optional[str]
    metadata: dict

# Circuit breaker for LLM calls
llm_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

# Token tracker
token_tracker = TokenTracker(max_tokens_per_day=500000)

def classify_complexity(query: str) -> TaskComplexity:
    """Classify a query with a cheap model (same approach as adaptive_processing above)."""
    label, _ = cost_aware_llm_call(
        f"Classify this query as 'simple', 'medium', or 'complex': {query}",
        "gpt-4o-mini"
    )
    return TaskComplexity(label.strip().lower())

# Build the agent
def create_production_agent():
    """Create a production-ready agent with all safety features."""

    @traced_node("process_query")
    def process_query(state: ProductionState) -> dict:
        log = logger.bind(
            request_id=state["request_id"],
            user_id=state["user_id"]
        )

        # Check circuit breaker
        if not llm_circuit.can_execute():
            log.warning("circuit_open", message="LLM circuit breaker open")
            return {
                "error": "Service temporarily unavailable",
                "result": "Please try again in a few moments."
            }

        # Check token budget
        estimated_tokens = len(state["query"]) // 4 * 3
        if not token_tracker.check_budget(estimated_tokens):
            log.warning("budget_exceeded", estimated=estimated_tokens)
            return {
                "error": "Daily capacity reached",
                "result": "Service capacity reached. Please try again tomorrow."
            }

        try:
            # Select model based on complexity
            complexity = classify_complexity(state["query"])
            model = select_model(complexity)

            log.info("processing", model=model, complexity=complexity.value)

            result, usage = cost_aware_llm_call(
                f"Answer this query helpfully: {state['query']}",
                model
            )

            llm_circuit.record_success()

            log.info("completed", tokens=usage["total_tokens"], cost=usage["estimated_cost"])

            return {
                "result": result,
                "metadata": {
                    "model": model,
                    "complexity": complexity.value,
                    "tokens": usage["total_tokens"],
                    "cost": usage["estimated_cost"]
                }
            }

        except Exception as e:
            llm_circuit.record_failure()
            log.error("processing_failed", error=str(e))
            return {
                "error": str(e),
                "result": "An error occurred processing your request."
            }

    # Build graph
    graph = StateGraph(ProductionState)
    graph.add_node("process", process_query)
    graph.add_edge(START, "process")
    graph.add_edge("process", END)

    # Compile with persistence
    checkpointer = SqliteSaver.from_conn_string("production_agent.db")
    return graph.compile(checkpointer=checkpointer)

# FastAPI application
app = FastAPI(title="Production Agent API")
agent = create_production_agent()

class QueryRequest(BaseModel):
    query: str
    user_id: str

class QueryResponse(BaseModel):
    request_id: str
    result: str
    error: Optional[str] = None
    metadata: dict = {}

@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
    """Handle incoming query requests."""
    request_id = str(uuid.uuid4())

    logger.info(
        "request_received",
        request_id=request_id,
        user_id=request.user_id,
        query_length=len(request.query)
    )

    initial_state = {
        "request_id": request_id,
        "user_id": request.user_id,
        "query": request.query,
        "result": "",
        "error": None,
        "metadata": {}
    }

    config = {"configurable": {"thread_id": request_id}}

    try:
        result = agent.invoke(initial_state, config=config)

        return QueryResponse(
            request_id=request_id,
            result=result["result"],
            error=result.get("error"),
            metadata=result.get("metadata", {})
        )

    except Exception as e:
        logger.error("request_failed", request_id=request_id, error=str(e))
        return QueryResponse(
            request_id=request_id,
            result="Request failed",
            error=str(e)
        )

@app.get("/status/{request_id}")
async def get_status(request_id: str):
    """Get status of a previous request."""
    config = {"configurable": {"thread_id": request_id}}

    try:
        state = agent.get_state(config)
        return {
            "request_id": request_id,
            "found": state is not None,
            "state": state.values if state else None
        }
    except Exception as e:
        return {"request_id": request_id, "error": str(e)}

Deployment Checklist

Before going to production, verify:

Infrastructure

  • Persistent storage for checkpoints (PostgreSQL, Redis)
  • API keys in a secrets manager, not environment variables (see the sketch after this list)
  • Rate limiting at API gateway
  • SSL/TLS for all connections
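
As referenced above, a minimal sketch of pulling the OpenAI key from AWS Secrets Manager at startup (the secret name and region are illustrative; any secrets manager works the same way):

import json
import boto3

def load_openai_key(secret_name: str = "prod/agent/openai", region: str = "us-east-1") -> str:
    """Fetch the API key from AWS Secrets Manager instead of the environment."""
    client = boto3.client("secretsmanager", region_name=region)
    secret = client.get_secret_value(SecretId=secret_name)["SecretString"]
    return json.loads(secret)["OPENAI_API_KEY"]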

Monitoring

  • Structured logging to aggregation service
  • Distributed tracing enabled
  • Metrics dashboard (latency, errors, tokens, costs)
  • Alerting on error rates and latency

Resilience

  • Circuit breakers for external services
  • Retry logic with exponential backoff
  • Graceful degradation paths
  • Health and readiness probes

Cost Control

  • Token tracking per request and daily
  • Budget alerts and limits
  • Model selection by task complexity
  • Request size limits

Security

  • Input validation and sanitization
  • Output filtering for sensitive data
  • Audit logging for compliance
  • Rate limiting per user (see the sketch after this list)
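
As referenced above, a minimal in-process sketch of per-user rate limiting with a sliding window (the limits are illustrative; a real deployment would typically back this with Redis so all instances share state):

import time
from collections import defaultdict, deque

class PerUserRateLimiter:
    """Allow at most max_requests per user within window_seconds."""

    def __init__(self, max_requests: int = 20, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._history: dict[str, deque] = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = time.monotonic()
        window = self._history[user_id]
        # Discard timestamps that have fallen out of the window.
        while window and now - window[0] > self.window_seconds:
            window.popleft()
        if len(window) >= self.max_requests:
            return False
        window.append(now)
        return True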

Key Takeaways

  1. Production is 80% of the work: Prototypes skip error handling, monitoring, and scaling. Budget accordingly.

  2. Layer your error handling: Retries for transient failures, fallbacks for degraded operation, circuit breakers for cascading failures.

  3. Observe everything: Structured logs, distributed traces, and metrics. You can’t fix what you can’t see.

  4. Control costs actively: Track tokens, select models by complexity, set budgets, and alert on anomalies.

  5. Design for failure: Services fail. Networks fail. LLMs fail. Build systems that degrade gracefully.

  6. Use the checklist: Systematic verification catches gaps that intuition misses.


This concludes the LangChain and LangGraph series. We’ve covered foundations through production systems—from prompts and LCEL through multi-agent architectures and deployment. The tools are here; build something useful.
