Coordinated Trading Systems

When multiple agents operate simultaneously on trading operations, they must share a consistent understanding of the world state. This post explores the coordination patterns that make multi-agent trading systems reliable: persistent state management, conflict resolution, and multi-agent RAG for comprehensive analysis.

State Management in Multi-Agent Systems

Imagine a customer service agent with amnesia: every time you talk to them, you have to start the conversation from scratch. A single-turn agent without memory behaves the same way. To handle complex, multi-step tasks, our agents need memory, which we call state.

State management is the process of recording what has happened, what the current status is, and what information has been gathered, so that any agent on the team can get up to speed.

Ephemeral vs Persistent State

```mermaid
flowchart TB
    subgraph Ephemeral["Ephemeral State"]
        direction LR
        E1[Chat History] --> E2[Session Context]
        E2 --> E3[Lost on Restart]
    end

    subgraph Persistent["Persistent State"]
        direction LR
        P1[Database/Files] --> P2[Survives Restarts]
        P2 --> P3[Shared Access]
    end

    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class Ephemeral pinkClass
    class Persistent greenClass
```
| Type | Duration | Use Case |
|---|---|---|
| Ephemeral | Single session | Chat history, temporary calculations |
| Persistent | Cross-session | Account balances, portfolio holdings, transaction history |
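Here is a minimal sketch that makes the distinction concrete. The class names and the JSON file layout are illustrative assumptions and are not part of the system below. Ephemeral state lives in an object and disappears with it. Persistent state is written to disk, so a fresh instance (standing in for a restarted process) sees earlier writes.

```python
import json
from pathlib import Path


class EphemeralState:
    """Lives only in process memory: gone when the object (or process) goes away."""

    def __init__(self):
        self.chat_history: list[str] = []


class PersistentState:
    """Backed by a JSON file, so a new instance can read what an old one wrote."""

    def __init__(self, path: str):
        self.path = Path(path)

    def load(self) -> dict:
        # Start from a default account if nothing has been saved yet
        if not self.path.exists():
            return {"cash_balance": 100000.0, "portfolio": {}}
        return json.loads(self.path.read_text())

    def save(self, state: dict) -> None:
        self.path.write_text(json.dumps(state))
```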

Persistent State with Interchangeable Managers

For trading systems, persistent state is essential. The pattern that makes this maintainable is an interchangeable state manager that abstracts storage details:

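```python
from abc import ABC, abstractmethod
from typing import Dict, Tuple
import csv
from datetime import datetime
from pathlib import Path


class BaseStateManager(ABC):
    """Abstract base for state management"""

    @abstractmethod
    def get_synchronized_state(self) -> Tuple[float, Dict[str, int]]:
        """Get current cash balance and portfolio"""

    @abstractmethod
    def update_account(self, cash_balance: float) -> None:
        """Update cash balance"""

    @abstractmethod
    def update_portfolio(self, portfolio: Dict[str, int]) -> None:
        """Update portfolio holdings"""


class CSVStateManager(BaseStateManager):
    """CSV-based implementation - easily swappable with SQL, Parquet, etc."""

    def __init__(self, data_dir: str):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self._initialize_csv_files()

    def _initialize_csv_files(self):
        """Initialize CSV files with headers"""
        if not (self.data_dir / "account.csv").exists():
            with open(self.data_dir / "account.csv", 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['cash_balance', 'total_value', 'last_updated'])
                writer.writerow(['100000.00', '100000.00', datetime.now().isoformat()])

        if not (self.data_dir / "portfolio.csv").exists():
            with open(self.data_dir / "portfolio.csv", 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['symbol', 'quantity', 'avg_cost'])

        if not (self.data_dir / "transactions.csv").exists():
            with open(self.data_dir / "transactions.csv", 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['transaction_id', 'timestamp', 'type', 'symbol',
                                 'quantity', 'price', 'total', 'status'])

    # --- Low-level readers/writers (no locking; see LockingStateManager) ---

    def _read_cash_balance(self) -> float:
        with open(self.data_dir / "account.csv", newline='') as f:
            return float(next(csv.DictReader(f))['cash_balance'])

    def _read_portfolio(self) -> Dict[str, int]:
        with open(self.data_dir / "portfolio.csv", newline='') as f:
            return {row['symbol']: int(row['quantity']) for row in csv.DictReader(f)}

    def _write_cash_balance(self, cash_balance: float) -> None:
        with open(self.data_dir / "account.csv", 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['cash_balance', 'total_value', 'last_updated'])
            # total_value is simplified to cash here; valuing holdings needs market prices
            writer.writerow([f"{cash_balance:.2f}", f"{cash_balance:.2f}",
                             datetime.now().isoformat()])

    def _write_portfolio(self, portfolio: Dict[str, int]) -> None:
        with open(self.data_dir / "portfolio.csv", 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['symbol', 'quantity', 'avg_cost'])
            for symbol, quantity in portfolio.items():
                if quantity > 0:  # drop fully closed positions
                    writer.writerow([symbol, quantity, ''])  # avg_cost not tracked here

    # --- BaseStateManager interface ---

    def get_synchronized_state(self) -> Tuple[float, Dict[str, int]]:
        # This is where locking mechanisms would be implemented
        return self._read_cash_balance(), self._read_portfolio()

    def update_account(self, cash_balance: float) -> None:
        self._write_cash_balance(cash_balance)

    def update_portfolio(self, portfolio: Dict[str, int]) -> None:
        self._write_portfolio(portfolio)
```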

The key insight: agents depend only on the BaseStateManager interface, not on the storage behind it. You could swap CSVStateManager for an SQL-backed SQLStateManager without changing agent code.
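Here is an illustration of the swap. InMemoryStateManager is a hypothetical class that implements the same three methods without touching files, which is handy for unit tests. The interface is repeated so the block runs on its own. The summarize function stands in for agent code that only knows about BaseStateManager.

```python
from abc import ABC, abstractmethod
from typing import Dict, Tuple


# Interface repeated from the CSV example so this block runs on its own
class BaseStateManager(ABC):
    @abstractmethod
    def get_synchronized_state(self) -> Tuple[float, Dict[str, int]]: ...

    @abstractmethod
    def update_account(self, cash_balance: float) -> None: ...

    @abstractmethod
    def update_portfolio(self, portfolio: Dict[str, int]) -> None: ...


class InMemoryStateManager(BaseStateManager):
    """Hypothetical test backend: same interface, no files."""

    def __init__(self, cash_balance: float = 100000.0):
        self._cash = cash_balance
        self._portfolio: Dict[str, int] = {}

    def get_synchronized_state(self) -> Tuple[float, Dict[str, int]]:
        # Return a copy so callers cannot mutate stored state by accident
        return self._cash, dict(self._portfolio)

    def update_account(self, cash_balance: float) -> None:
        self._cash = cash_balance

    def update_portfolio(self, portfolio: Dict[str, int]) -> None:
        self._portfolio = dict(portfolio)


def summarize(manager: BaseStateManager) -> str:
    """Agent-side code: depends only on the interface, not the backend."""
    cash, portfolio = manager.get_synchronized_state()
    return f"cash={cash:.2f} positions={len(portfolio)}"
```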

Agent Separation: Decision vs Execution

A robust trading system separates decision-making from execution. This separation provides:

  • Clear responsibilities: One decides, one executes
  • Different characteristics: Decision is non-deterministic (LLM reasoning), execution is deterministic (follows rules)
  • Testable components: Each can be tested independently
```mermaid
flowchart LR
    subgraph Decision["Trade Decision Agent"]
        D1[Read Portfolio] --> D2[Analyze State]
        D2 --> D3[Make Decision]
    end

    subgraph Execution["Execution Agent"]
        E1[Verify Funds] --> E2[Execute Trade]
        E2 --> E3[Update State]
        E3 --> E4[Record Transaction]
    end

    D3 --> |Trade Request| E1
    E4 --> |Audit Trail| DB[(Shared State)]
    D1 --> |Read| DB

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class Decision blueClass
    class Execution orangeClass
```
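One practical payoff of the split is testability. If the execution side is a plain deterministic function, it can be unit-tested without any LLM calls. Here is a sketch under that assumption. TradeRequest and apply_trade are illustrative names; in the execution agent later in this post, the same checks live inside tools.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class TradeRequest:
    """What the decision side hands to the execution side."""
    transaction_type: str  # "BUY" or "SELL"
    symbol: str
    quantity: int
    price: float


def apply_trade(cash: float, portfolio: Dict[str, int],
                req: TradeRequest) -> Tuple[bool, float, Dict[str, int]]:
    """Deterministic executor: the same inputs always give the same result.

    Returns (success, new_cash, new_portfolio) without mutating the inputs.
    """
    if req.quantity <= 0:
        return False, cash, portfolio
    total = req.quantity * req.price
    new_portfolio = dict(portfolio)
    if req.transaction_type == "BUY":
        if cash < total:
            return False, cash, portfolio
        new_portfolio[req.symbol] = new_portfolio.get(req.symbol, 0) + req.quantity
        return True, cash - total, new_portfolio
    if req.transaction_type == "SELL":
        held = new_portfolio.get(req.symbol, 0)
        if held < req.quantity:
            return False, cash, portfolio
        if held == req.quantity:
            del new_portfolio[req.symbol]  # position fully closed
        else:
            new_portfolio[req.symbol] = held - req.quantity
        return True, cash + total, new_portfolio
    return False, cash, portfolio  # unknown transaction type
```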

Decision Agent Implementation

```python
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass


@dataclass
class AgentContext:
    data_dir: str
    state_manager: BaseStateManager


class TradeDecision(BaseModel):
    should_trade: bool
    transaction_type: str  # BUY or SELL
    symbol: str
    quantity: int
    estimated_price: float
    reasoning: str
    confidence: float


# Create decision agent
decision_agent = Agent(
    model="openai:gpt-4",
    deps_type=AgentContext,
    output_type=TradeDecision
)


@decision_agent.system_prompt
def decision_prompt(ctx: RunContext[AgentContext]) -> str:
    return """You are a Trade Decision Agent. Your role:
1. Analyze portfolio state from storage
2. Read account balance
3. Analyze current holdings
4. Decide if trade should be made
5. Provide clear reasoning

You have READ-ONLY access to state."""


@decision_agent.tool
def read_account_balance(ctx: RunContext[AgentContext]) -> str:
    """Read current cash balance"""
    cash, _ = ctx.deps.state_manager.get_synchronized_state()
    return f"Current cash balance: ${cash:,.2f}"


@decision_agent.tool
def read_portfolio(ctx: RunContext[AgentContext]) -> str:
    """Read current portfolio holdings"""
    _, portfolio = ctx.deps.state_manager.get_synchronized_state()
    if not portfolio:
        return "Portfolio: Empty (no holdings)"
    holdings = [f"{sym}: {qty} shares" for sym, qty in portfolio.items()]
    return "Portfolio holdings:\n" + "\n".join(holdings)


@decision_agent.tool
def calculate_buying_power(ctx: RunContext[AgentContext]) -> str:
    """Calculate available buying power (80% of cash for safety)"""
    cash, _ = ctx.deps.state_manager.get_synchronized_state()
    buying_power = cash * 0.8
    return f"Available buying power: ${buying_power:,.2f}"
```
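The decision agent's output is only an LLM suggestion. One option is to enforce the 80% buying-power rule deterministically before a proposal reaches execution, rather than trusting the model to have respected it. This is a minimal sketch. Proposal is a trimmed stand-in for TradeDecision, and enforce_buying_power is a hypothetical helper.

```python
import math
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Proposal:
    """Trimmed-down stand-in for TradeDecision (hypothetical)."""
    transaction_type: str
    symbol: str
    quantity: int
    estimated_price: float


BUYING_POWER_RATIO = 0.8  # same 80% safety margin as calculate_buying_power


def enforce_buying_power(p: Proposal, cash: float) -> Proposal:
    """Shrink a BUY so its cost fits within 80% of cash; other trades pass through."""
    if p.transaction_type != "BUY" or p.estimated_price <= 0:
        return p
    max_qty = max(0, math.floor(cash * BUYING_POWER_RATIO / p.estimated_price))
    return replace(p, quantity=min(p.quantity, max_qty))
```

A quantity of 0 after capping simply means "don't trade", which keeps the guard side-effect free.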

Execution Agent Implementation

```python
class ExecutionResult(BaseModel):
    success: bool
    transaction_id: str
    quantity: int
    price: float
    new_balance: float
    message: str


execution_agent = Agent(
    model="openai:gpt-4",
    deps_type=AgentContext,
    output_type=ExecutionResult
)


@execution_agent.system_prompt
def execution_prompt(ctx: RunContext[AgentContext]) -> str:
    return """You are an Execution Agent. Your role:
1. Verify funds/shares availability
2. Execute trades
3. Update state atomically
4. Handle errors gracefully

You have WRITE access to state."""


@execution_agent.tool
def verify_cash_for_buy(ctx: RunContext[AgentContext],
                        quantity: int, price: float) -> str:
    """Verify sufficient cash for purchase"""
    cash, _ = ctx.deps.state_manager.get_synchronized_state()
    required = quantity * price

    if cash >= required:
        return f"Sufficient cash: ${cash:,.2f} >= ${required:,.2f}"
    return f"Insufficient cash: ${cash:,.2f} < ${required:,.2f}"


@execution_agent.tool
def execute_buy(ctx: RunContext[AgentContext],
                symbol: str, quantity: int, price: float) -> str:
    """Execute BUY order and update state"""
    total_cost = quantity * price
    cash, portfolio = ctx.deps.state_manager.get_synchronized_state()

    # Re-check here: don't rely on the LLM having called verify_cash_for_buy first
    if quantity <= 0 or cash < total_cost:
        return f"BUY rejected: need ${total_cost:,.2f}, have ${cash:,.2f}"

    # Update cash
    new_balance = cash - total_cost
    ctx.deps.state_manager.update_account(new_balance)

    # Update portfolio. These are two separate writes, so this is not atomic
    # on its own; concurrent callers need the locking approach described below.
    portfolio[symbol] = portfolio.get(symbol, 0) + quantity
    ctx.deps.state_manager.update_portfolio(portfolio)

    return f"BUY executed: {quantity} {symbol} @ ${price:.2f}, new balance ${new_balance:,.2f}"
```
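The diagram's Record Transaction step and the transactions.csv header both exist, but no code above writes to that file. A hypothetical helper for that step could look like this. It appends one row per attempt (executed or rejected), in the header's column order.

```python
import csv
import uuid
from datetime import datetime
from pathlib import Path

# Column order matches the transactions.csv header created by CSVStateManager
TRANSACTION_FIELDS = ['transaction_id', 'timestamp', 'type', 'symbol',
                      'quantity', 'price', 'total', 'status']


def record_transaction(data_dir: str, tx_type: str, symbol: str,
                       quantity: int, price: float, status: str) -> str:
    """Append one audit row to transactions.csv and return its ID."""
    path = Path(data_dir) / "transactions.csv"
    write_header = not path.exists()
    tx_id = f"TX_{uuid.uuid4().hex[:12]}"
    with open(path, 'a', newline='') as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(TRANSACTION_FIELDS)
        writer.writerow([tx_id, datetime.now().isoformat(), tx_type, symbol,
                         quantity, f"{price:.2f}", f"{quantity * price:.2f}", status])
    return tx_id
```

Appending (rather than rewriting) keeps earlier rows untouched, which is what you want from an audit trail.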

State Coordination Strategies

When multiple agents operate simultaneously, they must share a consistent view of state. Several synchronization strategies apply:

1. Database as Source of Truth

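For state stored in a database, lean on its built-in features: transactions, row-level locks, and conditional updates. With file-based storage such as the CSV manager, an in-process lock serializes reads and writes, although it only protects threads within a single Python process: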

```python
from contextlib import contextmanager
import threading
from typing import Dict, Tuple


class LockingStateManager(CSVStateManager):
    """State manager with locking for concurrency (threads within one process)"""

    def __init__(self, data_dir: str):
        super().__init__(data_dir)
        self._lock = threading.Lock()

    @contextmanager
    def synchronized(self):
        """Context manager for synchronized access"""
        self._lock.acquire()
        try:
            yield
        finally:
            self._lock.release()

    def get_synchronized_state(self) -> Tuple[float, Dict[str, int]]:
        with self.synchronized():
            return self._read_cash_balance(), self._read_portfolio()

    def update_account(self, cash_balance: float) -> None:
        with self.synchronized():
            self._write_cash_balance(cash_balance)

    def update_portfolio(self, portfolio: Dict[str, int]) -> None:
        with self.synchronized():
            self._write_portfolio(portfolio)

    def execute_atomic_trade(self, trade: TradeDecision) -> bool:
        """Check and apply a trade under one lock, so no other writer can interleave"""
        with self.synchronized():
            cash, portfolio = self._read_cash_balance(), self._read_portfolio()

            if trade.transaction_type == "BUY":
                required = trade.quantity * trade.estimated_price
                if cash < required:
                    return False
                cash -= required
                portfolio[trade.symbol] = portfolio.get(trade.symbol, 0) + trade.quantity
            elif trade.transaction_type == "SELL":
                if portfolio.get(trade.symbol, 0) < trade.quantity:
                    return False
                cash += trade.quantity * trade.estimated_price
                portfolio[trade.symbol] -= trade.quantity
                if portfolio[trade.symbol] == 0:
                    del portfolio[trade.symbol]  # drop closed positions
            else:
                return False  # unknown transaction type

            self._write_cash_balance(cash)
            self._write_portfolio(portfolio)
            return True
```
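Note that threading.Lock only serializes threads inside one Python process. When the state lives in a database, the database can enforce the same guarantee across processes. The following sketch uses the standard-library sqlite3 module with an assumed two-table schema. BEGIN IMMEDIATE takes SQLite's write lock up front, and the funds check and the debit happen in a single conditional UPDATE, so no other writer can slip in between them.

```python
import sqlite3


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None: autocommit mode, so we control transactions explicitly
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("CREATE TABLE IF NOT EXISTS account (id INTEGER PRIMARY KEY, cash REAL NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS portfolio "
                 "(symbol TEXT PRIMARY KEY, quantity INTEGER NOT NULL)")
    conn.execute("INSERT OR IGNORE INTO account (id, cash) VALUES (1, 100000.0)")
    return conn


def atomic_buy(conn: sqlite3.Connection, symbol: str, quantity: int, price: float) -> bool:
    """Debit cash and credit shares in one transaction, or change nothing."""
    cost = quantity * price
    conn.execute("BEGIN IMMEDIATE")  # acquire the write lock before reading
    try:
        # Conditional update: succeeds (rowcount 1) only if funds are sufficient
        cur = conn.execute(
            "UPDATE account SET cash = cash - ? WHERE id = 1 AND cash >= ?", (cost, cost))
        if cur.rowcount == 0:
            conn.execute("ROLLBACK")
            return False
        # Upsert the position (needs SQLite 3.24+ for ON CONFLICT ... DO UPDATE)
        conn.execute(
            "INSERT INTO portfolio (symbol, quantity) VALUES (?, ?) "
            "ON CONFLICT(symbol) DO UPDATE SET quantity = quantity + excluded.quantity",
            (symbol, quantity))
        conn.execute("COMMIT")
        return True
    except Exception:
        conn.execute("ROLLBACK")
        raise
```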

2. State Snapshots and Version Hashing

Track state changes with version hashes to detect concurrent modifications:

```python
import hashlib
import time
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

from pydantic import BaseModel


class StateSnapshot(BaseModel):
    snapshot_id: str
    timestamp: str
    cash_balance: float
    portfolio: Dict[str, int]
    version_hash: str


class ConflictType(Enum):
    CONCURRENT_MODIFICATION = "concurrent_modification"
    INSUFFICIENT_FUNDS = "insufficient_funds"
    INSUFFICIENT_SHARES = "insufficient_shares"


class StateCoordinator:
    """Coordinates state across multiple agents"""

    def __init__(self, state_manager: BaseStateManager):
        self.state_manager = state_manager
        self.current_version = 0

    def create_state_snapshot(self) -> StateSnapshot:
        """Create snapshot with version hash for conflict detection"""
        cash, portfolio = self.state_manager.get_synchronized_state()

        # Deterministic string form of the state; sorting makes dict order irrelevant
        state_str = f"{cash}:{sorted(portfolio.items())}"
        # MD5 is fine for change detection (not security); 8 hex chars keeps it short
        version_hash = hashlib.md5(state_str.encode()).hexdigest()[:8]

        return StateSnapshot(
            snapshot_id=f"SNAP_{int(time.time())}",
            timestamp=datetime.now().isoformat(),
            cash_balance=cash,
            portfolio=portfolio.copy(),
            version_hash=version_hash
        )

    def detect_conflicts(self, initial_snapshot: StateSnapshot) -> List[ConflictType]:
        """Detect if state changed since snapshot"""
        current_snapshot = self.create_state_snapshot()
        conflicts = []

        if current_snapshot.version_hash != initial_snapshot.version_hash:
            conflicts.append(ConflictType.CONCURRENT_MODIFICATION)

        return conflicts
```
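A hash comparison tells you that something changed. Acting on that safely still requires doing the check and the write under the same lock. One common alternative is optimistic concurrency with a version counter, sketched here with illustrative names. A counter also catches the case where state changes and then changes back (the ABA problem), which an equal hash would miss.

```python
import threading
from typing import Dict, Tuple


class VersionedState:
    """Optimistic concurrency: writers must present the version they read."""

    def __init__(self, cash: float):
        self._lock = threading.Lock()
        self._version = 0
        self._cash = cash
        self._portfolio: Dict[str, int] = {}

    def read(self) -> Tuple[int, float, Dict[str, int]]:
        with self._lock:
            return self._version, self._cash, dict(self._portfolio)

    def commit(self, expected_version: int, cash: float,
               portfolio: Dict[str, int]) -> bool:
        """Write only if nobody else committed since expected_version was read."""
        with self._lock:
            if self._version != expected_version:
                return False  # conflict: caller should re-read and retry
            self._cash, self._portfolio = cash, dict(portfolio)
            self._version += 1
            return True
```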

Conflict Resolution Strategies

When agents disagree or state conflicts occur:

| Strategy | Description | Use Case |
|---|---|---|
| Predefined Rules | Embedded conflict resolution logic | Simple conflicts with clear precedence |
| Rollback & Retry | Undo operation and retry with current state | Concurrent modification |
| Human Escalation | Escalate to human for decision | Critical or ambiguous conflicts |
```python
class ConflictResolver:
    """Resolves conflicts between agent decisions"""

    def resolve(self, conflicts: List[ConflictType],
                trade: TradeDecision,
                state_manager: BaseStateManager) -> Tuple[bool, str]:
        """Attempt to resolve conflicts"""

        if ConflictType.CONCURRENT_MODIFICATION in conflicts:
            # Re-validate with current state
            cash, portfolio = state_manager.get_synchronized_state()

            if trade.transaction_type == "BUY":
                required = trade.quantity * trade.estimated_price
                if cash >= required:
                    return True, "Re-validated: Sufficient funds"
                return False, f"Insufficient funds: ${cash:,.2f} < ${required:,.2f}"

            else:  # SELL
                available = portfolio.get(trade.symbol, 0)
                if available >= trade.quantity:
                    return True, "Re-validated: Sufficient shares"
                return False, f"Insufficient shares: {available} < {trade.quantity}"

        return False, "Unable to resolve conflicts"
```
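The Rollback & Retry and Human Escalation rows can be combined into one loop: retry a bounded number of times while the failure is a conflict, then hand off to a person. This sketch assumes attempt_trade reports True (committed), False (rejected on current state) or None (conflict, safe to retry). The escalate callback is a placeholder for whatever alerting you use.

```python
import time
from typing import Callable, Optional


def execute_with_retry(attempt_trade: Callable[[], Optional[bool]],
                       max_retries: int = 3,
                       escalate: Callable[[str], None] = print,
                       backoff_s: float = 0.0) -> str:
    """Rollback & retry on conflicts, then escalate to a human."""
    for attempt in range(1, max_retries + 1):
        outcome = attempt_trade()
        if outcome is True:
            return "EXECUTED"
        if outcome is False:
            return "REJECTED"  # a definite "no" is not worth retrying
        time.sleep(backoff_s * attempt)  # conflict: wait, then re-read and retry
    escalate(f"Conflict persisted after {max_retries} attempts; needs human review")
    return "ESCALATED"
```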

Multi-Agent Trading System with Supervisor

The complete system uses a supervisor agent to coordinate all checks:

```mermaid
flowchart TD
    T[Trade Request] --> S[Supervisor]
    S --> |Create Snapshot| SNAP[State Snapshot]
    S --> |Check| C[Compliance Agent]
    S --> |Check| R[Risk Agent]
    S --> |Check| A[Account Agent]
    C --> |Decision| AGG[Aggregate Decisions]
    R --> |Decision| AGG
    A --> |Decision| AGG
    AGG --> |All Approved?| D{Decision}
    D -->|Yes| CONF[Check Conflicts]
    D -->|No| REJ[Reject Trade]
    CONF --> |No Conflicts| EXEC[Execute Trade]
    CONF --> |Conflicts| RES[Resolve/Retry]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class S blueClass
    class D orangeClass
```

Supervisor Implementation

```python
import time
from typing import Dict, List, Optional


class AgentDecision(BaseModel):
    approved: bool
    reasoning: str
    confidence: float


class SupervisorDecision(BaseModel):
    trade_id: str
    final_decision: str  # APPROVED or REJECTED
    reasoning: str
    agent_approvals: Dict[str, bool]
    conflicts_detected: List[str]
    conflicts_resolved: bool
    execution_allowed: bool


class MultiAgentTradingSystem:
    """Coordinates multiple agents for trade processing.

    The _create_*_agent factories and run_risk_check / run_account_check are
    not shown; they follow the same pattern as run_compliance_check below.
    """

    def __init__(self, state_coordinator: StateCoordinator,
                 context: Optional[AgentContext] = None):
        self.state_coordinator = state_coordinator
        self.context = context  # deps passed to every agent run
        self.compliance_agent = self._create_compliance_agent()
        self.risk_agent = self._create_risk_agent()
        self.account_agent = self._create_account_agent()
        self.supervisor_agent = self._create_supervisor_agent()

    def supervise_trade(self, trade: TradeDecision) -> SupervisorDecision:
        """Supervise multi-agent trade processing"""

        # Step 1: Create state snapshot for conflict detection
        initial_snapshot = self.state_coordinator.create_state_snapshot()

        # Step 2: Run agent checks (one after another here; they are
        # independent, so they could also run concurrently)
        compliance = self.run_compliance_check(trade)
        risk = self.run_risk_check(trade)
        account = self.run_account_check(trade)

        agent_approvals = {
            "compliance": compliance.approved,
            "risk": risk.approved,
            "account": account.approved
        }

        all_approved = all(agent_approvals.values())

        # Step 3: Detect conflicts (did state change while the agents worked?)
        conflicts = self.state_coordinator.detect_conflicts(initial_snapshot)

        # Step 4: Resolve conflicts if needed
        conflicts_resolved = False
        if conflicts and all_approved:
            resolver = ConflictResolver()
            conflicts_resolved, _ = resolver.resolve(
                conflicts, trade, self.state_coordinator.state_manager
            )
            if not conflicts_resolved:
                all_approved = False

        # Step 5: Execute if approved
        execution_allowed = all_approved and (not conflicts or conflicts_resolved)

        if execution_allowed:
            # Assumes a LockingStateManager: execute_atomic_trade re-checks
            # funds/shares under the lock and returns False if the trade no
            # longer fits the current state, which we report as a rejection
            execution_allowed = self.state_coordinator.state_manager.execute_atomic_trade(trade)

        return SupervisorDecision(
            trade_id=f"TRD_{int(time.time())}",
            final_decision="APPROVED" if execution_allowed else "REJECTED",
            reasoning=self._aggregate_reasoning(compliance, risk, account),
            agent_approvals=agent_approvals,
            conflicts_detected=[c.value for c in conflicts],
            conflicts_resolved=conflicts_resolved,
            execution_allowed=execution_allowed
        )

    @staticmethod
    def _aggregate_reasoning(*decisions: AgentDecision) -> str:
        """Join each agent's reasoning into a single string"""
        return " | ".join(d.reasoning for d in decisions)
```
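The checks in supervise_trade are independent, so they can run concurrently. pydantic_ai agents have an async run method alongside run_sync. The sketch below uses plain async stubs instead, so it runs without an API key. It treats a check that raises as a rejection, which matches the fail-closed idea in the next section. CheckResult and run_checks_concurrently are illustrative names.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict


@dataclass
class CheckResult:
    approved: bool
    reasoning: str


async def run_checks_concurrently(
        checks: Dict[str, Callable[[], Awaitable[CheckResult]]]) -> Dict[str, CheckResult]:
    """Run all checks at once; a check that raises counts as a rejection."""
    names = list(checks)
    # return_exceptions=True: one failing check doesn't cancel the others
    results = await asyncio.gather(*(checks[n]() for n in names), return_exceptions=True)
    out: Dict[str, CheckResult] = {}
    for name, res in zip(names, results):
        if isinstance(res, BaseException):
            out[name] = CheckResult(False, f"{name} check failed: {res}")
        else:
            out[name] = res
    return out
```

The supervisor would then approve only if all(r.approved for r in results.values()).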

Fallback Mechanisms

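Fallbacks are critical in production systems, so always handle agent failures. Here the compliance check fails closed, rejecting the trade whenever the agent itself errors: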

```python
# Method of MultiAgentTradingSystem, shown on its own
def run_compliance_check(self, trade: TradeDecision) -> AgentDecision:
    """Run compliance check with fallback"""
    try:
        result = self.compliance_agent.run_sync(
            f"Check compliance for {trade.transaction_type} {trade.quantity} {trade.symbol}",
            deps=self.context
        )
        return result.output
    except Exception as e:
        # Fallback: Reject on compliance failure
        return AgentDecision(
            approved=False,
            reasoning=f"Compliance check failed: {e}",
            confidence=0.0
        )
```
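The try/except catches errors, but a check can also simply hang on a slow model call. Here is one way to add a timeout with concurrent.futures. Decision mirrors AgentDecision, and the timeout values are arbitrary. The caller stops waiting and rejects the trade, but the worker thread keeps running in the background until it finishes.

```python
import concurrent.futures
from dataclasses import dataclass
from typing import Callable


@dataclass
class Decision:
    approved: bool
    reasoning: str
    confidence: float


_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def check_with_timeout(check: Callable[[], Decision], name: str,
                       timeout_s: float) -> Decision:
    """Fail closed if the check raises OR takes longer than timeout_s."""
    future = _pool.submit(check)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        # We stop waiting; the thread itself is not interrupted
        return Decision(False, f"{name} timed out after {timeout_s}s", 0.0)
    except Exception as e:
        return Decision(False, f"{name} failed: {e}", 0.0)
```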

Multi-Agent RAG for Trading Intelligence

For complex queries like “Should I buy AAPL?”, a single agent isn’t enough. Multi-Agent RAG coordinates specialized retrieval agents to gather comprehensive information:

```mermaid
flowchart TD
    Q[User Query] --> C[Retrieval Coordinator]
    C --> |Analyze| D[Determine Required Agents]
    D --> M[Market Data Agent]
    D --> F[Fundamental Agent]
    D --> N[News Sentiment Agent]
    D --> R[Risk Agent]
    M --> S[Synthesis Agent]
    F --> S
    N --> S
    R --> S
    S --> G[Gap Analysis]
    G --> |Complete?| OUT{Output}
    OUT -->|Yes| ANS[Final Answer]
    OUT -->|No| C

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class C blueClass
    class S orangeClass
    class G greenClass
```

Key Roles in Multi-Agent RAG

| Role | Responsibility |
|---|---|
| Retrieval Coordinator | Analyzes query, delegates to appropriate agents |
| Specialized Agents | Each focused on a specific data source (market, fundamentals, news, risk) |
| Synthesis Agent | Combines results into coherent answer |
| Gap Analyzer | Identifies missing information, triggers follow-up queries |

Implementation

```python
from enum import Enum
from typing import Any, Dict, List

from pydantic import BaseModel


class DataSource(Enum):
    MARKET_DATA = "MARKET_DATA"
    FUNDAMENTAL_DATA = "FUNDAMENTAL_DATA"
    NEWS_SENTIMENT = "NEWS_SENTIMENT"
    RISK_METRICS = "RISK_METRICS"


class RetrievalResult(BaseModel):
    agent_name: str
    data_source: DataSource
    retrieved_data: Dict[str, Any]
    confidence: float
    retrieval_time: float


class CoordinatorDecision(BaseModel):
    query: str
    required_agents: List[DataSource]
    reasoning: str
    priority_order: List[str]


class MultiAgentRAGSystem:
    """Complete Multi-Agent RAG system for trading intelligence.

    MarketDataAgent, FundamentalDataAgent, NewsSentimentAgent, RiskDataAgent,
    SynthesisAgent and SynthesisResult are not shown in this post; each
    retrieval agent is assumed to expose retrieve(query, symbol) -> RetrievalResult.
    """

    def __init__(self):
        self.market_agent = MarketDataAgent()
        self.fundamental_agent = FundamentalDataAgent()
        self.news_agent = NewsSentimentAgent()
        self.risk_agent = RiskDataAgent()
        self.coordinator = RetrievalCoordinator()
        self.synthesizer = SynthesisAgent()
        self.gap_analyzer = GapAnalysisAgent()
        # Dispatch table replaces an if/elif chain over DataSource values
        self.retrievers = {
            DataSource.MARKET_DATA: self.market_agent,
            DataSource.FUNDAMENTAL_DATA: self.fundamental_agent,
            DataSource.NEWS_SENTIMENT: self.news_agent,
            DataSource.RISK_METRICS: self.risk_agent,
        }

    def query(self, user_query: str, symbol: str,
              max_iterations: int = 2) -> "SynthesisResult":
        """Process query through multi-agent RAG pipeline"""
        # Keyed by source so a re-query refreshes data instead of duplicating it
        all_results: Dict[DataSource, RetrievalResult] = {}
        synthesis = None

        for _ in range(max(1, max_iterations)):  # always run at least once
            # Step 1: Coordinate - determine which agents to use
            decision = self.coordinator.coordinate(user_query, symbol)

            # Step 2: Execute retrievals
            for source in decision.required_agents:
                all_results[source] = self.retrievers[source].retrieve(user_query, symbol)

            # Step 3: Synthesize
            synthesis = self.synthesizer.synthesize(
                user_query, symbol, list(all_results.values()))

            # Step 4: Gap analysis
            gaps = self.gap_analyzer.analyze(user_query, synthesis)
            if not gaps.needs_re_query:
                break

            # Expand query for next iteration
            if gaps.follow_up_queries:
                user_query = gaps.follow_up_queries[0]

        return synthesis
```
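To see the control flow without any LLM calls, here is a stripped-down version of the loop. Plain functions stand in for the agents, and synthesis is omitted; all names are hypothetical. The route and retrievers arguments play the roles of the coordinator and the specialized agents. A minimal gap check turns the first missing source into a follow-up query.

```python
from typing import Callable, Dict, List

Retriever = Callable[[str, str], dict]


def rag_loop(query: str, symbol: str,
             retrievers: Dict[str, Retriever],
             route: Callable[[str], List[str]],
             required: List[str],
             max_iterations: int = 2) -> Dict[str, dict]:
    """Coordinate -> retrieve -> gap check -> re-query, keyed by source."""
    gathered: Dict[str, dict] = {}
    for _ in range(max(1, max_iterations)):
        for source in route(query):  # "coordinator": which sources to ask
            if source in retrievers:
                gathered[source] = retrievers[source](query, symbol)
        missing = [s for s in required if not gathered.get(s)]
        if not missing:
            break
        # Gap analysis: target the first missing source in the next pass
        query = f"{missing[0].lower().replace('_', ' ')} for {symbol}"
    return gathered
```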

Retrieval Coordinator

The coordinator analyzes queries to determine which agents are needed:

```python
from dataclasses import dataclass, field
from typing import Any, Dict

from pydantic_ai import Agent, RunContext


@dataclass
class RAGContext:
    """Dependencies shared by the RAG agents (fields inferred from their use in this post)"""
    query: str
    symbol: str = ""
    retrieved_data: Dict[str, Any] = field(default_factory=dict)


class RetrievalCoordinator:
    """Orchestrates retrieval agents based on query analysis"""

    def __init__(self):
        self.agent = Agent(
            model="openai:gpt-3.5-turbo",
            deps_type=RAGContext,
            output_type=CoordinatorDecision
        )

        @self.agent.tool
        def analyze_query_intent(ctx: RunContext[RAGContext], query: str) -> str:
            """Analyze what the query is asking for"""
            keywords = query.lower()
            intents = []

            if any(word in keywords for word in ['price', 'volume', 'technical']):
                intents.append("MARKET_DATA")
            if any(word in keywords for word in ['earnings', 'revenue', 'financial']):
                intents.append("FUNDAMENTAL_DATA")
            if any(word in keywords for word in ['news', 'sentiment', 'analyst']):
                intents.append("NEWS_SENTIMENT")
            if any(word in keywords for word in ['risk', 'volatility', 'beta']):
                intents.append("RISK_METRICS")

            return f"Query intents: {', '.join(intents) if intents else 'GENERAL'}"

    def coordinate(self, query: str, symbol: str) -> CoordinatorDecision:
        """Coordinate retrieval based on query"""
        context = RAGContext(query=query, symbol=symbol)
        result = self.agent.run_sync(
            f"Analyze this query and determine which retrieval agents are needed: {query}",
            deps=context
        )
        return result.output
```
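The keyword rules in analyze_query_intent don't need a model, so they can double as a fallback when the coordinator's LLM call fails or returns nothing. Here is a sketch under that assumption. Sending a general question such as “Should I buy AAPL?” to every source is a design choice here, not something the coordinator above does.

```python
from enum import Enum
from typing import Callable, List


class DataSource(Enum):
    MARKET_DATA = "MARKET_DATA"
    FUNDAMENTAL_DATA = "FUNDAMENTAL_DATA"
    NEWS_SENTIMENT = "NEWS_SENTIMENT"
    RISK_METRICS = "RISK_METRICS"


# Same keyword lists as analyze_query_intent
KEYWORDS = {
    DataSource.MARKET_DATA: ['price', 'volume', 'technical'],
    DataSource.FUNDAMENTAL_DATA: ['earnings', 'revenue', 'financial'],
    DataSource.NEWS_SENTIMENT: ['news', 'sentiment', 'analyst'],
    DataSource.RISK_METRICS: ['risk', 'volatility', 'beta'],
}


def keyword_route(query: str) -> List[DataSource]:
    q = query.lower()
    hits = [src for src, words in KEYWORDS.items() if any(w in q for w in words)]
    # A general question matches nothing, so ask every source
    return hits or list(DataSource)


def route_with_fallback(llm_route: Callable[[str], List[DataSource]],
                        query: str) -> List[DataSource]:
    """Prefer the LLM's routing; fall back to keywords on error or empty output."""
    try:
        return llm_route(query) or keyword_route(query)
    except Exception:
        return keyword_route(query)
```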

Gap Analysis and Re-Querying

The gap analyzer ensures completeness before returning results:

```python
class GapAnalysis(BaseModel):
    missing_data: List[str]
    follow_up_queries: List[str]
    needs_re_query: bool
    reasoning: str


class GapAnalysisAgent:
    """Identifies missing information and triggers re-querying"""

    def __init__(self):
        # Tools are registered on an agent instance, so the agent is created
        # first and the tool is defined here (at class level `self` is undefined).
        # Model choice is an assumption, mirroring RetrievalCoordinator.
        self.agent = Agent(
            model="openai:gpt-3.5-turbo",
            deps_type=RAGContext,
            output_type=GapAnalysis
        )

        @self.agent.tool
        def identify_data_gaps(ctx: RunContext[RAGContext]) -> str:
            """Identify what data is missing"""
            # Assumes retrieved_data maps source names to retrieved payloads
            data = ctx.deps.retrieved_data
            gaps = []

            if "MARKET_DATA" not in data or not data["MARKET_DATA"]:
                gaps.append("Market data")
            if "FUNDAMENTAL_DATA" not in data or not data["FUNDAMENTAL_DATA"]:
                gaps.append("Fundamental data")
            if "RISK_METRICS" not in data or not data["RISK_METRICS"]:
                gaps.append("Risk metrics")

            return f"Data gaps: {', '.join(gaps) if gaps else 'None - complete'}"

    def analyze(self, query: str, synthesis: "SynthesisResult") -> GapAnalysis:
        """Analyze information gaps"""
        context = RAGContext(query=query, retrieved_data=synthesis.sources_used)

        result = self.agent.run_sync(
            f"Analyze if we have complete information. Confidence: {synthesis.confidence:.0%}",
            deps=context
        )
        return result.output
```
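The structural part of gap analysis (which required sources came back empty) is deterministic and cheap. It can therefore run before, or instead of, the LLM call. This sketch uses a hypothetical find_gaps helper; the 0.7 confidence threshold and the wording of follow-up queries are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

REQUIRED_SOURCES = ["MARKET_DATA", "FUNDAMENTAL_DATA", "RISK_METRICS"]


@dataclass
class Gaps:
    missing_data: List[str]
    follow_up_queries: List[str]
    needs_re_query: bool


def find_gaps(retrieved: Dict[str, Any], symbol: str,
              confidence: float, min_confidence: float = 0.7) -> Gaps:
    """Flag empty required sources; low confidence alone also triggers a re-query."""
    missing = [s for s in REQUIRED_SOURCES if not retrieved.get(s)]
    follow_ups = [f"Get {s.lower().replace('_', ' ')} for {symbol}" for s in missing]
    needs = bool(missing) or confidence < min_confidence
    return Gaps(missing, follow_ups, needs)
```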

Production Considerations

Audit Trails

Every multi-agent operation should be logged:

```python
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class CoordinationAudit:
    timestamp: datetime
    trade_id: str
    initial_snapshot: StateSnapshot
    agent_decisions: Dict[str, bool]  # agent name -> approved (from SupervisorDecision)
    conflicts: List[ConflictType]
    resolution: Optional[str]
    final_decision: str
    execution_time_ms: int


class AuditedTradingSystem(MultiAgentTradingSystem):
    """Trading system with comprehensive audit logging"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.audit_log: List[CoordinationAudit] = []  # in memory; persist in production

    def supervise_trade(self, trade: TradeDecision) -> SupervisorDecision:
        start_time = time.perf_counter()  # monotonic clock for measuring durations
        # Snapshot for the audit record, taken just before the supervisor's own
        initial_snapshot = self.state_coordinator.create_state_snapshot()

        result = super().supervise_trade(trade)

        self.audit_log.append(CoordinationAudit(
            timestamp=datetime.now(),
            trade_id=result.trade_id,
            initial_snapshot=initial_snapshot,
            agent_decisions=result.agent_approvals,
            conflicts=[ConflictType(c) for c in result.conflicts_detected],
            resolution="Resolved" if result.conflicts_resolved else None,
            final_decision=result.final_decision,
            execution_time_ms=int((time.perf_counter() - start_time) * 1000)
        ))

        return result
```
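The audit_log list lives in memory and is lost on restart, which is exactly the ephemeral state this post warns about. A minimal way to persist it is an append-only JSON Lines file. AuditRecord is a simplified, hypothetical version of CoordinationAudit without the nested snapshot.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional


@dataclass
class AuditRecord:
    timestamp: datetime
    trade_id: str
    agent_approvals: Dict[str, bool]
    conflicts: List[str]
    resolution: Optional[str]
    final_decision: str
    execution_time_ms: int


def _encode(obj):
    # json can't serialize datetime natively; store it as ISO 8601
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Not JSON serializable: {type(obj).__name__}")


def append_audit(path: str, record: AuditRecord) -> None:
    """Append one JSON object per line; earlier entries are never rewritten."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(record), default=_encode) + "\n")


def read_audit(path: str) -> List[dict]:
    p = Path(path)
    if not p.exists():
        return []
    return [json.loads(line) for line in p.read_text(encoding="utf-8").splitlines() if line]
```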

Handling Agent Failures

Graceful degradation when agents fail:

```python
import time


class ResilientTradingSystem(MultiAgentTradingSystem):
    """Trading system with failure resilience"""

    def run_agent_check(self, agent, trade: TradeDecision,
                        agent_name: str) -> AgentDecision:
        """Run agent check with retry and fallback"""

        for attempt in range(3):
            try:
                return agent.run_sync(
                    f"Check {trade.symbol} trade",
                    deps=self.context
                ).output
            except Exception as e:
                if attempt == 2:
                    # Final fallback: conservative rejection
                    return AgentDecision(
                        approved=False,
                        reasoning=f"{agent_name} failed after 3 attempts: {e}",
                        confidence=0.0
                    )
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
```
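With a fixed 2 ** attempt schedule, many agents that fail together also retry together. A common refinement is exponential backoff with "full jitter", which waits a random delay up to the exponential cap. The sketch below also takes sleep as a parameter so tests don't actually wait. The names and defaults are illustrative.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T], attempts: int = 3,
                       base_delay: float = 1.0, max_delay: float = 10.0,
                       sleep: Callable[[float], None] = time.sleep,
                       rng: Optional[random.Random] = None) -> T:
    """Retry fn with full-jitter exponential backoff; re-raise the last error.

    The delay before retry k (k = 0, 1, ...) is uniform in
    [0, min(max_delay, base_delay * 2**k)].
    """
    rng = rng or random.Random()
    for k in range(attempts):
        try:
            return fn()
        except Exception:
            if k == attempts - 1:
                raise
            sleep(rng.uniform(0, min(max_delay, base_delay * 2 ** k)))
    raise ValueError("attempts must be >= 1")  # only reached when attempts < 1
```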

Takeaways

  1. State management requires distinguishing ephemeral (session) from persistent (database) state; trading systems need persistence

  2. Interchangeable state managers abstract storage details, allowing CSV, SQL, or other backends without changing agent code

  3. Agent separation between decision and execution provides clear responsibilities and different operational characteristics

  4. State coordination through snapshots and version hashing detects concurrent modifications before they cause problems

  5. Conflict resolution strategies include predefined rules, rollback/retry, and human escalation for complex situations

  6. Supervisor agents coordinate multiple specialized agents, aggregate decisions, and handle conflicts

  7. Fallback mechanisms are critical: always handle agent failures (fail closed, retry with backoff) so that one failing agent cannot stall or crash the whole pipeline

  8. Multi-Agent RAG coordinates specialized retrieval agents (market, fundamental, news, risk) with synthesis and gap analysis for comprehensive trading intelligence


This is the thirteenth post in my Applied Agentic AI for Finance series. Next: Applied Agentic AI in Finance: A Complete Guide where we’ll bring together everything we’ve learned into a comprehensive overview of production patterns.

