Multi-Agent Architecture for Trading

When building an AI hedge fund, the most critical element is understanding the flow of agent-to-agent communication. Does one agent need to work after another? Can agents work in parallel? What is the specific purpose of each agent? Multi-agent architecture answers these questions by defining specialized agents, shared state communication, orchestration patterns, and routing strategies that together create reliable, intelligent trading systems.

The Case for Agent Specialization

A fundamental principle in multi-agent workflows: agents should be specialized. We don’t want agents capable of doing everything - this aligns with the concept of context engineering.

flowchart TD
    subgraph Bad["Single Agent (Problematic)"]
        A1[Mixed Agent] --> C1[Confused about
primary objective]
        A1 --> C2[Context overload]
        A1 --> C3[Inconsistent behavior]
    end

    subgraph Good["Specialized Agents (Recommended)"]
        A2[Data Collection] --> D1[Focused: Get best data]
        A3[Risk Assessment] --> D2[Focused: Evaluate risk]
        A4[Decision Making] --> D3[Focused: Make recommendation]
    end

    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class Bad pinkClass
    class Good greenClass

Why Separation Matters

Consider building a healthcare system with both medical advice and HIPAA compliance responsibilities. If you combine these in one agent:

Problem: User asks “I have a headache, what should I do?”
Mixed Agent Response: “I can’t give you that information” (regulatory confusion)

Solution: Separate agents for medical advice and compliance - the medical agent answers health questions while the compliance agent handles regulatory requirements independently.

Hedge Fund Agent Architecture

An AI hedge fund requires multiple specialized components working together:

flowchart LR
    I[Trade Request] --> DC[Data Collection]
    DC --> MA[Market Analyst]
    MA --> SA[Strategy Agent]
    SA --> RM[Risk Manager]
    RM --> RA[Regulatory Agent]
    RA --> SU[Supervisor]
    SU --> O[Final Decision]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff
    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff
    classDef purpleClass fill:#9B59B6,stroke:#333,stroke-width:2px,color:#fff
    classDef tealClass fill:#16A085,stroke:#333,stroke-width:2px,color:#fff

    class DC blueClass
    class MA orangeClass
    class SA greenClass
    class RM pinkClass
    class RA purpleClass
    class SU tealClass

Core Agent Definitions

Agent Core Function Input Output
Market Analyst Analyze market and alternative data MarketSnapshot Insights
Strategy Convert insights into trade intents Insights TradeIntents
Risk Manager Assess and adjust position sizing TradeIntents, Positions AdjustedTradeIntents
Regulatory Ensure compliance and auditability TradeIntents, Rulebooks RegulatoryVerdict
Supervisor Orchestrate and learn from all agents All upstream outputs Orders, Feedback

Market Analyst Agent

Transforms raw financial data into structured, actionable insights:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MarketAnalystAgent:
"""Analyze market conditions and generate insights"""

def analyze(self, snapshot: MarketSnapshot) -> List[Insight]:
"""
Responsibilities:
- Ingest market, news, and macroeconomic data
- Compute features (volatility, correlation, momentum)
- Detect short and long-term signals
- Tag signals with confidence and rationale
"""
pass

# Tools
def calculate_volatility(self, symbol: str, period: int) -> float:
"""Calculate historical volatility"""
pass

def analyze_sentiment(self, symbol: str) -> Dict:
"""Analyze news and social sentiment"""
pass

def detect_patterns(self, price_data: List[float]) -> List[str]:
"""Identify technical patterns"""
pass

KPIs: Signal hit-rate, Information Coefficient (IC), data coverage

Strategy Agent

Converts insights into executable trade intents:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class StrategyAgent:
"""Convert insights into trade intents using trading playbooks"""

def generate_intents(self, insights: List[Insight]) -> List[TradeIntent]:
"""
Responsibilities:
- Match insights to strategies (momentum, pairs, mean reversion)
- Determine entry/exit points, stop-loss, time-in-force
- Estimate expected alpha and transaction cost impact
- Resolve conflicts across multiple strategies
"""
pass

# Tools
def apply_momentum_strategy(self, insight: Insight) -> TradeIntent:
pass

def apply_pairs_strategy(self, insight: Insight) -> TradeIntent:
pass

