Multi-Agent RAG and Building Complete Systems

Standard RAG retrieves from a single source, but real problems often require information from multiple specialized domains. Multi-Agent RAG coordinates multiple retrieval specialists, each expert in querying specific data sources, then synthesizes their findings into coherent answers. In this final post of the series, I’ll explore Multi-Agent RAG patterns and bring together everything we’ve learned into complete, production-ready systems.

The Limits of Single-Source RAG

Traditional RAG follows a simple pattern: embed the query, search a vector store, augment the prompt, and generate a response (sketched below). This breaks down when:

  • Information spans multiple databases (SQL, vector, document stores)
  • Different data types require specialized retrieval (structured vs unstructured)
  • Some sources require specific query languages or APIs
  • Access controls differ across data sources
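
For reference, here is roughly what that single-source baseline looks like. This is a minimal sketch, assuming the OpenAI client used throughout this series and a vectorstore object with a similarity_search method:

def basic_rag(query: str) -> str:
    # Retrieve the top matches from a single vector store
    docs = vectorstore.similarity_search(query, k=5)
    context = "\n\n".join(doc.page_content for doc in docs)

    # Augment the prompt with the retrieved context and generate
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ]
    )
    return response.choices[0].message.content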

Multi-Agent RAG solves this by employing specialists:

flowchart TD
    Q[Query] --> C[Coordinator]
    C --> R1[SQL Agent]
    C --> R2[Vector Agent]
    C --> R3[API Agent]
    R1 --> S[Synthesizer]
    R2 --> S
    R3 --> S
    S --> A[Answer]

    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class C orangeClass
    class S greenClass

Multi-Agent RAG Architecture

Key Roles

1. Specialized Retrieval Agents

Each focuses on a specific data source:

class SQLRetrievalAgent:
    """Specializes in querying relational databases"""

    def __init__(self, db_connection):
        self.db = db_connection

    def retrieve(self, query: str) -> dict:
        # Generate SQL from natural language
        sql = self.generate_sql(query)
        results = self.db.execute(sql)
        return {
            "source": "database",
            "query": sql,
            "results": results
        }


class VectorRetrievalAgent:
    """Specializes in semantic search across documents"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

    def retrieve(self, query: str, k: int = 5) -> dict:
        docs = self.vectorstore.similarity_search(query, k=k)
        return {
            "source": "documents",
            "results": [doc.page_content for doc in docs]
        }


class APIRetrievalAgent:
    """Specializes in calling external APIs"""

    def __init__(self, api_client):
        self.api = api_client

    def retrieve(self, query: str) -> dict:
        # Determine appropriate API endpoint
        endpoint, params = self.parse_query(query)
        results = self.api.call(endpoint, params)
        return {
            "source": "api",
            "endpoint": endpoint,
            "results": results
        }

2. Retrieval Coordinator

Decides which specialists to invoke:

import json

class RetrievalCoordinator:
    def __init__(self):
        self.sql_agent = SQLRetrievalAgent(db_conn)
        self.vector_agent = VectorRetrievalAgent(vectorstore)
        self.api_agent = APIRetrievalAgent(api_client)

    def coordinate(self, query: str) -> dict:
        # Analyze the query to determine which sources are needed
        sources = self.analyze_query(query)

        results = {}

        if "database" in sources:
            results["database"] = self.sql_agent.retrieve(query)

        if "documents" in sources:
            results["documents"] = self.vector_agent.retrieve(query)

        if "external" in sources:
            results["api"] = self.api_agent.retrieve(query)

        return results

    def analyze_query(self, query: str) -> list:
        # Use the LLM to classify which sources are needed
        # (client is the OpenAI client configured earlier in the series)
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": """Analyze this query and return which data sources
are needed: database, documents, external (API). Return as JSON list."""
            }, {
                "role": "user",
                "content": query
            }]
        )
        return json.loads(response.choices[0].message.content)
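
Putting it together (assuming db_conn, vectorstore, and api_client are already configured), a query that spans structured and unstructured data should fan out to more than one specialist:

coordinator = RetrievalCoordinator()
results = coordinator.coordinate(
    "How many premium customers churned last quarter, "
    "and what does our retention playbook recommend?"
)
# Expected to hit both the SQL agent and the vector agent,
# e.g. results.keys() -> dict_keys(['database', 'documents'])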

3. Synthesis Agent

Combines results into coherent answers:

class SynthesisAgent:
    def synthesize(self, query: str, retrieved_data: dict) -> str:
        context_parts = []

        for source, data in retrieved_data.items():
            context_parts.append(f"=== From {source} ===\n{data['results']}")

        full_context = "\n\n".join(context_parts)

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": """You are a synthesis expert. Combine information
from multiple sources into a coherent, comprehensive answer.
Cite sources when relevant. Identify any conflicts between sources."""
            }, {
                "role": "user",
                "content": f"Query: {query}\n\nRetrieved Information:\n{full_context}"
            }]
        )

        return response.choices[0].message.content

Complete Multi-Agent RAG Implementation

class MultiAgentRAG:
    def __init__(self):
        self.coordinator = RetrievalCoordinator()
        self.synthesizer = SynthesisAgent()
        self.gap_analyzer = GapAnalyzer()

    def query(self, user_query: str, max_iterations: int = 2) -> str:
        # Step 1: Initial retrieval
        retrieved = self.coordinator.coordinate(user_query)

        # Step 2: Check for gaps
        for _ in range(max_iterations):
            gaps = self.gap_analyzer.find_gaps(user_query, retrieved)

            if not gaps:
                break

            # Step 3: Follow-up retrieval for gaps
            additional = self.coordinator.coordinate(gaps["follow_up_query"])
            retrieved = self.merge_results(retrieved, additional)

        # Step 4: Synthesize final answer
        return self.synthesizer.synthesize(user_query, retrieved)

    def merge_results(self, existing: dict, new: dict) -> dict:
        for source, data in new.items():
            if source in existing:
                existing[source]["results"].extend(data["results"])
            else:
                existing[source] = data
        return existing


class GapAnalyzer:
    def find_gaps(self, query: str, retrieved: dict) -> dict:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": """Analyze if the retrieved information fully answers
the query. If gaps exist, provide a follow-up query.
Return JSON: {"has_gaps": bool, "follow_up_query": str or null}"""
            }, {
                "role": "user",
                "content": f"Query: {query}\n\nRetrieved: {retrieved}"
            }]
        )

        result = json.loads(response.choices[0].message.content)
        return result if result["has_gaps"] else None
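
End to end, usage looks like this (a sketch; it assumes the retrieval agents above are wired to real data sources):

rag = MultiAgentRAG()
answer = rag.query(
    "Compare last month's refund volume with what our policy documents allow"
)
print(answer)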

Parallel Retrieval Pattern

For better performance, retrieve from multiple sources simultaneously:

import concurrent.futures

class ParallelMultiAgentRAG:
    def retrieve_parallel(self, query: str) -> dict:
        sources = self.analyze_needed_sources(query)

        retrieval_tasks = {
            "database": (self.sql_agent.retrieve, query),
            "documents": (self.vector_agent.retrieve, query),
            "api": (self.api_agent.retrieve, query)
        }

        results = {}

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {}
            for source in sources:
                if source in retrieval_tasks:
                    fn, args = retrieval_tasks[source]
                    futures[source] = executor.submit(fn, args)

            for source, future in futures.items():
                try:
                    results[source] = future.result(timeout=30)
                except Exception as e:
                    results[source] = {"error": str(e)}

        return results

Privacy-Aware Retrieval

Different agents can have different access levels:

from enum import Enum

class AccessLevel(Enum):
    PUBLIC = 1
    INTERNAL = 2
    CONFIDENTIAL = 3
    RESTRICTED = 4


class PrivacyAwareRetrievalAgent:
    def __init__(self, access_level: AccessLevel):
        self.access_level = access_level

    def retrieve(self, query: str) -> dict:
        # Filter results based on access level
        all_results = self._raw_retrieve(query)

        filtered = [
            r for r in all_results
            if r.get("classification", AccessLevel.PUBLIC).value <= self.access_level.value
        ]

        return {"results": filtered}


class PrivacyCoordinator:
    def __init__(self, user_access_level: AccessLevel):
        self.customer_agent = PrivacyAwareRetrievalAgent(AccessLevel.PUBLIC)
        self.internal_agent = PrivacyAwareRetrievalAgent(AccessLevel.INTERNAL)
        self.admin_agent = PrivacyAwareRetrievalAgent(AccessLevel.RESTRICTED)

        # Select appropriate agents based on the user's access
        self.active_agents = self._select_agents(user_access_level)

    def _select_agents(self, level: AccessLevel) -> list:
        agents = [self.customer_agent]
        if level.value >= AccessLevel.INTERNAL.value:
            agents.append(self.internal_agent)
        if level.value >= AccessLevel.RESTRICTED.value:
            agents.append(self.admin_agent)
        return agents
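
For example, a request from an internal employee activates the public and internal agents but not the admin agent (a quick sketch):

coordinator = PrivacyCoordinator(user_access_level=AccessLevel.INTERNAL)
print(len(coordinator.active_agents))  # 2: customer_agent and internal_agent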

Building Complete Systems

Bringing together all concepts from this series into a production-ready architecture:

from dataclasses import dataclass
from typing import List, Dict, Optional
import logging
import time
import uuid

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class AgentConfig:
    name: str
    system_prompt: str
    tools: List
    access_level: AccessLevel = AccessLevel.PUBLIC


class ProductionMultiAgentSystem:
    """
    Complete multi-agent system combining:
    - Specialized agents with clear roles
    - Routing based on content/priority
    - State management across agents
    - Multi-source RAG retrieval
    - Error handling and monitoring
    """

    def __init__(self, config: Dict):
        # SharedState, ContentRouter, and BaseAgent are the building blocks
        # introduced earlier in this series
        self.config = config
        self.state = SharedState()
        self.router = ContentRouter()
        self.retrieval = MultiAgentRAG()
        self.monitor = SystemMonitor()

        # Initialize specialized agents
        self.agents = self._initialize_agents()

    def _initialize_agents(self) -> Dict[str, BaseAgent]:
        agents = {}

        for agent_config in self.config["agents"]:
            agents[agent_config.name] = BaseAgent(
                name=agent_config.name,
                system_prompt=agent_config.system_prompt,
                tools=agent_config.tools
            )

        return agents

    def process_request(self, request: Dict) -> str:
        """Main entry point for processing requests"""
        request_id = self.monitor.start_request(request)

        try:
            # Step 1: Route to appropriate workflow
            workflow = self.router.route(request)
            logger.info(f"[{request_id}] Routed to: {workflow}")

            # Step 2: Retrieve relevant context
            context = self.retrieval.query(request["message"])
            logger.info(f"[{request_id}] Retrieved context from {len(context)} sources")

            # Step 3: Execute workflow with context
            result = self._execute_workflow(workflow, request, context)

            # Step 4: Update state
            self.state.write(f"request_{request_id}", {
                "request": request,
                "result": result,
                "status": "completed"
            })

            self.monitor.complete_request(request_id, success=True)
            return result

        except Exception as e:
            logger.error(f"[{request_id}] Failed: {str(e)}")
            self.monitor.complete_request(request_id, success=False, error=str(e))
            return self._handle_failure(request, e)

    def _execute_workflow(self, workflow: str, request: Dict, context: Dict) -> str:
        if workflow == "customer_inquiry":
            return self._customer_inquiry_workflow(request, context)
        elif workflow == "order_processing":
            return self._order_processing_workflow(request, context)
        elif workflow == "technical_support":
            # Follows the same pattern as the workflows below (definition not shown)
            return self._technical_support_workflow(request, context)
        else:
            return self.agents["general"].run(request["message"], context)

    def _customer_inquiry_workflow(self, request: Dict, context: Dict) -> str:
        # Sequential: Classify → Retrieve history → Respond
        classification = self.agents["classifier"].run(
            f"Classify this inquiry: {request['message']}"
        )

        customer_history = self.state.read(f"customer_{request.get('customer_id')}")

        response = self.agents["customer_service"].run(
            request["message"],
            context={
                "classification": classification,
                "history": customer_history,
                "knowledge": context
            }
        )

        return response

    def _order_processing_workflow(self, request: Dict, context: Dict) -> str:
        # Sequential with validation: Check inventory → Validate → Process → Confirm
        inventory_check = self.agents["inventory"].run(
            f"Check stock for: {request.get('items')}"
        )

        if "out of stock" in inventory_check.lower():
            return self.agents["customer_service"].run(
                "Apologize for item unavailability and suggest alternatives",
                context={"original_request": request, "inventory": inventory_check}
            )

        validation = self.agents["validator"].run(
            f"Validate order: {request}",
            context={"inventory": inventory_check}
        )

        if "invalid" in validation.lower():
            return f"Order validation failed: {validation}"

        confirmation = self.agents["fulfillment"].run(
            f"Process order: {request}",
            context={"validation": validation}
        )

        return confirmation

    def _handle_failure(self, request: Dict, error: Exception) -> str:
        # Graceful degradation
        return self.agents["fallback"].run(
            "Apologize for the issue and offer to connect with human support",
            context={"original_request": request, "error": str(error)}
        )


class SystemMonitor:
    """Monitors system health and request processing"""

    def __init__(self):
        self.requests = {}
        self.metrics = {
            "total_requests": 0,
            "successful": 0,
            "failed": 0,
            "avg_latency_ms": 0
        }

    def start_request(self, request: Dict) -> str:
        request_id = str(uuid.uuid4())
        self.requests[request_id] = {
            "start_time": time.time(),
            "request": request
        }
        self.metrics["total_requests"] += 1
        return request_id

    def complete_request(self, request_id: str, success: bool, error: str = None):
        if request_id in self.requests:
            # Mark the request finished so get_health() can count active requests
            end_time = time.time()
            self.requests[request_id]["end_time"] = end_time
            latency = (end_time - self.requests[request_id]["start_time"]) * 1000

            if success:
                self.metrics["successful"] += 1
            else:
                self.metrics["failed"] += 1
                logger.error(f"Request {request_id} failed: {error}")

            # Update rolling average latency
            total = self.metrics["total_requests"]
            old_avg = self.metrics["avg_latency_ms"]
            self.metrics["avg_latency_ms"] = (old_avg * (total - 1) + latency) / total

    def get_health(self) -> Dict:
        return {
            "total_requests": self.metrics["total_requests"],
            "success_rate": self.metrics["successful"] / max(self.metrics["total_requests"], 1),
            "avg_latency_ms": self.metrics["avg_latency_ms"],
            "active_requests": len([r for r in self.requests.values() if "end_time" not in r])
        }
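
Here's a hypothetical wiring of the system. The agent names and prompts below are illustrative, and the workflows above additionally expect inventory, validator, fulfillment, fallback, and general agents to be configured:

config = {
    "agents": [
        AgentConfig(name="classifier",
                    system_prompt="Classify customer requests into inquiry, order, or support.",
                    tools=[]),
        AgentConfig(name="customer_service",
                    system_prompt="You are a helpful customer service agent.",
                    tools=[]),
        # ...plus inventory, validator, fulfillment, fallback, and general
    ]
}

system = ProductionMultiAgentSystem(config)
reply = system.process_request({
    "message": "Where is my order?",
    "customer_id": "c-1042"
})
print(system.monitor.get_health())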

Production Best Practices

1. Logging and Observability

import structlog

logger = structlog.get_logger()

class ObservableAgent:
    def run(self, task: str, context: dict = None) -> str:
        log = logger.bind(agent=self.name, task_preview=task[:100])
        log.info("agent_started")

        try:
            result = self._execute(task, context)
            log.info("agent_completed", result_length=len(result))
            return result
        except Exception as e:
            log.error("agent_failed", error=str(e))
            raise

2. Graceful Degradation

class ResilientSystem:
    def process(self, request: str) -> str:
        # Try primary path
        try:
            return self.full_workflow(request)
        except Exception:
            pass

        # Try simplified path
        try:
            return self.simple_workflow(request)
        except Exception:
            pass

        # Minimal fallback
        return "We're experiencing issues. Please try again later or contact support."

3. Rate Limiting and Resource Management

from collections import deque
import time

class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()

    def allow_request(self) -> bool:
        now = time.time()

        # Remove old requests outside window
        while self.requests and self.requests[0] < now - self.window_seconds:
            self.requests.popleft()

        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True

        return False
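
One way to wire this in front of agent calls (a sketch: the BaseAgent interface is the one from earlier in the series, and the limits are illustrative):

limiter = RateLimiter(max_requests=60, window_seconds=60)

def rate_limited_run(agent, task: str) -> str:
    # Back off briefly instead of dropping the request outright
    while not limiter.allow_request():
        time.sleep(1)
    return agent.run(task)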

4. Configuration Management

import yaml
from typing import Dict

def load_system_config(config_path: str) -> Dict:
    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Validate required fields
    required = ["agents", "routing_rules", "retrieval_sources"]
    for field in required:
        if field not in config:
            raise ValueError(f"Missing required config field: {field}")

    return config


# config.yaml example:
# agents:
#   - name: classifier
#     system_prompt: "Classify customer requests..."
#     tools: []
#   - name: customer_service
#     system_prompt: "You are a helpful customer service agent..."
#     tools: [search_kb, get_order_status]
# routing_rules:
#   - pattern: "order|shipping|delivery"
#     workflow: order_processing
#   - pattern: "refund|return|money"
#     workflow: refund_processing
# retrieval_sources:
#   - type: vector
#     collection: knowledge_base
#   - type: sql
#     connection: postgres://...

Key Takeaways from the Series

Throughout this series, we’ve covered the complete journey from simple prompting to production multi-agent systems:

  1. Prompting Foundations: Role-based prompting, chain-of-thought, and ReAct patterns provide the building blocks
  2. Workflow Patterns: Chaining, routing, parallelization, and evaluator-optimizer loops structure agent behavior
  3. Agent Architecture: Tools, state management, and memory enable sophisticated capabilities
  4. External Integration: APIs, databases, and RAG connect agents to real-world data
  5. Multi-Agent Systems: Orchestration, coordination, and synthesis enable complex collaborative workflows

The key insight: start simple, add complexity only when needed. A single well-designed agent often outperforms a complex multi-agent system. But when you do need multiple agents, the patterns in this series provide a solid foundation.

Building AI agents is an iterative process - design, implement, test, refine. The tools and patterns will evolve, but the fundamental principles of clear roles, clean interfaces, robust error handling, and observable execution remain constant.


This concludes my series on building intelligent AI systems. From simple prompts to complex multi-agent architectures, the journey has covered the essential patterns for building capable, reliable AI agents.
