A working prototype is maybe 20% of production effort. The remaining 80% involves error handling, monitoring, deployment, scaling, and building systems that fail gracefully. This post covers the gap between “it works on my machine” and “it handles thousands of users reliably.”
## The Production Gap

Prototypes assume happy paths. Production systems face:
```mermaid
graph TD
    subgraph Prototype["Prototype World"]
        A1[Single User] --> B1[Fast Responses]
        B1 --> C1[No Errors]
        C1 --> D1[Unlimited Budget]
    end
    subgraph Production["Production Reality"]
        A2[Concurrent Users] --> B2[Latency Budgets]
        B2 --> C2[Failures Happen]
        C2 --> D2[Cost Constraints]
        D2 --> E2[Observability Required]
    end
    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    class A1,B1,C1,D1 blueClass
    class A2,B2,C2,D2,E2 orangeClass
```
| Concern | Prototype | Production |
|---|---|---|
| Errors | Crash and debug | Graceful recovery |
| State | In-memory | Persistent, replicated |
| Scaling | Single instance | Horizontal scaling |
| Costs | Ignored | Tracked and optimized |
| Monitoring | Print statements | Structured logging, traces |
## Error Handling Strategies

### Layered Error Handling

Errors can occur at multiple levels. Handle each appropriately:
```python
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict
import logging

logger = logging.getLogger(__name__)


class RobustState(TypedDict):
    query: str
    result: str
    error: str | None
    retries: int
    fallback_used: bool


class LLMError(Exception):
    """Custom exception for LLM-related errors."""
    pass


class ToolError(Exception):
    """Custom exception for tool execution errors."""
    pass


def safe_llm_call(prompt: str, max_retries: int = 3) -> str:
    """LLM call with retry logic and exponential backoff."""
    import time
    from openai import RateLimitError, APIError

    llm = ChatOpenAI(model="gpt-4o")

    for attempt in range(max_retries):
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            return response.content
        except RateLimitError:
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
            time.sleep(wait_time)
        except APIError as e:
            logger.error(f"API error: {e}")
            if attempt == max_retries - 1:
                raise LLMError(f"API failed after {max_retries} attempts: {e}")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise LLMError(f"Unexpected LLM error: {e}")

    raise LLMError("Max retries exceeded")


def process_with_fallback(state: RobustState) -> dict:
    """Process with fallback to a simpler approach on failure."""
    try:
        result = safe_llm_call(f"Analyze in detail: {state['query']}")
        return {"result": result, "fallback_used": False}
    except LLMError as e:
        logger.warning(f"Primary approach failed, using fallback: {e}")
        try:
            result = safe_llm_call(f"Briefly answer: {state['query']}")
            return {"result": result, "fallback_used": True}
        except LLMError as e2:
            return {
                "error": f"All approaches failed: {e2}",
                "result": "Unable to process your request. Please try again later."
            }
```
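If you prefer not to hand-roll the retry loop, the same policy can be declared with a retry library. Here is a minimal sketch using tenacity, which is an assumption on my part rather than a dependency of the code above; it retries only on transient API failures and lets everything else propagate. LangChain runnables also expose a `.with_retry()` helper if you would rather keep the policy on the model object.

```python
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from openai import RateLimitError, APIError
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage


@retry(
    retry=retry_if_exception_type((RateLimitError, APIError)),  # retry only transient API failures
    wait=wait_exponential(multiplier=1, min=2, max=30),          # 2s, 4s, 8s... capped at 30s
    stop=stop_after_attempt(3),
    reraise=True,                                                # surface the last error to the caller
)
def safe_llm_call_tenacity(prompt: str) -> str:
    """Simplified alternative to safe_llm_call with the retry policy declared up front."""
    llm = ChatOpenAI(model="gpt-4o")
    return llm.invoke([HumanMessage(content=prompt)]).content
```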
### Circuit Breaker Pattern

Prevent cascading failures by stopping calls to failing services:
```python
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: int = 60
    _failure_count: int = 0
    _last_failure: datetime | None = None
    _state: CircuitState = CircuitState.CLOSED

    def can_execute(self) -> bool:
        """Check if we should attempt the call."""
        if self._state == CircuitState.CLOSED:
            return True
        if self._state == CircuitState.OPEN:
            if self._last_failure and \
               datetime.now() - self._last_failure > timedelta(seconds=self.recovery_timeout):
                self._state = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN: allow a trial call

    def record_success(self):
        """Record successful call."""
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def record_failure(self):
        """Record failed call."""
        self._failure_count += 1
        self._last_failure = datetime.now()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN


llm_circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


def protected_llm_call(state: RobustState) -> dict:
    """LLM call protected by circuit breaker."""
    if not llm_circuit.can_execute():
        return {
            "error": "Service temporarily unavailable",
            "result": "System is recovering from errors. Please wait."
        }
    try:
        result = safe_llm_call(state["query"])
        llm_circuit.record_success()
        return {"result": result}
    except LLMError as e:
        llm_circuit.record_failure()
        return {"error": str(e)}
```
```mermaid
stateDiagram-v2
    [*] --> Closed
    Closed --> Open: Failures >= Threshold
    Open --> HalfOpen: Timeout Expired
    HalfOpen --> Closed: Success
    HalfOpen --> Open: Failure
    Closed --> Closed: Success/Failure < Threshold
```
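To make the state machine concrete, here is a short illustrative walk-through of the `CircuitBreaker` defined above, using deliberately small hypothetical thresholds so the transitions happen quickly:

```python
import time

breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1)

breaker.record_failure()
breaker.record_failure()        # hits the threshold -> OPEN
print(breaker.can_execute())    # False: calls are rejected while OPEN

time.sleep(1.5)                 # wait past recovery_timeout
print(breaker.can_execute())    # True: breaker moves to HALF_OPEN for a trial call

breaker.record_success()        # trial call succeeded -> back to CLOSED
print(breaker._state)           # CircuitState.CLOSED
```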
## Structured Logging and Tracing

### Structured Logging

Replace print statements with structured logs:
```python
import structlog
from datetime import datetime

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()


def logged_node(state: dict) -> dict:
    """Node with structured logging."""
    log = logger.bind(
        node="processor",
        query_id=state.get("query_id"),
        user_id=state.get("user_id")
    )

    log.info("node_started", input_size=len(str(state.get("query", ""))))
    start_time = datetime.now()

    try:
        result = process_query(state)
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        log.info(
            "node_completed",
            duration_ms=duration_ms,
            output_size=len(str(result.get("result", "")))
        )
        return result
    except Exception as e:
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        log.error(
            "node_failed",
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise
```
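One refinement worth considering: instead of re-binding `query_id` and `user_id` inside every node, structlog's contextvars support can attach request-scoped fields once and have every log line inherit them. A sketch, assuming you also add `structlog.contextvars.merge_contextvars` to the processor list configured above:

```python
import structlog


def handle_request(query_id: str, user_id: str, state: dict) -> dict:
    """Bind request-scoped fields once; log lines from any node inherit them."""
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(query_id=query_id, user_id=user_id)
    try:
        # Every log call inside logged_node now carries query_id/user_id automatically.
        return logged_node(state)
    finally:
        structlog.contextvars.clear_contextvars()
```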
### Distributed Tracing

Track requests across agent nodes:
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)


def traced_node(node_name: str):
    """Decorator to add tracing to nodes."""
    def decorator(func):
        def wrapper(state: dict) -> dict:
            with tracer.start_as_current_span(
                node_name,
                attributes={
                    "agent.query_id": state.get("query_id", "unknown"),
                    "agent.node": node_name
                }
            ) as span:
                try:
                    result = func(state)
                    span.set_attribute("agent.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("agent.success", False)
                    span.set_attribute("agent.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator


@traced_node("research")
def research_node(state: dict) -> dict:
    """Research node with automatic tracing."""
    return {"research": "findings..."}


@traced_node("synthesis")
def synthesis_node(state: dict) -> dict:
    """Synthesis node with automatic tracing."""
    return {"synthesis": "combined results..."}
```
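If you don't have an OTLP collector running locally, a console exporter is a quick way to see spans while developing. A minimal sketch you could swap in for the OTLP exporter above (same OpenTelemetry SDK, no collector required):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Print every finished span to stdout instead of shipping it to a collector.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("local_debug_span"):
    pass  # span attributes and timing are printed when the block exits
```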
## Cost Management

### Token Tracking

Monitor and limit LLM costs:
```python
from dataclasses import dataclass, field
from threading import Lock


@dataclass
class TokenTracker:
    """Track token usage and enforce limits."""
    max_tokens_per_request: int = 10000
    max_tokens_per_day: int = 1000000
    _daily_usage: int = 0
    _lock: Lock = field(default_factory=Lock)

    costs_per_1k = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    }

    def check_budget(self, estimated_tokens: int) -> bool:
        """Check if request is within budget."""
        with self._lock:
            return (self._daily_usage + estimated_tokens) <= self.max_tokens_per_day

    def record_usage(self, input_tokens: int, output_tokens: int, model: str) -> dict:
        """Record token usage and return cost info."""
        with self._lock:
            total = input_tokens + output_tokens
            self._daily_usage += total
            costs = self.costs_per_1k.get(model, {"input": 0.01, "output": 0.03})
            cost = (input_tokens / 1000 * costs["input"] +
                    output_tokens / 1000 * costs["output"])
            return {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": total,
                "estimated_cost": round(cost, 6),
                "daily_usage": self._daily_usage
            }


tracker = TokenTracker()


def cost_aware_llm_call(prompt: str, model: str = "gpt-4o") -> tuple[str, dict]:
    """LLM call with cost tracking."""
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage

    estimated_input = len(prompt) // 4
    if not tracker.check_budget(estimated_input * 2):
        raise Exception("Daily token budget exceeded")

    llm = ChatOpenAI(model=model)
    response = llm.invoke([HumanMessage(content=prompt)])

    usage = tracker.record_usage(
        input_tokens=response.response_metadata.get("token_usage", {}).get("prompt_tokens", estimated_input),
        output_tokens=response.response_metadata.get("token_usage", {}).get("completion_tokens", len(response.content) // 4),
        model=model
    )
    return response.content, usage
```
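The `len(prompt) // 4` estimate is crude. If tiktoken is available (an assumption on my part, not a dependency of the code above), you can count tokens exactly before deciding whether a request fits the budget:

```python
import tiktoken


def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """Exact token count for budget checks. cl100k_base covers older OpenAI chat
    models; the gpt-4o family uses the newer o200k_base encoding."""
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))


prompt = "Summarize the quarterly report in three bullet points."
estimated_input = count_tokens(prompt)
if not tracker.check_budget(estimated_input * 2):  # same 2x headroom as cost_aware_llm_call
    raise Exception("Daily token budget exceeded")
```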
### Model Selection by Task

Use cheaper models for simple tasks:
```python
from enum import Enum


class TaskComplexity(Enum):
    SIMPLE = "simple"
    MEDIUM = "medium"
    COMPLEX = "complex"


def select_model(complexity: TaskComplexity) -> str:
    """Select appropriate model based on task complexity."""
    model_map = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MEDIUM: "gpt-4o",
        TaskComplexity.COMPLEX: "gpt-4o",
    }
    return model_map[complexity]


def adaptive_processing(state: dict) -> dict:
    """Process with model selection based on complexity."""
    query = state["query"]

    complexity_prompt = f"Classify this query as 'simple', 'medium', or 'complex': {query}"
    classification, _ = cost_aware_llm_call(complexity_prompt, "gpt-4o-mini")
    try:
        complexity = TaskComplexity(classification.strip().lower())
    except ValueError:
        # The classifier may return extra text; fall back to a safe default.
        complexity = TaskComplexity.MEDIUM

    model = select_model(complexity)
    result, usage = cost_aware_llm_call(
        f"Answer this query thoroughly: {query}",
        model
    )
    return {
        "result": result,
        "model_used": model,
        "complexity": complexity.value,
        "cost_info": usage
    }
```
## Deployment Architecture

### Async Processing with Queues

Handle high concurrency with task queues:
```python
import asyncio
import json
import time
from datetime import datetime
from redis import Redis
from langgraph.graph import StateGraph


class AsyncAgentRunner:
    """Run agents asynchronously with a Redis queue."""

    def __init__(self, redis_url: str, agent: StateGraph):
        # Note: the redis-py client used here is synchronous/blocking.
        self.redis = Redis.from_url(redis_url)
        self.agent = agent.compile()
        self.queue_name = "agent_tasks"
        self.results_prefix = "result:"

    def submit_task(self, task_id: str, input_state: dict) -> str:
        """Submit task to queue, return task ID."""
        task = {
            "task_id": task_id,
            "input": input_state,
            "submitted_at": datetime.now().isoformat()
        }
        self.redis.rpush(self.queue_name, json.dumps(task))
        return task_id

    def get_result(self, task_id: str, timeout: int = 300) -> dict | None:
        """Get result for a task, polling until available or timed out."""
        result_key = f"{self.results_prefix}{task_id}"
        for _ in range(timeout):
            result = self.redis.get(result_key)
            if result:
                return json.loads(result)
            time.sleep(1)
        return None

    async def worker(self):
        """Process tasks from the queue."""
        while True:
            task_data = self.redis.blpop(self.queue_name, timeout=5)
            if task_data:
                _, task_json = task_data
                task = json.loads(task_json)
                result_key = f"{self.results_prefix}{task['task_id']}"
                try:
                    result = self.agent.invoke(task["input"])
                    self.redis.setex(
                        result_key, 3600,
                        json.dumps({"status": "success", "result": result})
                    )
                except Exception as e:
                    self.redis.setex(
                        result_key, 3600,
                        json.dumps({"status": "error", "error": str(e)})
                    )
```
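Usage is two-sided: an API process submits tasks and polls for results, and a separate worker process drains the queue. A rough sketch, assuming an uncompiled `StateGraph` named `graph` from the earlier sections and a Redis instance running locally:

```python
import asyncio
import uuid

runner = AsyncAgentRunner("redis://localhost:6379/0", graph)

# Producer side (e.g., inside an API handler): enqueue a task.
task_id = runner.submit_task(str(uuid.uuid4()), {"query": "Summarize today's tickets"})

# Later, poll for the worker's result (blocks up to the timeout).
result = runner.get_result(task_id, timeout=120)
print(result)

# Worker side, run as its own process:
if __name__ == "__main__":
    asyncio.run(runner.worker())
```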
### Health Checks and Readiness

Implement proper health endpoints:
```python
from fastapi import FastAPI, HTTPException
from datetime import datetime
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

app = FastAPI()


class HealthStatus:
    def __init__(self):
        self.llm_healthy = True
        self.db_healthy = True
        self.last_check = datetime.now()

    def check_llm(self) -> bool:
        """Verify LLM service is responding."""
        try:
            llm = ChatOpenAI(model="gpt-4o-mini")
            response = llm.invoke([HumanMessage(content="ping")])
            self.llm_healthy = len(response.content) > 0
        except Exception:
            self.llm_healthy = False
        return self.llm_healthy

    def check_db(self) -> bool:
        """Verify database connection."""
        try:
            # Replace with a real connectivity check (e.g., SELECT 1).
            self.db_healthy = True
        except Exception:
            self.db_healthy = False
        return self.db_healthy


health = HealthStatus()


@app.get("/health")
def health_check():
    """Liveness probe - is the service running?"""
    return {"status": "ok", "timestamp": datetime.now().isoformat()}


@app.get("/ready")
def readiness_check():
    """Readiness probe - can the service handle requests?"""
    llm_ok = health.check_llm()
    db_ok = health.check_db()
    if not (llm_ok and db_ok):
        raise HTTPException(
            status_code=503,
            detail={
                "status": "not_ready",
                "llm": llm_ok,
                "database": db_ok
            }
        )
    return {
        "status": "ready",
        "llm": llm_ok,
        "database": db_ok,
        "timestamp": datetime.now().isoformat()
    }


@app.get("/metrics")
def metrics():
    """Prometheus-style metrics endpoint (tracker and llm_circuit from earlier sections)."""
    return {
        "agent_requests_total": 1000,
        "agent_errors_total": 5,
        "agent_latency_seconds": {"p50": 1.2, "p95": 3.5, "p99": 8.0},
        "token_usage_daily": tracker._daily_usage,
        "circuit_breaker_state": llm_circuit._state.value
    }
```
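The `/metrics` handler above returns hard-coded illustrative numbers. If you adopt the prometheus_client library (an assumption, not something the code in this post depends on), the counters and histograms can be real and exposed in the standard scrape format. Shown with its own `app` object so the sketch is self-contained:

```python
from fastapi import FastAPI, Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

app = FastAPI()

REQUESTS = Counter("agent_requests_total", "Total agent requests")
ERRORS = Counter("agent_errors_total", "Total agent errors")
LATENCY = Histogram("agent_latency_seconds", "Agent request latency in seconds")


@app.get("/metrics")
def metrics() -> Response:
    """Expose metrics in Prometheus text exposition format for scraping."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


def instrumented_call(state: dict) -> dict:
    """Wrap a node call so request counts, errors, and latency are recorded."""
    REQUESTS.inc()
    with LATENCY.time():  # observes elapsed seconds when the block exits
        try:
            return protected_llm_call(state)  # from the circuit breaker section
        except Exception:
            ERRORS.inc()
            raise
```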
```mermaid
graph TD
    A[Load Balancer] --> B[Agent Instance 1]
    A --> C[Agent Instance 2]
    A --> D[Agent Instance N]
    B --> E[Redis Queue]
    C --> E
    D --> E
    E --> F[Worker Pool]
    F --> G[LLM API]
    F --> H[Database]
    F --> I[Vector Store]
    subgraph Monitoring["Monitoring"]
        J[Prometheus]
        K[Grafana]
        L[Alerting]
    end
    B --> J
    C --> J
    D --> J
    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff
    class A,E blueClass
    class B,C,D,F orangeClass
    class G,H,I,J,K,L greenClass
```
## Complete Production Agent

Putting it all together, here is a production-ready agent service:
```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import TypedDict, Optional
import structlog
import uuid

logger = structlog.get_logger()


class ProductionState(TypedDict):
    request_id: str
    user_id: str
    query: str
    result: str
    error: Optional[str]
    metadata: dict


# CircuitBreaker, TokenTracker, traced_node, select_model, TaskComplexity, and
# cost_aware_llm_call come from the earlier sections of this post.
llm_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
token_tracker = TokenTracker(max_tokens_per_day=500000)


def classify_complexity(query: str) -> TaskComplexity:
    """Not defined in the original listing; a minimal version mirroring the
    classifier used in adaptive_processing above."""
    label, _ = cost_aware_llm_call(
        f"Classify this query as 'simple', 'medium', or 'complex': {query}", "gpt-4o-mini"
    )
    try:
        return TaskComplexity(label.strip().lower())
    except ValueError:
        return TaskComplexity.MEDIUM


def create_production_agent():
    """Create a production-ready agent with all safety features."""

    @traced_node("process_query")
    def process_query(state: ProductionState) -> dict:
        log = logger.bind(
            request_id=state["request_id"],
            user_id=state["user_id"]
        )

        if not llm_circuit.can_execute():
            log.warning("circuit_open", message="LLM circuit breaker open")
            return {
                "error": "Service temporarily unavailable",
                "result": "Please try again in a few moments."
            }

        estimated_tokens = len(state["query"]) // 4 * 3
        if not token_tracker.check_budget(estimated_tokens):
            log.warning("budget_exceeded", estimated=estimated_tokens)
            return {
                "error": "Daily capacity reached",
                "result": "Service capacity reached. Please try again tomorrow."
            }

        try:
            complexity = classify_complexity(state["query"])
            model = select_model(complexity)
            log.info("processing", model=model, complexity=complexity.value)

            result, usage = cost_aware_llm_call(
                f"Answer this query helpfully: {state['query']}",
                model
            )
            llm_circuit.record_success()
            log.info("completed", tokens=usage["total_tokens"], cost=usage["estimated_cost"])

            return {
                "result": result,
                "metadata": {
                    "model": model,
                    "complexity": complexity.value,
                    "tokens": usage["total_tokens"],
                    "cost": usage["estimated_cost"]
                }
            }
        except Exception as e:
            llm_circuit.record_failure()
            log.error("processing_failed", error=str(e))
            return {
                "error": str(e),
                "result": "An error occurred processing your request."
            }

    graph = StateGraph(ProductionState)
    graph.add_node("process", process_query)
    graph.add_edge(START, "process")
    graph.add_edge("process", END)

    checkpointer = SqliteSaver.from_conn_string("production_agent.db")
    return graph.compile(checkpointer=checkpointer)


app = FastAPI(title="Production Agent API")
agent = create_production_agent()


class QueryRequest(BaseModel):
    query: str
    user_id: str


class QueryResponse(BaseModel):
    request_id: str
    result: str
    error: Optional[str] = None
    metadata: dict = {}


@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
    """Handle incoming query requests."""
    request_id = str(uuid.uuid4())
    logger.info(
        "request_received",
        request_id=request_id,
        user_id=request.user_id,
        query_length=len(request.query)
    )

    initial_state = {
        "request_id": request_id,
        "user_id": request.user_id,
        "query": request.query,
        "result": "",
        "error": None,
        "metadata": {}
    }
    config = {"configurable": {"thread_id": request_id}}

    try:
        result = agent.invoke(initial_state, config=config)
        return QueryResponse(
            request_id=request_id,
            result=result["result"],
            error=result.get("error"),
            metadata=result.get("metadata", {})
        )
    except Exception as e:
        logger.error("request_failed", request_id=request_id, error=str(e))
        return QueryResponse(
            request_id=request_id,
            result="Request failed",
            error=str(e)
        )


@app.get("/status/{request_id}")
async def get_status(request_id: str):
    """Get status of a previous request."""
    config = {"configurable": {"thread_id": request_id}}
    try:
        state = agent.get_state(config)
        return {
            "request_id": request_id,
            "found": state is not None,
            "state": state.values if state else None
        }
    except Exception as e:
        return {"request_id": request_id, "error": str(e)}
```
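To exercise the service end to end before deploying it, FastAPI's TestClient works well. A sketch, assuming the module above is importable as `production_agent` (the module name is hypothetical):

```python
from fastapi.testclient import TestClient

from production_agent import app  # hypothetical module name for the service above

client = TestClient(app)

resp = client.post(
    "/query",
    json={"query": "What changed in our error rate this week?", "user_id": "u-123"},
)
assert resp.status_code == 200
body = resp.json()
print(body["request_id"], body["result"][:80])

# The same request_id can be used to inspect checkpointed state later.
status = client.get(f"/status/{body['request_id']}").json()
print(status["found"])
```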
## Deployment Checklist

Before going to production, verify each area below. A pre-flight script, sketched after this list, can automate part of the check:

- **Infrastructure**: multiple agent instances behind a load balancer, a task queue for async work, persistent checkpointing, and health/readiness endpoints wired to your orchestrator.
- **Monitoring**: structured JSON logs, distributed traces exported to a collector, and metrics dashboards with alerting.
- **Resilience**: retries with backoff, fallback paths, and circuit breakers around external services.
- **Cost Control**: token tracking, daily budgets, and model selection by task complexity.
- **Security**: API keys kept out of code and logs, per-user rate limiting, and input validation on every endpoint.
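Here is a lightweight pre-flight sketch that probes the service's own endpoints and required configuration before a release. The base URL and environment variable names are placeholders, and httpx is assumed to be available:

```python
import os
import sys
import httpx

BASE_URL = "http://localhost:8000"               # placeholder; point at the staging deployment
REQUIRED_ENV = ["OPENAI_API_KEY", "REDIS_URL"]   # placeholder names for required config


def preflight() -> bool:
    """Return True only if required config is present and the probes respond."""
    ok = True
    for var in REQUIRED_ENV:
        if not os.environ.get(var):
            print(f"missing env var: {var}")
            ok = False

    for path in ("/health", "/ready", "/metrics"):
        try:
            resp = httpx.get(f"{BASE_URL}{path}", timeout=10)
            print(path, resp.status_code)
            ok = ok and resp.status_code == 200
        except httpx.HTTPError as e:
            print(f"{path} unreachable: {e}")
            ok = False
    return ok


if __name__ == "__main__":
    sys.exit(0 if preflight() else 1)
```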
## Key Takeaways

- **Production is 80% of the work**: Prototypes skip error handling, monitoring, and scaling. Budget accordingly.
- **Layer your error handling**: Retries for transient failures, fallbacks for degraded operation, circuit breakers for cascading failures.
- **Observe everything**: Structured logs, distributed traces, and metrics. You can't fix what you can't see.
- **Control costs actively**: Track tokens, select models by complexity, set budgets, and alert on anomalies.
- **Design for failure**: Services fail. Networks fail. LLMs fail. Build systems that degrade gracefully.
- **Use the checklist**: Systematic verification catches gaps that intuition misses.
This concludes the LangChain and LangGraph series. We’ve covered foundations through production systems—from prompts and LCEL through multi-agent architectures and deployment. The tools are here; build something useful.