def estimate_alpha(self, intent: TradeIntent) -> float:
pass

KPIs: Realized vs expected alpha, turnover efficiency, strategy hit ratio

Risk Manager Agent

Controls exposure and enforces limits:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class RiskManagerAgent:
"""Assess and adjust position sizing for risk management"""

def assess(self, intents: List[TradeIntent],
positions: Dict) -> Tuple[List[TradeIntent], RiskAssessment]:
"""
Responsibilities:
- Evaluate gross, net, and sector-level exposures
- Apply liquidity and volatility adjustments
- Recalculate position sizing under VaR constraints
- Flag violations or excessive leverage
"""
pass

# Tools
def calculate_var(self, positions: Dict, confidence: float) -> float:
"""Calculate Value at Risk"""
pass

def stress_test(self, positions: Dict, scenario: str) -> Dict:
"""Run stress test scenarios"""
pass

def check_concentration(self, positions: Dict) -> List[str]:
"""Identify concentration risks"""
pass

KPIs: Limit breach frequency, VaR utilization ratio, tail loss containment

Regulatory Agent

Performs automated compliance validation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class RegulatoryAgent:
"""Ensure compliance with trading regulations"""

def validate(self, intents: List[TradeIntent],
rulebooks: Dict) -> RegulatoryVerdict:
"""
Responsibilities:
- Screen trades for rule violations
- Check for market abuse patterns (spoofing, layering)
- Produce auditable regulatory verdicts
- Maintain traceable records for post-trade auditing
"""
pass

# Tools
def check_restricted_list(self, symbol: str) -> bool:
"""Check if symbol is on restricted list"""
pass

def check_sanctioned_countries(self, counterparty: str) -> bool:
"""Verify counterparty isn't in sanctioned countries"""
pass

def validate_position_limits(self, intent: TradeIntent) -> bool:
"""Ensure position limits are respected"""
pass

KPIs: False positive/negative rate, audit completeness, decision latency

Supervisor Agent

Orchestrates the entire system:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class SupervisorAgent:
"""Orchestrate, adjudicate, and learn from all agents"""

def supervise(self, insights: List[Insight],
intents: List[TradeIntent],
risk_assessment: RiskAssessment,
regulatory_verdict: RegulatoryVerdict) -> List[Order]:
"""
Responsibilities:
- Coordinate message flow between agents
- Handle conflict resolution
- Approve, modify, or cancel trade intents
- Monitor fills, PnL, and agent performance
"""
pass

# Decision options
DECISIONS = ["BUY", "SELL", "HOLD", "REJECT"]

KPIs: End-to-end latency, PnL attribution accuracy, uptime

Shared Context Architecture

Agents communicate through a shared state rather than direct calls:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pydantic import BaseModel
from typing import Optional

class SharedContext(BaseModel):
"""Shared context passed between agents"""

topic: str
collected_data: Optional[str] = None
data_confidence: Optional[float] = None
risk_level: Optional[str] = None
recommendation: Optional[str] = None

class Config:
arbitrary_types_allowed = True

Data Collection Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

@dataclass
class DataCollection:
"""Response from Data Collector Agent"""
topic: str
findings: str
confidence_score: float
data_summary: str


class DataCollectorAgent:
def __init__(self, model: str):
self.agent = Agent(
model=model,
deps_type=SharedContext,
output_type=DataCollection
)

@self.agent.system_prompt
def prompt(ctx: RunContext[SharedContext]) -> str:
return f"""You are a Data Collector Agent for: {ctx.deps.topic}

Your role:
- Research and collect relevant information
- Provide factual data and insights
- Assign confidence scores to findings
- Summarize key points clearly

Always provide structured information with confidence ratings."""

@self.agent.tool
def gather_information(ctx: RunContext[SharedContext]) -> str:
"""Gather information about the topic"""
topic = ctx.deps.topic
# Implementation: API calls, database queries, etc.
return f"Information gathered about {topic}"

@self.agent.tool
def analyze_data_quality(ctx: RunContext[SharedContext]) -> str:
"""Analyze reliability of collected data"""
return "Data quality assessment: 75% confidence"

Decision Making Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class FinalDecision:
"""Response from Decision Maker Agent"""
topic: str
recommendation: str
reasoning: str
confidence_level: str
final_action: str


