Orchestrating Financial Operations

When financial workflows become too complex for simple parallel processing or chaining, the orchestrator-worker pattern provides dynamic coordination. Unlike fixed workflows, an orchestrator analyzes problems at runtime, breaks them into subtasks, and delegates work to specialized agents. This is the pattern that ties together everything we’ve learned - bringing intelligent coordination to financial operations.

The Orchestrator-Worker Pattern

Think of the orchestrator-worker pattern like a skilled project manager leading a team of expert contractors. The project manager understands the project, breaks it down dynamically, assigns tasks to the right specialists, and synthesizes their contributions into a final deliverable.

flowchart TD
    T[Complex Task] --> O[Orchestrator Agent]
    O --> |Analyze & Plan| P[Task Plan]
    P --> W1[Worker 1: Data Extraction]
    P --> W2[Worker 2: Country Analysis]
    P --> W3[Worker 3: Currency Aggregation]
    P --> W4[Worker 4: Report Generation]
    W1 --> S[Synthesis]
    W2 --> S
    W3 --> S
    W4 --> S
    S --> O
    O --> F[Final Output]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class O blueClass
    class S orangeClass

Two Key Responsibilities

1. Orchestration (Dynamic Decomposition)

The orchestrator analyzes a complex goal and dynamically breaks it into logical subtasks at runtime. It then intelligently delegates these subtasks to the most suitable specialized worker agents.

2. Synthesis (Result Aggregation)

Once workers complete their tasks, the orchestrator combines their individual findings into a single, coherent, and useful final output. This is more than concatenation - it’s about connecting diverse inputs to create new understanding.

Orchestrator vs Simple Parallelization

At first glance, orchestrator-worker might seem similar to parallelization, but there’s a fundamental difference:

flowchart LR
    subgraph Para["Simple Parallelization"]
        P1[Pre-defined Task 1] -.-> |Fixed| R1[Result]
        P2[Pre-defined Task 2] -.-> |Fixed| R2[Result]
        P3[Pre-defined Task 3] -.-> |Fixed| R3[Result]
    end

    subgraph Orch["Orchestrator-Workers"]
        O[Orchestrator] --> |Dynamic| D1[Decided at Runtime]
        O --> |Dynamic| D2[Decided at Runtime]
        O --> |Adapts| D3[May Add More Tasks]
    end

    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class Para pinkClass
    class Orch greenClass

| Aspect | Simple Parallelization | Orchestrator-Workers |
| --- | --- | --- |
| Task Definition | Pre-defined, static | Dynamic, runtime decisions |
| Flexibility | Fixed workflow | Adapts to problem requirements |
| Best For | Predictable, divisible work | Complex, unpredictable problems |
| Intelligence | None - assembly line | Active management and synthesis |

The orchestrator pattern brings dynamic intelligence and adaptability while parallelization brings efficiency for predictable workloads.

Applied Example: SWIFT Audit Report Generation

SWIFT transactions require extensive regulatory reporting. Federal agencies, banks, and sending institutions all need different reports generated from the same transaction data. This is a perfect use case for orchestrator-worker.

The Reporting Challenge

After SWIFT transactions are processed and validated, numerous reports are required:

| Report Type | Purpose | Audience |
| --- | --- | --- |
| Country-Currency Matrix | Track flows by geography and currency | Risk analysts |
| Country Volume Reports | Federal regulatory compliance | Government agencies |
| Corridor Analysis | Identify transaction pathways | Compliance officers |
| Amount Distribution | Statistical analysis for fraud detection | Fraud teams |
| Daily/Hourly Volume | Operational monitoring | Operations teams |
| Top Institutions | High-volume partner identification | Relationship managers |
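
For routing purposes, the table above can be held as a simple registry the orchestrator consults when deciding who should receive a generated report. This is a hypothetical sketch; the keys and helper name are illustrative, not part of the implementation below:

```python
# Hypothetical report registry mirroring the table above; keys are illustrative.
REPORT_REGISTRY = {
    "country_currency_matrix": {"purpose": "Track flows by geography and currency", "audience": "Risk analysts"},
    "country_volume": {"purpose": "Federal regulatory compliance", "audience": "Government agencies"},
    "corridor_analysis": {"purpose": "Identify transaction pathways", "audience": "Compliance officers"},
    "amount_distribution": {"purpose": "Statistical analysis for fraud detection", "audience": "Fraud teams"},
    "volume_monitoring": {"purpose": "Operational monitoring", "audience": "Operations teams"},
    "top_institutions": {"purpose": "High-volume partner identification", "audience": "Relationship managers"},
}

def audience_for(report_type: str) -> str:
    """Look up who should receive a given report type."""
    return REPORT_REGISTRY[report_type]["audience"]
```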

Orchestrator Implementation

from typing import List, Dict, Any
from models.swift_message import SWIFTMessage
from services.llm_service import LLMService

class OrchestratorWorker:
    """
    Orchestrator-Worker pattern for SWIFT transaction processing
    """

    def __init__(self):
        self.llm_service = LLMService()
        self.orchestrator = Orchestrator()

    def process_transactions(self, messages: List[SWIFTMessage]):
        """
        Main orchestrator method - coordinates workers to process transactions
        """
        # Step 1: Orchestrator analyzes and creates task plan
        prompt = self.orchestrator.create_prompt(messages)
        task_plan = self.orchestrator.respond(prompt)

        # Step 2: Execute each task with appropriate worker
        results = []
        for task in task_plan['tasks']:
            print(f"Executing task: {task['type']}")
            print(f"Description: {task['description']}")

            worker_result = GenericWorker().execute(
                task=task,
                analysis=task_plan['analysis'],
                messages=messages
            )
            results.append(worker_result)

        # Step 3: Synthesize results
        return self.synthesize(results, task_plan['analysis'])

    def synthesize(self, results: List[Any], analysis: str) -> str:
        """Minimal synthesis sketch: delegate aggregation to the LLM"""
        prompt = f"""
Overall analysis: {analysis}

Worker results:
{results}

Combine these findings into a single coherent summary report.
"""
        return self.llm_service.complete(prompt, temperature=0)


class Orchestrator:
    """
    Analyzes complex tasks and creates execution plans
    """

    def __init__(self):
        self.llm_service = LLMService()

    def create_prompt(self, messages: List[SWIFTMessage]) -> str:
        """Create prompt for task decomposition"""
        return f"""
You are a SWIFT Transaction processor. You have this list of messages:

{messages}

Your job is to analyze these messages and process them. Only concern
yourself with amounts, countries, debits and credits.

Do not concern yourself with fraud or compliance (already handled).

One of the tasks should be a report for amounts by country and currency.

Please break down the plan into sub-plans and tasks.

Return your response with:
<analysis>
Provide a high-level summary.
</analysis>

<tasks>
Provide four tasks to process these transactions.
Each task must have a <type> and a <description>.
</tasks>

Respond in JSON format.
"""

    def respond(self, prompt: str) -> Dict[str, Any]:
        """Get task plan from LLM"""
        response = self.llm_service.complete(
            prompt=prompt,
            response_format={"type": "json_object"},
            temperature=0
        )
        return response

Worker Implementation

class GenericWorker:
    """
    Executes subtasks assigned by the orchestrator
    """

    def __init__(self):
        self.llm_service = LLMService()

    def execute(self, task: Dict, analysis: str,
                messages: List[SWIFTMessage]) -> Dict[str, Any]:
        """Execute a specific subtask"""

        prompt = f"""
You are a SWIFT payment processor. Please process this subtask:

Main Task: {analysis}
Subtask Type: {task['type']}
Subtask Description: {task['description']}

The SWIFT Messages are:
{messages}

Return:
1. How the task was processed
2. Summary of findings for review
"""

        return self.llm_service.complete(prompt, temperature=0)

Dynamic Task Creation

The orchestrator doesn’t follow a fixed checklist. Given a set of SWIFT messages, it might dynamically determine:

Task 1: Data Extraction
- Extract transaction amounts, currency, and sender BIC from each message

Task 2: Country Determination
- Determine country of origin from BIC codes (5th and 6th characters)

Task 3: Currency Aggregation
- Aggregate transaction amounts by currency

Task 4: Geographic Summary
- Create country-by-currency volume matrix

The LLM brings domain knowledge to task creation - it knows that BIC codes encode country information, that regulatory reports need geographic breakdowns, and what standard SWIFT processing workflows look like.
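
Tasks 1-3 are mechanical once the plan exists. As a minimal, dependency-free sketch (the transaction field names here are illustrative), the BIC country lookup and currency aggregation come down to string slicing and a running sum:

```python
from collections import defaultdict
from typing import Dict, List

def bic_country(bic: str) -> str:
    """The ISO country code occupies characters 5-6 of a BIC (e.g. DEUTDEFF -> DE)."""
    return bic[4:6].upper()

def aggregate_by_currency(transactions: List[Dict]) -> Dict[str, float]:
    """Sum transaction amounts per currency (Task 3)."""
    totals: Dict[str, float] = defaultdict(float)
    for tx in transactions:
        totals[tx["currency"]] += tx["amount"]
    return dict(totals)
```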

Task Coordination Patterns

The orchestrator can manage different execution patterns based on task dependencies:

Parallel Execution

When tasks are independent:

Orchestrator
├─→ [Country Volume Report] ─┐
├─→ [Daily Volume Analysis] ─┤→ Complete simultaneously
└─→ [Currency Distribution] ─┘

Sequential with Dependencies

When one task needs another’s output:

Orchestrator
├─→ [Country Volume Report] → Complete
└─→ [Corridor Analysis] → Uses country data

Mixed Execution

Complex report suites with multiple dependency levels:

# Method of the orchestrator (requires `import asyncio` at module level)
async def execute_with_dependencies(self, task_plan: Dict) -> Dict:
    """Execute tasks respecting dependencies"""

    # Phase 1: Base data collection (must complete first)
    base_data = await self.execute_task(task_plan['base_collection'])

    # Phase 2: Independent analyses (parallel)
    analyses = await asyncio.gather(
        self.execute_task(task_plan['country_analysis'], base_data),
        self.execute_task(task_plan['currency_analysis'], base_data),
        self.execute_task(task_plan['volume_analysis'], base_data)
    )

    # Phase 3: Dependent analysis (needs all previous)
    corridor = await self.execute_task(
        task_plan['corridor_analysis'],
        dependencies={
            'country': analyses[0],
            'currency': analyses[1],
            'volume': analyses[2]
        }
    )

    return self.synthesize([base_data, *analyses, corridor])
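
The phase structure can be exercised with stub workers to confirm the ordering: Phase 2 coroutines run concurrently under `asyncio.gather`, and Phase 3 only starts after all of them finish. A self-contained sketch with dummy tasks:

```python
import asyncio
from typing import List

async def run_phases() -> List[str]:
    """Record execution order across the three dependency phases."""
    log: List[str] = []

    async def worker(name: str) -> str:
        # Stub worker: just records that it ran and returns its name
        log.append(name)
        return name

    # Phase 1: base collection must finish before anything else
    base = await worker("base")

    # Phase 2: independent analyses execute concurrently;
    # gather returns results in argument order
    analyses = await asyncio.gather(
        worker("country"), worker("currency"), worker("volume")
    )

    # Phase 3: corridor analysis depends on everything above
    corridor = await worker("corridor")
    return [base, *analyses, corridor]

results = asyncio.run(run_phases())
```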

LLM-Powered Advantages

The orchestrator pattern gains significant power from LLM integration:

Natural Language Interface

# Traditional approach - fixed queries
def get_country_report():
    return run_sql("SELECT country, SUM(amount)...")

# LLM-powered approach - natural language
request = "Show me top 10 banks by volume this week"
# LLM interprets → Creates task plan → Generates report

Adaptive Formatting

The LLM adjusts output format based on data characteristics and context:

synthesis_prompt = """
You have these analysis results from multiple workers:

{worker_results}

Synthesize into a comprehensive executive report that:
1. Highlights key patterns and anomalies
2. Connects insights across different analyses
3. Provides actionable recommendations
4. Uses appropriate formatting for the audience
"""

Intelligent Aggregation

Beyond simple concatenation, the orchestrator can:

  • Resolve inconsistencies between worker outputs
  • Identify patterns across multiple analyses
  • Provide narrative summaries explaining the data
  • Generate insights beyond what was explicitly requested

Complete SWIFT Processing Pipeline

The orchestrator-worker pattern typically serves as the final stage in a complete processing pipeline:

flowchart TD
    subgraph Stage1["Stage 1: Validation"]
        M[SWIFT Messages] --> EO[Evaluator-Optimizer]
        EO --> V[Validated Messages]
    end

    subgraph Stage2["Stage 2: Fraud Detection"]
        V --> PA[Parallelization Agent]
        PA --> F1[Fraud Detector 1]
        PA --> F2[Fraud Detector 2]
        PA --> F3[Fraud Detector 3]
        F1 --> AG[Aggregated Fraud Scores]
        F2 --> AG
        F3 --> AG
    end

    subgraph Stage3["Stage 3: Deep Analysis"]
        AG --> PC[Prompt Chaining]
        PC --> J[Junior Analyst]
        J --> T[Technical Analyst]
        T --> C[Compliance Officer]
        C --> R[Risk Rating]
    end

    subgraph Stage4["Stage 4: Reporting"]
        R --> OW[Orchestrator-Worker]
        OW --> W1[Data Worker]
        OW --> W2[Country Worker]
        OW --> W3[Report Worker]
        W1 --> SYN[Synthesis]
        W2 --> SYN
        W3 --> SYN
        SYN --> REP[Audit Reports]
    end

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff
    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff

    class Stage1 blueClass
    class Stage2 orangeClass
    class Stage3 greenClass
    class Stage4 pinkClass

Each stage uses the appropriate pattern:

  • Evaluator-Optimizer: Quality control and message correction
  • Parallelization: Concurrent fraud detection
  • Prompt Chaining: Multi-step investigation
  • Orchestrator-Worker: Dynamic report generation

Production Considerations

Report Scheduling

class ReportScheduler:
    def __init__(self, orchestrator: OrchestratorWorker):
        self.orchestrator = orchestrator

    async def run_scheduled_reports(self):
        """Generate required reports on schedule"""

        schedule = {
            "daily": ["volume_summary", "fraud_incidents"],
            "weekly": ["country_analysis", "corridor_trends"],
            "monthly": ["regulatory_compliance", "top_institutions"]
        }

        current_period = self.get_current_period()
        reports_needed = schedule.get(current_period, [])

        for report_type in reports_needed:
            await self.orchestrator.generate_report(report_type)
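
The schedule lookup hinges on `get_current_period`, which the snippet leaves undefined. One plausible sketch, assuming period is derived from the run date (a hypothetical helper; a real system would likely take this from the scheduler itself):

```python
from datetime import date
from typing import Optional

def get_current_period(today: Optional[date] = None) -> str:
    """Classify a run date: month start outranks week start outranks daily."""
    d = today or date.today()
    if d.day == 1:
        return "monthly"
    if d.weekday() == 0:  # Monday
        return "weekly"
    return "daily"
```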

Audit Trail

Every orchestrator action should be logged for compliance:

import time
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrchestrationAudit:
    timestamp: datetime
    task_plan: Dict[str, Any]
    worker_assignments: List[Dict]
    worker_results: List[Dict]
    synthesis_output: str
    execution_time_ms: int

class AuditedOrchestrator(OrchestratorWorker):
    def __init__(self):
        super().__init__()
        self.audit_trail: List[OrchestrationAudit] = []

    def process_transactions(self, messages: List[SWIFTMessage]):
        start_time = time.time()

        # ... normal processing ...

        self.audit_trail.append(OrchestrationAudit(
            timestamp=datetime.now(),
            task_plan=task_plan,
            worker_assignments=assignments,
            worker_results=results,
            synthesis_output=final_output,
            execution_time_ms=int((time.time() - start_time) * 1000)
        ))

        return final_output

Error Handling

Graceful degradation when workers fail:

async def execute_with_fallback(self, task: Dict) -> Dict:
    """Execute task with retry and fallback logic"""

    try:
        return await self.primary_worker.execute(task)
    except WorkerError as e:
        logger.warning(f"Primary worker failed: {e}")

        try:
            return await self.backup_worker.execute(task)
        except WorkerError:
            logger.error("Both workers failed, escalating to manual review")
            return {
                "status": "MANUAL_REVIEW_REQUIRED",
                "task": task,
                "reason": "Automated processing failed"
            }
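
The fallback chain can be verified with stub workers: a primary that always raises and a backup that succeeds. This is a self-contained sketch; `WorkerError` here is a stand-in for whatever exception type the real workers raise:

```python
import asyncio
from typing import Any, Dict

class WorkerError(Exception):
    """Stand-in for the real worker failure exception."""

class FailingWorker:
    async def execute(self, task: Dict) -> Dict[str, Any]:
        raise WorkerError("primary unavailable")

class BackupWorker:
    async def execute(self, task: Dict) -> Dict[str, Any]:
        return {"status": "OK", "task": task, "worker": "backup"}

async def execute_with_fallback(task: Dict) -> Dict[str, Any]:
    """Try the primary worker, fall back to the backup, else flag for manual review."""
    for worker in (FailingWorker(), BackupWorker()):
        try:
            return await worker.execute(task)
        except WorkerError:
            continue
    return {"status": "MANUAL_REVIEW_REQUIRED", "task": task}

result = asyncio.run(execute_with_fallback({"type": "country_analysis"}))
```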

Takeaways

  1. Orchestrator-worker provides dynamic coordination - unlike fixed workflows, the orchestrator analyzes problems at runtime and adapts its approach

  2. Two key responsibilities: decomposition (breaking complex tasks into subtasks) and synthesis (combining results into coherent output)

  3. Differs from parallelization in its dynamic intelligence - tasks are decided at runtime rather than pre-defined

  4. LLM integration enables natural language interfaces, adaptive formatting, and intelligent aggregation beyond simple concatenation

  5. Task coordination patterns include parallel execution, sequential dependencies, and mixed approaches

  6. Production systems need scheduled report generation, comprehensive audit trails, and graceful error handling

  7. Completes the workflow pattern toolkit - combined with chaining, routing, parallelization, and evaluator-optimizer, orchestrator-worker enables sophisticated financial processing pipelines


This is the seventh post in my Applied Agentic AI for Finance series. Next: Financial Tools and Structured Outputs where we’ll explore building tools for market data access and using Pydantic for financial data models.


×