Multi-Agent Routing, State, and Coordination

When multiple agents work together, three challenges emerge: how do requests reach the right agent (routing), how does information flow between agents (data flow), and how do agents maintain a consistent view of the world (state coordination). In this post, I’ll explore patterns for managing these critical aspects of multi-agent systems.

Routing in Multi-Agent Systems

Beyond simple orchestration, real systems face unpredictable streams of diverse requests. Routing ensures each request reaches the appropriate specialist, like a sophisticated mail sorting facility directing letters to the right destination.

flowchart LR
    R[Requests] --> RT{Router}
    RT -->|Type A| A1[Agent Pool 1]
    RT -->|Type B| A2[Agent Pool 2]
    RT -->|Urgent| A3[Priority Agent]

    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class RT orangeClass

Three Core Routing Patterns

1. Content-Based Routing

Inspect the message content to decide where it goes:

class ContentRouter:
    def __init__(self):
        self.banking_agent = BankingAgent()
        self.postal_agent = PostalAgent()
        self.general_agent = GeneralAgent()

    def route(self, request: str) -> str:
        request_lower = request.lower()

        # Keyword-based routing
        if any(word in request_lower for word in ["account", "balance", "transfer"]):
            return self.banking_agent.handle(request)
        elif any(word in request_lower for word in ["package", "mail", "shipping"]):
            return self.postal_agent.handle(request)
        else:
            return self.general_agent.handle(request)

Routing rules can be based on:

  • Keyword detection (“refund”, “password reset”)
  • Sentiment analysis (positive, negative, neutral)
  • Data type (image vs text)
  • Metadata tags
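
The rule types listed above can share one ordered rule table. The following is a minimal sketch, not a definitive implementation: the `Request` shape, the agent names, and the `pick_agent` helper are all assumptions made for illustration, and sentiment analysis is left out because it would need a model.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Request:
    """Hypothetical request shape, used only for this illustration."""
    text: str = ""
    data_type: str = "text"  # e.g. "text" or "image"
    tags: set[str] = field(default_factory=set)


# Each rule pairs a predicate with the name of a destination agent.
# Rules are checked in order, so the most specific ones go first.
Rule = tuple[Callable[[Request], bool], str]

RULES: list[Rule] = [
    (lambda r: r.data_type == "image", "vision_agent"),              # data type
    (lambda r: "billing" in r.tags, "billing_agent"),                # metadata tag
    (lambda r: "refund" in r.text.lower(), "billing_agent"),         # keyword
    (lambda r: "password reset" in r.text.lower(), "account_agent"), # keyword
]


def pick_agent(request: Request, default: str = "general_agent") -> str:
    """Return the agent name of the first matching rule, or the default."""
    for matches, agent_name in RULES:
        if matches(request):
            return agent_name
    return default
```

Keeping rules as data makes the routing order explicit and lets new rules be added without touching the dispatch logic.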

Use cases:

  • Customer service intake points
  • Multi-service systems
  • Specialized query handling

2. Round-Robin Routing

Distribute load evenly across similar agents:

from itertools import cycle
from threading import Lock

class RoundRobinRouter:
    def __init__(self, agents: list):
        self.agents = agents
        self.agent_cycle = cycle(agents)

    def route(self, task: str) -> str:
        # Get next agent in rotation
        agent = next(self.agent_cycle)
        return agent.process(task)


# With load awareness
class SmartLoadBalancer:
    def __init__(self, agents: list):
        self.agents = agents
        self.workloads = {agent.name: 0 for agent in agents}
        self._lock = Lock()  # Guards workloads when route() runs on several threads

    def route(self, task: str) -> str:
        # Pick the least-loaded agent and count the task in one atomic step
        with self._lock:
            agent = min(self.agents, key=lambda a: self.workloads[a.name])
            self.workloads[agent.name] += 1
        try:
            return agent.process(task)
        finally:
            with self._lock:
                self.workloads[agent.name] -= 1
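
To see how rotation spreads a batch, here is a small sketch of the same `cycle`-based idea. `EchoAgent` is a hypothetical stand-in for a real agent, and the task names are made up.

```python
from collections import Counter
from itertools import cycle


class EchoAgent:
    """Hypothetical stand-in for a real agent; it just labels the task."""

    def __init__(self, name: str):
        self.name = name

    def process(self, task: str) -> str:
        return f"{self.name}:{task}"


agents = [EchoAgent("a"), EchoAgent("b"), EchoAgent("c")]
rotation = cycle(agents)

# Seven tasks over three agents: the rotation wraps around after "c".
results = [next(rotation).process(f"task-{i}") for i in range(7)]

# Count how many tasks each agent received.
per_agent = Counter(result.split(":")[0] for result in results)
```

With seven tasks and three agents, the first agent ends up with one more task than the others, which is as even as a fixed rotation can get.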

Use cases:

  • Processing large batches of similar tasks
  • Scaling when one agent isn’t enough
  • Parallel image/document processing

3. Priority-Based Routing

Handle urgent tasks before routine ones:

from dataclasses import dataclass, field
from queue import Empty, PriorityQueue
from typing import Optional

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    task: str = field(compare=False)

class PriorityRouter:
    def __init__(self):
        self.urgent_queue = PriorityQueue()
        self.normal_queue = PriorityQueue()

    def submit(self, task: str, is_urgent: bool = False):
        if is_urgent or self._detect_urgency(task):
            self.urgent_queue.put(PrioritizedTask(1, task))
        else:
            self.normal_queue.put(PrioritizedTask(5, task))

    def _detect_urgency(self, task: str) -> bool:
        urgent_keywords = ["urgent", "critical", "emergency", "asap"]
        return any(kw in task.lower() for kw in urgent_keywords)

    def process_next(self, agent) -> Optional[str]:
        # Always drain the urgent queue before the normal one.
        # get_nowait() avoids blocking if another worker emptied the queue first.
        for queue in (self.urgent_queue, self.normal_queue):
            try:
                item = queue.get_nowait()
            except Empty:
                continue
            return agent.process(item.task)

        return None  # Both queues are empty
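
A variation worth considering is a single heap with a sequence counter, which keeps first-in-first-out order among tasks of equal priority. This is a sketch under assumptions: `SingleQueueRouter` and the sample tasks are hypothetical, and it is single-threaded for brevity.

```python
import heapq
from itertools import count


class SingleQueueRouter:
    """One heap for all tasks; a lower priority number means more urgent."""

    def __init__(self):
        self._heap = []
        self._seq = count()  # Tie-breaker: preserves submission order

    def submit(self, task: str, priority: int = 5) -> None:
        # Tuples compare element by element: priority first, then sequence.
        heapq.heappush(self._heap, (priority, next(self._seq), task))

    def next_task(self):
        if not self._heap:
            return None
        _, _, task = heapq.heappop(self._heap)
        return task


router = SingleQueueRouter()
router.submit("weekly report")
router.submit("server down", priority=1)
router.submit("update docs")
router.submit("payment failure", priority=1)

order = [router.next_task() for _ in range(4)]
```

Both urgent tasks come out first, in the order they were submitted, followed by the routine ones.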

Use cases:

  • Financial trading systems
  • Hospital patient monitoring
  • Real-time system alerts

Data Flow Management

As data moves between agents, it may need enhancement, filtering, or transformation. As with passing a baton in a relay race, the handoff needs to be clean.

Three Data Operations

flowchart LR
    D[Data] --> E[Enhance]
    E --> F[Filter]
    F --> T[Transform]
    T --> N[Next Agent]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff
    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff

    class E blueClass
    class F greenClass
    class T pinkClass

Enhancement - Adding related information:

def enhance_request(self, request: dict) -> dict:
    # Add customer context
    customer_id = request.get("customer_id")
    request["customer_history"] = self.get_customer_history(customer_id)
    request["customer_tier"] = self.get_customer_tier(customer_id)
    return request

Filtering - Removing unnecessary data:

def filter_for_agent(self, data: dict, agent_type: str) -> dict:
    if agent_type == "billing":
        # Billing agent doesn't need personal details
        return {
            "order_id": data["order_id"],
            "amount": data["amount"],
            "payment_method": data["payment_method"]
        }
    elif agent_type == "shipping":
        # Shipping agent doesn't need payment info
        return {
            "order_id": data["order_id"],
            "address": data["address"],
            "items": data["items"]
        }
    return data

Transformation - Changing structure or format:

def transform_for_api(self, internal_data: dict) -> dict:
    # Convert internal format to external API format
    return {
        "orderId": internal_data["order_id"],  # camelCase for API
        "customerInfo": {
            "name": internal_data["customer_name"],
            "email": internal_data["customer_email"]
        },
        "lineItems": [
            {"sku": item["id"], "qty": item["quantity"]}
            for item in internal_data["items"]
        ]
    }
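
The three operations compose naturally into a pipeline. The sketch below chains them in the Enhance, Filter, Transform order from the diagram. The step functions, the tier lookup, and the field names are assumptions for illustration, and each step returns a new dict rather than mutating its input.

```python
from functools import reduce
from typing import Callable

Step = Callable[[dict], dict]


def enhance(order: dict) -> dict:
    # Hypothetical lookup; a real system would query a customer service.
    tiers = {"c-1": "gold"}
    return {**order, "customer_tier": tiers.get(order["customer_id"], "standard")}


def filter_for_shipping(order: dict) -> dict:
    # Keep only what a shipping agent needs; payment details are dropped.
    keep = ("order_id", "address", "items", "customer_tier")
    return {key: order[key] for key in keep if key in order}


def transform(order: dict) -> dict:
    # Rename fields into the (assumed) format the next agent expects.
    return {
        "orderId": order["order_id"],
        "shipTo": order["address"],
        "lineItems": [{"sku": i["id"], "qty": i["quantity"]} for i in order["items"]],
        "tier": order["customer_tier"],
    }


def pipeline(*steps: Step) -> Step:
    """Compose steps left to right into a single callable."""
    def run(data: dict) -> dict:
        return reduce(lambda acc, step: step(acc), steps, data)
    return run


prepare_for_shipping = pipeline(enhance, filter_for_shipping, transform)

raw = {
    "order_id": "o-42",
    "customer_id": "c-1",
    "address": "1 Main St",
    "payment_method": "card",
    "items": [{"id": "sku-9", "quantity": 2}],
}
payload = prepare_for_shipping(raw)
```

Because no step mutates its input, `raw` is unchanged afterwards and can be fed to a different pipeline for another agent.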

State Management in Multi-Agent Systems

When multiple agents operate, they need a consistent understanding of the world. State management is the challenge of keeping everyone synchronized.

Shared State Pattern

from copy import deepcopy
from threading import Lock

class SharedState:
    """Thread-safe shared state for multi-agent systems"""

    def __init__(self):
        self._state = {
            "inventory": {},
            "orders": [],
            "customer_data": {}
        }
        self._lock = Lock()

    def read(self, key: str):
        with self._lock:
            # Return a copy so callers cannot mutate shared data outside the lock
            return deepcopy(self._state.get(key))

    def write(self, key: str, value):
        with self._lock:
            self._state[key] = value

    def update(self, key: str, update_fn):
        # The read-modify-write cycle happens under a single lock acquisition
        with self._lock:
            current = self._state.get(key)
            self._state[key] = update_fn(current)


# Agents share the same state object
shared_state = SharedState()

class SalesAgent:
    def __init__(self, state: SharedState):
        self.state = state

    def process_sale(self, item: str, quantity: int):
        def update_inventory(inv):
            inv[item] = inv.get(item, 0) - quantity
            return inv

        self.state.update("inventory", update_inventory)


class RestockAgent:
    def __init__(self, state: SharedState):
        self.state = state

    def check_and_restock(self):
        def restock(inv):
            # Check and restock inside one update so the decision uses current stock
            for item, stock in inv.items():
                if stock < 5:
                    inv[item] = stock + 20
            return inv

        self.state.update("inventory", restock)
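
To illustrate why the check and the update belong under the same lock, here is a small threaded sketch. The `Inventory` class, the `sales_agent` function, and the numbers are hypothetical; the point is only that stock never goes negative when several agents sell concurrently.

```python
from threading import Lock, Thread


class Inventory:
    """Minimal lock-guarded stock counter, for illustration only."""

    def __init__(self, stock: dict):
        self._stock = dict(stock)
        self._lock = Lock()

    def sell(self, item: str, quantity: int) -> bool:
        # Check and decrement under one lock so two agents
        # can never both claim the last unit.
        with self._lock:
            if self._stock.get(item, 0) < quantity:
                return False
            self._stock[item] -= quantity
            return True

    def level(self, item: str) -> int:
        with self._lock:
            return self._stock.get(item, 0)


inventory = Inventory({"widget": 100})
sold = []  # list.append is safe to call from several threads in CPython


def sales_agent():
    for _ in range(30):
        sold.append(inventory.sell("widget", 1))


# Five agents attempt 150 sales in total against 100 units of stock.
threads = [Thread(target=sales_agent) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Exactly 100 sales succeed and 50 are refused, regardless of how the threads interleave.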

Context Passing

Don’t dump the entire state onto workers; pass only what they need:

class SmartOrchestrator:
    # Assumes inventory_agent, shipping_agent, and payment_agent are set up elsewhere
    def process_order(self, order: dict):
        # Inventory agent only needs product info
        inventory_result = self.inventory_agent.run(
            task="Check stock levels",
            context={
                "product_ids": [item["id"] for item in order["items"]],
                "quantities": [item["qty"] for item in order["items"]]
            }
        )

        # Shipping agent doesn't need payment details
        shipping_result = self.shipping_agent.run(
            task="Calculate delivery",
            context={
                "address": order["shipping_address"],
                "items": order["items"],
                "priority": order.get("express", False)
            }
        )

        # Payment agent only needs financial info
        payment_result = self.payment_agent.run(
            task="Process payment",
            context={
                "amount": order["total"],
                "method": order["payment_method"],
                "customer_id": order["customer_id"]
            }
        )

        return inventory_result, shipping_result, payment_result

State Synchronization Strategies

1. Database as Source of Truth

Let the database handle concurrency:

import sqlite3

class DatabaseBackedState:
    def __init__(self, db_path: str):
        self.db_path = db_path

    def atomic_reserve(self, product_id: str, quantity: int) -> bool:
        """Atomically check and reserve inventory"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            # Use database transaction for atomicity
            cursor.execute("BEGIN EXCLUSIVE")

            cursor.execute(
                "SELECT stock FROM inventory WHERE product_id = ?",
                (product_id,)
            )
            row = cursor.fetchone()

            # Unknown product or insufficient stock: release the lock, change nothing
            if row is None or row[0] < quantity:
                conn.rollback()
                return False

            cursor.execute(
                "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
                (quantity, product_id)
            )
            conn.commit()
            return True
        finally:
            # Closing without a commit discards any uncommitted changes
            conn.close()
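
An alternative that avoids the separate read is to fold the stock check into the `UPDATE` itself and inspect how many rows changed. This is a sketch of a different technique from the one above, assuming the same `inventory(product_id, stock)` table; the `reserve` helper and the in-memory database are for illustration only.

```python
import sqlite3


def reserve(conn: sqlite3.Connection, product_id: str, quantity: int) -> bool:
    """Reserve stock with one conditional UPDATE; True if a row was changed."""
    with conn:  # Commits on success, rolls back if an exception is raised
        cursor = conn.execute(
            "UPDATE inventory SET stock = stock - ? "
            "WHERE product_id = ? AND stock >= ?",
            (quantity, product_id, quantity),
        )
    # rowcount is 0 when the product is unknown or stock is too low
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (product_id TEXT PRIMARY KEY, stock INTEGER)")
conn.execute("INSERT INTO inventory VALUES ('widget', 3)")
conn.commit()

first = reserve(conn, "widget", 2)    # Enough stock
second = reserve(conn, "widget", 2)   # Only 1 left
missing = reserve(conn, "gadget", 1)  # No such product
remaining = conn.execute(
    "SELECT stock FROM inventory WHERE product_id = 'widget'"
).fetchone()[0]
conn.close()
```

Since the check and the decrement are a single statement, the database evaluates them together and there is no window between reading and writing.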

2. Optimistic Concurrency Control

Assume conflicts are rare, but check before committing:

from threading import Lock

class OptimisticState:
    def __init__(self):
        self.data = {}
        self.versions = {}
        # Held only for the brief compare-and-set, never while computing a new value
        self._lock = Lock()

    def read(self, key: str) -> tuple:
        """Return value and version number"""
        with self._lock:
            return self.data.get(key), self.versions.get(key, 0)

    def write(self, key: str, value, expected_version: int) -> bool:
        """
        Write only if version matches (no one else changed it).
        Returns True if successful, False if conflict detected.
        """
        with self._lock:
            current_version = self.versions.get(key, 0)

            if current_version != expected_version:
                return False  # Conflict - another agent updated

            self.data[key] = value
            self.versions[key] = current_version + 1
            return True


# Usage
state = OptimisticState()

def update_with_retry(state, key, update_fn, max_retries=3):
    for _ in range(max_retries):
        value, version = state.read(key)
        new_value = update_fn(value)

        if state.write(key, new_value, version):
            return True  # Success

        # Conflict detected, retry with fresh data

    raise RuntimeError("Failed after max retries")
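
A short walkthrough may make the conflict case concrete. In this sketch two agents read the same version, one write succeeds, and the stale one is rejected and retried. `VersionedStore` is a hypothetical, single-threaded stand-in that follows the same read/write contract.

```python
class VersionedStore:
    """Minimal versioned key-value store, for illustration only."""

    def __init__(self):
        self._data = {}  # key -> (value, version)

    def read(self, key: str) -> tuple:
        return self._data.get(key, (None, 0))

    def write(self, key: str, value, expected_version: int) -> bool:
        _, current_version = self._data.get(key, (None, 0))
        if current_version != expected_version:
            return False  # Someone else wrote since this caller read
        self._data[key] = (value, current_version + 1)
        return True


store = VersionedStore()
store.write("stock", 10, expected_version=0)

# Agents A and B both read the same snapshot.
value_a, version_a = store.read("stock")
value_b, version_b = store.read("stock")

a_ok = store.write("stock", value_a - 3, version_a)  # A commits first
b_ok = store.write("stock", value_b - 4, version_b)  # B's version is now stale

# B retries with fresh data, so A's update is not lost.
value_b, version_b = store.read("stock")
b_retry_ok = store.write("stock", value_b - 4, version_b)
final = store.read("stock")
```

Without the version check, B's first write would have silently overwritten A's and the stock would read 6 instead of 3.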

3. Event Broadcasting

Proactively notify when state changes:

from typing import Callable, List

class EventBus:
    def __init__(self):
        self.subscribers: dict[str, List[Callable]] = {}

    def subscribe(self, event_type: str, handler: Callable):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: dict):
        # Handlers run synchronously, in subscription order
        for handler in self.subscribers.get(event_type, []):
            handler(data)


event_bus = EventBus()

class InventoryManager:
    def __init__(self):
        self.inventory = {}

    def update_stock(self, product_id: str, new_level: int):
        # Update inventory
        self.inventory[product_id] = new_level

        # Broadcast event
        if new_level < 5:
            event_bus.publish("inventory_low", {
                "product_id": product_id,
                "current_level": new_level
            })


class ReorderAgent:
    def __init__(self):
        # Subscribe to inventory events
        event_bus.subscribe("inventory_low", self.handle_low_inventory)

    def handle_low_inventory(self, data: dict):
        product_id = data["product_id"]
        # trigger_reorder is assumed to be defined elsewhere
        self.trigger_reorder(product_id)
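
One thing to watch with a synchronous bus is that an exception in one handler stops delivery to the rest. The sketch below shows one possible way to isolate subscribers from each other. `IsolatingEventBus`, its `errors` list, and the sample handlers are assumptions for illustration.

```python
from collections import defaultdict
from typing import Callable


class IsolatingEventBus:
    """Event bus variant where a failing handler does not block the others."""

    def __init__(self):
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
        self.errors: list[tuple[str, Exception]] = []

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> int:
        """Deliver to every subscriber; return how many handlers succeeded."""
        delivered = 0
        # Iterate over a copy so handlers may subscribe during delivery.
        for handler in list(self._handlers.get(event_type, [])):
            try:
                handler(data)
                delivered += 1
            except Exception as exc:
                # Record the failure and carry on with the next subscriber.
                self.errors.append((event_type, exc))
        return delivered


bus = IsolatingEventBus()
reorders = []


def broken_handler(data: dict) -> None:
    raise RuntimeError("notification service unavailable")


bus.subscribe("inventory_low", broken_handler)
bus.subscribe("inventory_low", lambda data: reorders.append(data["product_id"]))

delivered = bus.publish("inventory_low", {"product_id": "widget", "current_level": 2})
```

The reorder still happens even though the first subscriber failed, and the failure is kept for later inspection instead of being lost.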

Conflict Resolution

When agents conflict, you need resolution strategies:

Predefined Rules

class ConflictResolver:
    def resolve(self, conflicts: list) -> dict:
        # Priority-based resolution
        priority_order = ["emergency", "vip_customer", "standard"]

        for priority in priority_order:
            matching = [c for c in conflicts if c["type"] == priority]
            if matching:
                return matching[0]  # Highest priority wins

        return conflicts[0]  # Default: first-come-first-served
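
The same rule can be written as a ranking, which may be easier to extend. This is a sketch with hypothetical sample data; the `resolve` function and the `agent` field are assumptions. It relies on `min` returning the first of several equally ranked items, which preserves first-come-first-served among ties.

```python
PRIORITY_ORDER = ["emergency", "vip_customer", "standard"]
RANK = {name: position for position, name in enumerate(PRIORITY_ORDER)}


def resolve(conflicts: list) -> dict:
    """Pick the highest-priority claim; earlier claims win ties."""
    if not conflicts:
        raise ValueError("nothing to resolve")
    # Unknown types rank below every known type.
    return min(conflicts, key=lambda c: RANK.get(c["type"], len(RANK)))


claims = [
    {"agent": "shipping", "type": "standard"},
    {"agent": "support", "type": "vip_customer"},
    {"agent": "ops", "type": "emergency"},
    {"agent": "oncall", "type": "emergency"},
]
winner = resolve(claims)
fallback = resolve([
    {"agent": "x", "type": "unknown"},
    {"agent": "y", "type": "standard"},
])
```

Here `ops` wins over `oncall` because it claimed first, and a known type always beats an unrecognized one.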

Rollback and Retry

class TransactionalWorkflow:
    def __init__(self):
        self.completed_actions = []

    def execute_with_rollback(self, actions: list):
        try:
            for action in actions:
                result = action.execute()
                self.completed_actions.append((action, result))

        except Exception:
            # Roll back all completed actions in reverse order, then re-raise
            self.compensate()
            raise

    def compensate(self):
        """Undo all actions if later step fails"""
        for action, result in reversed(self.completed_actions):
            action.rollback(result)
        # Clear the log so the same actions are never rolled back twice
        self.completed_actions.clear()
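
Here is a small end-to-end sketch of the rollback order. `Step`, `run_all_or_nothing`, and the step names are hypothetical; each step records what it did so the sequence of compensations is visible.

```python
class Step:
    """Hypothetical action with an execute/rollback pair."""

    def __init__(self, name: str, log: list, fail: bool = False):
        self.name = name
        self.log = log
        self.fail = fail

    def execute(self) -> str:
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        self.log.append(f"do {self.name}")
        return self.name

    def rollback(self, result: str) -> None:
        self.log.append(f"undo {result}")


def run_all_or_nothing(actions: list) -> bool:
    """Run actions in order; on failure, undo completed ones in reverse."""
    completed = []
    try:
        for action in actions:
            completed.append((action, action.execute()))
    except Exception:
        for action, result in reversed(completed):
            action.rollback(result)
        return False
    return True


log = []
ok = run_all_or_nothing([
    Step("reserve stock", log),
    Step("charge card", log),
    Step("book courier", log, fail=True),
])
```

The card charge is undone before the stock reservation, mirroring the order in which the steps were applied.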

Human Escalation

from datetime import datetime

class EscalationHandler:
    # gather_context and create_support_ticket are assumed to be defined elsewhere
    def should_escalate(self, conflict: dict) -> bool:
        # Escalate for high-value or complex conflicts
        if conflict.get("value", 0) > 10000:
            return True
        if conflict.get("attempts", 0) > 3:
            return True
        if conflict.get("type") == "security":
            return True
        return False

    def escalate(self, conflict: dict):
        ticket = {
            "type": "conflict_resolution",
            "data": conflict,
            "context": self.gather_context(conflict),
            "timestamp": datetime.now()
        }
        self.create_support_ticket(ticket)
        return "Escalated to human review"

Failure Handling Patterns

Retry with Backoff

import time
from functools import wraps

def retry_with_backoff(max_attempts=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)  # Exponential backoff
                    time.sleep(delay)
        return wrapper
    return decorator


@retry_with_backoff(max_attempts=3)
def call_external_service(request):
    return api.call(request)
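
To make the timing concrete, the sketch below computes the waits between attempts using the same formula, `base_delay * 2 ** attempt`. The `backoff_delays` helper and the optional `max_delay` cap are assumptions added for illustration and are not part of the decorator above.

```python
from typing import Optional


def backoff_delays(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: Optional[float] = None,
) -> list:
    """Return the waits between attempts; there is no wait after the last one."""
    delays = []
    for attempt in range(max_attempts - 1):
        delay = base_delay * (2 ** attempt)
        if max_delay is not None:
            delay = min(delay, max_delay)  # Cap so long retry chains stay bounded
        delays.append(delay)
    return delays


default_schedule = backoff_delays()  # Three attempts, two waits
capped_schedule = backoff_delays(max_attempts=6, base_delay=0.5, max_delay=4)
```

With the defaults, a call that fails every time waits 1 second and then 2 seconds before the final attempt raises, about 3 seconds in total.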

Fallback Paths

class ResilientOrchestrator:
    def process(self, request: str) -> str:
        try:
            return self.primary_agent.run(request)
        except Exception:
            try:
                return self.backup_agent.run(request)
            except Exception:
                return self.minimal_response(request)

    def minimal_response(self, request: str) -> str:
        return "We're experiencing issues. A support agent will contact you shortly."
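
With more than two tiers, the nested try blocks can be flattened into a loop over an ordered list of handlers. This is a minimal sketch; `first_success`, `primary`, and `backup` are hypothetical names.

```python
from typing import Callable, Sequence


def first_success(
    handlers: Sequence[Callable[[str], str]],
    request: str,
    default: str,
) -> str:
    """Try each handler in order; fall back to a default if all of them fail."""
    for handler in handlers:
        try:
            return handler(request)
        except Exception:
            continue  # This tier failed, move on to the next one
    return default


def primary(request: str) -> str:
    raise TimeoutError("primary agent timed out")


def backup(request: str) -> str:
    return f"backup handled: {request}"


reply = first_success([primary, backup], "where is my order?", default="Try later.")
degraded = first_success([primary], "where is my order?", default="Try later.")
```

Adding a third tier then means appending to the list rather than nesting another try block.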

Key Takeaways

  1. Route intelligently: Content-based for specialization, round-robin for load balancing, priority-based for urgency
  2. Manage data flow: Enhance, filter, and transform data between agents
  3. Share state carefully: Use thread-safe patterns and pass only needed context
  4. Handle conflicts: Predefined rules, optimistic locking, or human escalation
  5. Plan for failure: Retry with backoff, fallback paths, and compensating actions

With proper routing, data flow, and state coordination, multi-agent systems can handle complex, dynamic workloads reliably. In the next post, I’ll explore Multi-Agent RAG and how to build complete end-to-end systems.


This is Part 13 of my series on building intelligent AI systems. Next: Multi-Agent RAG and building complete systems.