class DecisionMakerAgent:
def __init__(self, model: str):
self.agent = Agent(
model=model,
deps_type=SharedContext,
output_type=FinalDecision
)

@self.agent.system_prompt
def prompt(ctx: RunContext[SharedContext]) -> str:
return f"""You are a Decision Maker Agent for: {ctx.deps.topic}

Your role:
- Analyze data from the Data Collector
- Make informed decisions
- Provide clear reasoning
- Choose actions: PROCEED, CAUTION, or STOP

Available context:
- Collected Data: {ctx.deps.collected_data or 'Not yet available'}
- Data Confidence: {ctx.deps.data_confidence or 'Not assessed'}"""

@self.agent.tool
def evaluate_risk(ctx: RunContext[SharedContext]) -> str:
"""Evaluate risk based on collected data"""
confidence = ctx.deps.data_confidence or 0.5
if confidence > 0.8:
return "Risk: LOW - High confidence supports proceeding"
elif confidence > 0.6:
return "Risk: MEDIUM - Moderate confidence, be cautious"
else:
return "Risk: HIGH - Low confidence requires investigation"

@self.agent.tool
def make_recommendation(ctx: RunContext[SharedContext]) -> str:
"""Make final recommendation"""
confidence = ctx.deps.data_confidence or 0.0
if confidence > 0.7:
return "PROCEED"
elif confidence > 0.5:
return "CAUTION"
return "STOP"

Communication Flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TradingSystem:
"""Coordinates agent communication through shared context"""

def __init__(self):
self.data_collector = DataCollectorAgent("gpt-4")
self.decision_maker = DecisionMakerAgent("gpt-4")

def process(self, topic: str) -> Dict:
# Initialize shared context
context = SharedContext(topic=topic)

# Step 1: Data Collection
data_result = self.data_collector.agent.run_sync(
"Collect and analyze information",
deps=context
)

# Update context with findings
context.collected_data = data_result.output.findings
context.data_confidence = data_result.output.confidence_score

# Step 2: Decision Making
decision_result = self.decision_maker.agent.run_sync(
"Analyze collected data and make recommendation",
deps=context
)

return {
"data": data_result.output,
"decision": decision_result.output
}

Orchestration Patterns

Orchestration directs and manages agent interactions to achieve complex goals:

flowchart TD
    subgraph Sequential["Sequential Processing"]
        S1[Research] --> S2[Write]
        S2 --> S3[Edit]
        S3 --> S4[Publish]
    end

    subgraph Parallel["Parallel Execution"]
        P0[Request] --> P1[News Agent]
        P0 --> P2[Data Agent]
        P0 --> P3[Competitor Agent]
        P1 --> PA[Aggregate]
        P2 --> PA
        P3 --> PA
    end

    subgraph Conditional["Conditional Branching"]
        C0[Policy Check] --> C1{Eligible?}
        C1 -->|Yes| C2[Inventory + Refund]
        C1 -->|No| C3[Denial Notice]
    end

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class Sequential blueClass
    class Parallel orangeClass
    class Conditional greenClass

Key Orchestration Patterns

Sequential Processing: Each step builds on the previous result.

1
2
3
4
5
6
7
8
9
10
11
12
class ContentOrchestrator:
def run(self, topic: str) -> Dict:
# Step 1: Research must happen first
research = self.research_agent.research(topic)

# Step 2: Writing happens after research
draft = self.writing_agent.write_draft(topic, research)

# Step 3: Editing after writing
final = self.editing_agent.edit(draft)

return {"research": research, "draft": draft, "final": final}

Parallel Execution: Independent tasks run simultaneously.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

class MarketReportOrchestrator:
async def run(self, symbol: str) -> Dict:
# Run independent analyses in parallel
results = await asyncio.gather(
self.news_agent.analyze(symbol),
self.data_agent.analyze(symbol),
self.competitor_agent.analyze(symbol)
)

return {
"news": results[0],
"data": results[1],
"competitors": results[2]
}

Conditional Branching: Decisions based on agent output.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ReturnOrchestrator:
def process(self, order: Dict) -> Dict:
# Check eligibility
eligibility = self.policy_agent.check(order)

if eligibility["status"] == "ELIGIBLE":
# Route to inventory and refund agents
inventory = self.inventory_agent.process(order)
refund = self.refund_agent.process(order)
return {"status": "PROCESSED", "inventory": inventory, "refund": refund}
else:
# Route to denial agent
denial = self.denial_agent.notify(order, eligibility["reason"])
return {"status": "DENIED", "notification": denial}

Determinism in AI Workflows

A critical concept: if you know the outcome of your flow, AI may not be the right choice.

Deterministic vs Non-Deterministic

Aspect Deterministic Non-Deterministic
Outcome Known and predictable Variable and adaptive
AI Value Minimal High
Example Execute trade for 100 shares Decide whether to buy TSLA
Approach Traditional code AI agents

The Hybrid Solution

Mix deterministic structure with non-deterministic decision-making:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class TradingOrchestrator:
"""Combines deterministic flow with AI decision-making"""

def process_trade(self, request: TradeRequest) -> TradeResult:
# Deterministic: Flow structure is defined
# Non-deterministic: AI makes routing decisions

# Step 1: AI determines market condition
condition = self.market_agent.assess(request.symbol)

# Step 2: Route based on condition (AI decision)
if condition.volatility == "HIGH":
# Risk-focused flow
flow = ["market_analysis", "risk_check", "strategy", "compliance"]
elif condition.type == "UNKNOWN":
# Comprehensive analysis
flow = ["comprehensive_analysis", "strategy", "risk", "compliance"]
else:
# Standard flow
flow = ["market_analysis", "strategy", "risk", "compliance"]

# Step 3: Execute flow (deterministic structure)
results = {}
for step in flow:
results[step] = self.execute_step(step, request, results)

# Step 4: Supervisor makes final decision (AI)
return self.supervisor.decide(results)

Execution Strategies by Stock Type

High Volatility Stocks (TSLA, NVDA):

1
Market Analysis → Risk Check (PRIORITY) → Strategy → Compliance → Supervisor

Stable Stocks (AAPL, MSFT):

1
Market Analysis → Strategy → Risk Check → Compliance → Supervisor

Unknown/IPO Stocks:

1
Comprehensive Analysis → Strategy → Risk Check → Compliance → Supervisor

Routing Strategies

Different scenarios require different routing approaches:

flowchart TD
    M[Message] --> RS{Routing Strategy}
    RS -->|Content-Based| CB[Analyze Symbol
Match Specialization]
    RS -->|Round-Robin| RR[Next Agent
in Sequence]
    RS -->|Priority-Based| PB[Least Loaded
Agent]

    CB --> AP[Agent Pool]
    RR --> AP
    PB --> AP

    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class RS orangeClass

Message Structure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from enum import Enum

class MessageType(str, Enum):
MARKET_ANALYSIS_REQUEST = "MARKET_ANALYSIS_REQUEST"
STRATEGY_REQUEST = "STRATEGY_REQUEST"
RISK_ASSESSMENT_REQUEST = "RISK_ASSESSMENT_REQUEST"
COMPLIANCE_CHECK_REQUEST = "COMPLIANCE_CHECK_REQUEST"
SUPERVISOR_DECISION_REQUEST = "SUPERVISOR_DECISION_REQUEST"


class Priority(str, Enum):
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"


class Message(BaseModel):
message_id: str
message_type: MessageType
priority: Priority
symbol: str
content: Dict[str, Any]
metadata: Dict[str, Any] = {}

Content-Based Routing

Routes based on message content and agent specialization:

1
2
3
4
5
6
7
8
9
10
11
class ContentBasedRouter:
"""Route messages based on content analysis"""

def route(self, message: Message, pool: AgentPool) -> str:
# Analyze message content
if message.symbol in HIGH_VOLATILITY_SYMBOLS:
return pool.get_specialist("high_volatility")
elif message.symbol in LARGE_CAP_SYMBOLS:
return pool.get_specialist("large_cap")
else:
return pool.get_specialist("general")

Round-Robin Routing

Simple sequential distribution:

1
2
3
4
5
6
7
8
9
10
11
class RoundRobinRouter:
"""Distribute messages evenly across agents"""

def __init__(self):
self.current_index = 0

def route(self, message: Message, pool: AgentPool) -> str:
agents = pool.get_all_agents()
selected = agents[self.current_index]
self.current_index = (self.current_index + 1) % len(agents)
return selected

Priority-Based Routing

Routes to least loaded agents:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class PriorityBasedRouter:
"""Route to least loaded agent for load balancing"""

def route(self, message: Message, pool: AgentPool) -> str:
agents = pool.get_all_agents()

# Find agent with lowest load
min_load = float('inf')
selected = None

for agent in agents:
load = pool.get_agent_load(agent)
if load < min_load:
min_load = load
selected = agent

return selected

Agent Pool Architecture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class AgentPool:
"""Pool of specialized agents for a specific function"""

def __init__(self, pool_type: str):
self.pool_type = pool_type
self.agents = {}
self.loads = {}

def add_agent(self, agent_id: str, specialization: str):
self.agents[agent_id] = {
"specialization": specialization,
"status": "available"
}
self.loads[agent_id] = 0

def get_specialist(self, specialization: str) -> str:
for agent_id, info in self.agents.items():
if info["specialization"] == specialization:
return agent_id
return list(self.agents.keys())[0] # Default to first agent

def process_task(self, agent_id: str, message: Message) -> Dict:
self.loads[agent_id] += 1
# Process message with selected agent
result = self._execute(agent_id, message)
self.loads[agent_id] -= 1
return result


# Pool configuration
POOL_MAP = {
MessageType.MARKET_ANALYSIS_REQUEST: "market_analyst",
MessageType.STRATEGY_REQUEST: "strategy_agent",
MessageType.RISK_ASSESSMENT_REQUEST: "risk_manager",
MessageType.COMPLIANCE_CHECK_REQUEST: "regulatory_agent",
MessageType.SUPERVISOR_DECISION_REQUEST: "supervisor"
}

Putting It All Together

A complete trading system orchestrates all components:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class AIHedgeFund:
"""Complete AI hedge fund system"""

def __init__(self):
# Initialize agent pools
self.pools = {
"market_analyst": AgentPool("market_analyst"),
"strategy_agent": AgentPool("strategy_agent"),
"risk_manager": AgentPool("risk_manager"),
"regulatory_agent": AgentPool("regulatory_agent"),
"supervisor": AgentPool("supervisor")
}

# Initialize routers
self.content_router = ContentBasedRouter()
self.priority_router = PriorityBasedRouter()

# Shared context
self.context = SharedContext(topic="")

def process_trade_request(self, request: TradeRequest) -> TradeResult:
# Step 1: Create message
message = Message(
message_id=str(uuid.uuid4()),
message_type=MessageType.MARKET_ANALYSIS_REQUEST,
priority=Priority.HIGH,
symbol=request.symbol,
content=request.to_dict()
)

# Step 2: Route through agent pipeline
results = {}

for msg_type in MessageType:
pool = self.pools[POOL_MAP[msg_type]]

# Select routing strategy based on priority
if message.priority in [Priority.CRITICAL, Priority.HIGH]:
router = self.priority_router
else:
router = self.content_router

# Route and process
agent = router.route(message, pool)
results[msg_type] = pool.process_task(agent, message)

# Update message for next stage
message.content = results[msg_type]
message.message_type = self._get_next_type(msg_type)

# Step 3: Return final decision
return TradeResult(
decision=results[MessageType.SUPERVISOR_DECISION_REQUEST],
audit_trail=results
)

Takeaways

  1. Agent specialization prevents confusion - each agent should have a single, focused purpose

  2. Shared context enables communication - agents communicate through state rather than direct calls

  3. Five core agents for trading: Market Analyst, Strategy, Risk Manager, Regulatory, and Supervisor

  4. Three orchestration patterns serve different needs: sequential, parallel, and conditional

  5. Hybrid determinism combines structured flows with AI decision-making for reliability

  6. Routing strategies match messages to appropriate agents: content-based, round-robin, or priority-based

  7. Agent pools enable specialization and load balancing within each function


This is the twelfth post in my Applied Agentic AI for Finance series. Next: Coordinated Trading Systems where we’ll explore state coordination, multi-agent RAG, and the Agentic Alpha project.

RAG and Evaluation for Financial Agents Coordinated Trading Systems

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×