Parallel Processing and Quality Control in Finance

Financial analysis often requires examining multiple dimensions simultaneously - market data, sentiment, risk factors, regulatory compliance. Rather than processing these sequentially, parallel workflows distribute work across multiple agents for speed and diverse perspectives. Combined with evaluator-optimizer patterns for quality assurance, these techniques form the backbone of production-grade financial AI systems.

The Power of Parallel Processing

Parallelization in agentic workflows means multiple agents work on different parts of a task - or even the same task - simultaneously. Think of it like a research team: instead of one analyst doing everything sequentially, multiple specialists tackle their domains in parallel, then synthesize findings.

flowchart TD
    I[Investment Research Request] --> D[Distributor]
    D --> F[Fundamental Analysis Agent]
    D --> T[Technical Analysis Agent]
    D --> S[Sentiment Analysis Agent]
    D --> R[Risk Analysis Agent]
    F --> A[Aggregator]
    T --> A
    S --> A
    R --> A
    A --> O[Comprehensive Report]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff
    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class D blueClass
    class A orangeClass

This follows the scatter-gather pattern: a problem is scattered to multiple workers, and their individual findings are gathered into a final result.
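
A minimal Python sketch of that skeleton, assuming asyncio and placeholder specialist coroutines (fundamental_agent, technical_agent, sentiment_agent, risk_agent, and aggregate stand in for whatever workers and combiner you actually use):

import asyncio

async def scatter_gather(request: str) -> dict:
    """Scatter one research request to specialist agents, then gather results."""
    # Scatter: launch every specialist concurrently
    findings = await asyncio.gather(
        fundamental_agent(request),   # placeholder coroutines --
        technical_agent(request),     # substitute your own agents
        sentiment_agent(request),
        risk_agent(request),
    )
    # Gather: combine individual findings into one result
    return aggregate(findings)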

The Independence Requirement

The golden rule for effective parallelization: subtasks must be largely independent. Agent A working on Subtask A shouldn’t need to wait for Agent B to finish Subtask B. If outputs depend on each other, sequential chaining is more appropriate.

flowchart TB
    subgraph Good["Good: Independent Tasks"]
        direction LR
        G1[Analyze Fundamentals] -.-> |parallel| G2[Analyze Technicals]
        G2 -.-> |parallel| G3[Analyze Sentiment]
    end

    subgraph Bad["Bad: Dependent Tasks"]
        direction LR
        B1[Get Price] --> B2[Calculate Returns]
        B2 --> B3[Assess Risk]
    end

    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff
    classDef pinkClass fill:#E74C3C,stroke:#333,stroke-width:2px,color:#fff

    class Good greenClass
    class Bad pinkClass
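
In code, the distinction shows up in how the awaits are arranged. A short sketch with placeholder coroutines (analyze_fundamentals, get_price, and the rest are illustrative):

import asyncio

async def independent_analyses(symbol: str) -> list:
    # Good: independent analyses can run concurrently
    return await asyncio.gather(
        analyze_fundamentals(symbol),
        analyze_technicals(symbol),
        analyze_sentiment(symbol),
    )

async def dependent_steps(symbol: str) -> dict:
    # Bad candidate for parallelism: each step consumes the previous
    # step's output, so sequential chaining is the right structure
    price = await get_price(symbol)
    returns = await calculate_returns(price)
    return await assess_risk(returns)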

Task Decomposition Strategies

Sectioning (Sharding)

For large, divisible inputs, split the data and process chunks in parallel:

import asyncio
from typing import List

async def analyze_portfolio_sharded(positions: List[dict]) -> dict:
    """Analyze a large portfolio by sharding across agents"""

    # Split positions into chunks
    chunk_size = 10
    chunks = [positions[i:i+chunk_size]
              for i in range(0, len(positions), chunk_size)]

    # Analyze chunks in parallel
    tasks = [analyze_chunk(chunk) for chunk in chunks]
    results = await asyncio.gather(*tasks)

    # Aggregate results
    return aggregate_position_analyses(results)

async def analyze_chunk(positions: List[dict]) -> dict:
    prompt = f"""
    Analyze these portfolio positions:
    {positions}

    For each position, assess:
    - Current value and P&L
    - Risk exposure
    - Sector allocation
    """
    return await llm.complete(prompt)
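
The sharding example calls aggregate_position_analyses without defining it. A minimal sketch, assuming each chunk result is a dict with a positions list and running totals (adapt the keys to whatever analyze_chunk actually returns):

def aggregate_position_analyses(chunk_results: List[dict]) -> dict:
    """Merge per-chunk analyses into one portfolio-level view."""
    merged = {"positions": [], "total_value": 0.0, "total_pnl": 0.0}
    for chunk in chunk_results:
        merged["positions"].extend(chunk.get("positions", []))
        merged["total_value"] += chunk.get("total_value", 0.0)
        merged["total_pnl"] += chunk.get("total_pnl", 0.0)
    return merged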

Financial use cases:

  • Analyzing large portfolios
  • Processing batch transactions
  • Reviewing multiple compliance reports

Aspect-Based Decomposition

When analyzing a single subject from multiple angles:

async def comprehensive_stock_analysis(symbol: str) -> dict:
    """Analyze a stock from multiple independent aspects"""

    # Define independent analysis aspects
    analyses = {
        "fundamental": fundamental_analysis(symbol),
        "technical": technical_analysis(symbol),
        "sentiment": sentiment_analysis(symbol),
        "risk": risk_analysis(symbol),
        "competitor": competitor_analysis(symbol)
    }

    # Run all analyses in parallel
    results = {}
    tasks = {name: asyncio.create_task(fn)
             for name, fn in analyses.items()}

    for name, task in tasks.items():
        results[name] = await task

    return synthesize_analyses(results)

async def fundamental_analysis(symbol: str) -> dict:
    prompt = f"""
    Perform fundamental analysis of {symbol}:
    - Revenue trends and growth
    - Profit margins and cash flow
    - Debt levels and balance sheet health
    - Valuation metrics (P/E, P/B, EV/EBITDA)
    """
    return await llm.complete(prompt)

async def technical_analysis(symbol: str) -> dict:
    prompt = f"""
    Perform technical analysis of {symbol}:
    - Price trends and momentum
    - Support and resistance levels
    - Volume patterns
    - Key technical indicators (RSI, MACD, Moving Averages)
    """
    return await llm.complete(prompt)

Financial use cases:

  • Due diligence reports
  • Credit assessments
  • Investment recommendations

Identical Tasks for Diversity or Voting

Run the same task multiple times to generate diverse outputs or achieve consensus:

async def robust_trade_decision(order: dict) -> dict:
    """Get multiple independent opinions on a trade"""

    # Run same analysis with different prompts/temperatures
    opinions = await asyncio.gather(
        analyze_trade(order, perspective="conservative"),
        analyze_trade(order, perspective="moderate"),
        analyze_trade(order, perspective="aggressive")
    )

    # Voting mechanism
    approvals = sum(1 for o in opinions if o["recommendation"] == "APPROVE")

    return {
        "decision": "APPROVE" if approvals >= 2 else "REJECT",
        "confidence": approvals / 3,
        "opinions": opinions
    }

async def analyze_trade(order: dict, perspective: str) -> dict:
    prompt = f"""
    As a {perspective} risk analyst, evaluate this trade:
    {order}

    Provide recommendation: APPROVE or REJECT
    Include reasoning.
    """
    return await llm.complete(prompt)

Financial use cases:

  • High-stakes trading decisions
  • Fraud detection (multiple detectors)
  • Regulatory interpretations

Aggregation Strategies

Once parallel tasks complete, their outputs must be combined:

Concatenation

Simply join outputs together:

def concatenate_reports(reports: List[str]) -> str:
    """Combine section reports into full document"""
    return "\n\n---\n\n".join(reports)

Best for: Document assembly, log compilation

Comparison and Selection

Choose the best output from multiple candidates:

async def select_best_analysis(analyses: List[dict]) -> dict:
    """Evaluate and select the best analysis"""

    evaluation_prompt = f"""
    Compare these stock analyses and select the best one:

    {json.dumps(analyses, indent=2)}

    Criteria:
    - Depth of analysis
    - Data accuracy
    - Actionable insights
    - Risk consideration

    Return the index (0-based) of the best analysis and explain why.
    """

    result = await llm.complete(evaluation_prompt)
    best_index = extract_index(result)
    return analyses[best_index]

Best for: Creative generation, strategy selection

Voting / Majority Rule

Consensus from multiple independent assessments:

def majority_vote(decisions: List[str]) -> str:
    """Return most common decision"""
    from collections import Counter
    counts = Counter(decisions)
    return counts.most_common(1)[0][0]

def weighted_vote(decisions: List[dict]) -> str:
    """Vote weighted by confidence scores"""
    weighted_counts = {}
    for d in decisions:
        decision = d["decision"]
        confidence = d["confidence"]
        weighted_counts[decision] = weighted_counts.get(decision, 0) + confidence

    return max(weighted_counts, key=weighted_counts.get)
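
For example, applied to the three trade opinions gathered earlier (the sample values are illustrative):

decisions = ["APPROVE", "APPROVE", "REJECT"]
print(majority_vote(decisions))   # APPROVE

weighted = [
    {"decision": "APPROVE", "confidence": 0.6},
    {"decision": "APPROVE", "confidence": 0.4},
    {"decision": "REJECT", "confidence": 0.9},
]
print(weighted_vote(weighted))    # APPROVE (combined weight 1.0 vs 0.9)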

Best for: Classification tasks, yes/no decisions, risk ratings

Synthesizer LLM

A dedicated agent combines diverse outputs into coherent synthesis:

async def synthesize_research(analyses: dict) -> str:
    """Combine multiple analyses into unified report"""

    synthesis_prompt = f"""
    You are a senior investment strategist. Synthesize these analyses
    into a coherent investment recommendation:

    Fundamental Analysis:
    {analyses['fundamental']}

    Technical Analysis:
    {analyses['technical']}

    Sentiment Analysis:
    {analyses['sentiment']}

    Risk Analysis:
    {analyses['risk']}

    Create a unified recommendation that:
    - Weighs evidence from all sources
    - Addresses contradictions between analyses
    - Provides clear actionable guidance
    - Includes risk-adjusted position sizing
    """

    return await llm.complete(synthesis_prompt)

Best for: Complex multi-source analysis, research reports

The Evaluator-Optimizer Pattern

While parallelization handles distribution, the evaluator-optimizer pattern ensures quality through iterative refinement. This is critical for high-stakes financial outputs where accuracy and compliance are non-negotiable.

flowchart TD
    T[Task] --> O[Optimizer/Generator]
    O --> OUT[Output v1]
    OUT --> E[Evaluator]
    E --> D{Meets Criteria?}
    D -->|Yes| F[Final Output]
    D -->|No| FB[Feedback]
    FB --> O

    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff
    classDef greenClass fill:#27AE60,stroke:#333,stroke-width:2px,color:#fff

    class E orangeClass
    class D greenClass

Two Key Roles

Optimizer (Generator) Agent:

  • Takes initial task and generates output
  • Refines output based on evaluator feedback
  • Focuses on improving specific issues identified

Evaluator (Critique) Agent:

  • Acts as expert reviewer
  • Assesses output against predefined criteria
  • Provides specific, actionable feedback
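
A minimal sketch of these two roles, assuming the same generic llm.complete client used throughout this post; the prompt wording is illustrative, and parse_review is a hypothetical helper that pulls {"score": float, "feedback": str} out of the review text:

class OptimizerAgent:
    """Generates an output, then refines it when given feedback."""

    async def generate(self, task: str, previous_output=None, feedback=None) -> str:
        if previous_output is None:
            prompt = f"Complete this task:\n{task}"
        else:
            prompt = (
                f"Task:\n{task}\n\n"
                f"Previous attempt:\n{previous_output}\n\n"
                f"Reviewer feedback:\n{feedback}\n\n"
                "Produce an improved version that addresses every point of feedback."
            )
        return await llm.complete(prompt)

class EvaluatorAgent:
    """Scores an output against criteria and returns actionable feedback."""

    def __init__(self, criteria: dict):
        self.criteria = criteria

    async def evaluate(self, output: str) -> dict:
        prompt = (
            f"Review this output against the criteria:\n{self.criteria}\n\n"
            f"Output:\n{output}\n\n"
            "Give a 1-10 score and, for each unmet criterion, a specific fix."
        )
        review = await llm.complete(prompt)
        return parse_review(review)  # hypothetical helper -> {"score": ..., "feedback": ...}

These stubs plug directly into the EvaluatorOptimizer loop shown under Stopping Conditions below.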

Essential Elements for Effective Loops

1. Clear Evaluation Criteria

Criteria must be specific, measurable, and unambiguous:

TRADE_ANALYSIS_CRITERIA = {
    "completeness": {
        "description": "All required sections present",
        "required_sections": ["market_context", "risk_assessment",
                              "position_sizing", "entry_exit_points"]
    },
    "accuracy": {
        "description": "Numerical calculations are correct",
        "checks": ["position_size_within_limits", "risk_reward_calculated"]
    },
    "compliance": {
        "description": "Meets regulatory requirements",
        "checks": ["disclosure_present", "risk_warnings_included"]
    },
    "actionability": {
        "description": "Clear, executable recommendations",
        "checks": ["specific_entry_price", "stop_loss_defined", "target_defined"]
    }
}
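
Some of these criteria can be checked deterministically before any LLM review. A sketch of a completeness pre-check (the helper name is illustrative), assuming the draft is plain text whose headings mirror the required_sections names:

def check_completeness(draft: str, criteria: dict = TRADE_ANALYSIS_CRITERIA) -> dict:
    """Deterministic pre-check: are all required sections present in the draft?"""
    required = criteria["completeness"]["required_sections"]
    missing = [s for s in required if s.replace("_", " ") not in draft.lower()]
    return {"passed": not missing, "missing_sections": missing}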

2. Actionable Feedback

Feedback must be specific enough to guide improvement:

evaluator_prompt = """
Review this investment recommendation against our criteria:

Recommendation:
{recommendation}

Criteria:
{criteria}

For each criterion that is NOT met, provide:
1. What is missing or incorrect
2. Specific suggestion for how to fix it
3. Example of what a correct version would look like

Rate overall quality 1-10.
Respond APPROVED if score >= 8, otherwise provide detailed feedback.
"""

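The loop code below expects a structured evaluation (a score plus feedback), while this prompt returns free text. A minimal bridge, assuming the response states its rating after a word like "score" or "quality" (asking the model for JSON output is more robust in practice); parse_evaluation is a hypothetical helper name:

import re

def parse_evaluation(response: str, approval_threshold: float = 8.0) -> dict:
    """Extract a structured evaluation from the evaluator's text response."""
    match = re.search(r"(?:score|quality)\D*?(\d+(?:\.\d+)?)", response, re.IGNORECASE)
    score = float(match.group(1)) if match else 0.0
    return {
        "score": score,
        "approved": "APPROVED" in response or score >= approval_threshold,
        "feedback": response,
    }
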
3. Stopping Conditions

Define when the loop should terminate:

class EvaluatorOptimizer:
    def __init__(self, optimizer, evaluator,
                 max_iterations: int = 5, min_score: float = 8.0):
        # optimizer and evaluator are agent objects exposing generate() / evaluate()
        self.optimizer = optimizer
        self.evaluator = evaluator
        self.max_iterations = max_iterations
        self.min_score = min_score

    async def run(self, task: str) -> dict:
        output = None
        feedback = None
        last_score = 0.0

        for iteration in range(self.max_iterations):
            # Generate or refine
            output = await self.optimizer.generate(
                task=task,
                previous_output=output,
                feedback=feedback
            )

            # Evaluate
            evaluation = await self.evaluator.evaluate(output)

            if evaluation["score"] >= self.min_score:
                return {
                    "output": output,
                    "iterations": iteration + 1,
                    "final_score": evaluation["score"]
                }

            # Check for diminishing returns
            if iteration > 0 and evaluation["score"] - last_score < 0.5:
                return {
                    "output": output,
                    "status": "DIMINISHING_RETURNS",
                    "iterations": iteration + 1
                }

            feedback = evaluation["feedback"]
            last_score = evaluation["score"]

        return {
            "output": output,
            "status": "MAX_ITERATIONS_REACHED",
            "iterations": self.max_iterations
        }

Applied Example: Investment Research Pipeline

Combining parallelization with the evaluator-optimizer pattern:

class InvestmentResearchPipeline:
    def __init__(self):
        self.analysts = {
            "fundamental": FundamentalAnalyst(),
            "technical": TechnicalAnalyst(),
            "sentiment": SentimentAnalyst(),
            "risk": RiskAnalyst()
        }
        self.synthesizer = ResearchSynthesizer()
        self.evaluator = ResearchEvaluator()

    async def research(self, symbol: str) -> dict:
        # Phase 1: Parallel Analysis
        analyses = await self.parallel_analysis(symbol)

        # Phase 2: Synthesis with Evaluation Loop
        report = await self.evaluated_synthesis(analyses)

        return report

    async def parallel_analysis(self, symbol: str) -> dict:
        """Run all analyses in parallel"""
        tasks = {
            name: asyncio.create_task(analyst.analyze(symbol))
            for name, analyst in self.analysts.items()
        }

        results = {}
        for name, task in tasks.items():
            results[name] = await task

        return results

    async def evaluated_synthesis(self, analyses: dict) -> dict:
        """Synthesize with quality control loop"""
        synthesis = None
        feedback = None

        for iteration in range(3):  # Max 3 iterations
            # Generate synthesis
            synthesis = await self.synthesizer.synthesize(
                analyses=analyses,
                previous=synthesis,
                feedback=feedback
            )

            # Evaluate
            evaluation = await self.evaluator.evaluate(synthesis)

            if evaluation["approved"]:
                return {
                    "report": synthesis,
                    "quality_score": evaluation["score"],
                    "iterations": iteration + 1
                }

            feedback = evaluation["feedback"]

        return {
            "report": synthesis,
            "status": "NEEDS_REVIEW",
            "iterations": 3
        }

Financial-Specific Considerations

Handling Conflicting Analyses

When parallel agents disagree:

async def resolve_conflicts(analyses: dict) -> dict:
    """Handle contradictions between analyses"""

    conflicts = identify_conflicts(analyses)

    if not conflicts:
        return {"status": "consistent", "analyses": analyses}

    resolution_prompt = f"""
    These analyses have conflicting conclusions:

    {json.dumps(conflicts, indent=2)}

    For each conflict:
    1. Explain why the disagreement exists
    2. Assess which perspective has stronger evidence
    3. Provide a balanced view that acknowledges uncertainty

    A conflict is NOT a reason to reject - it's information about uncertainty.
    """

    resolution = await llm.complete(resolution_prompt)

    return {
        "status": "resolved",
        "original_analyses": analyses,
        "conflicts": conflicts,
        "resolution": resolution
    }
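
The resolver assumes an identify_conflicts helper. A minimal version, assuming each analysis dict carries a "recommendation" field (e.g. BUY / HOLD / SELL), flags any pairwise disagreement:

from itertools import combinations
from typing import Dict, List

def identify_conflicts(analyses: Dict[str, dict]) -> List[dict]:
    """Flag pairs of analyses whose recommendations disagree."""
    conflicts = []
    for (name_a, a), (name_b, b) in combinations(analyses.items(), 2):
        rec_a, rec_b = a.get("recommendation"), b.get("recommendation")
        if rec_a and rec_b and rec_a != rec_b:
            conflicts.append({
                "between": [name_a, name_b],
                "positions": {name_a: rec_a, name_b: rec_b},
            })
    return conflicts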

Compliance Checkpoints

Add regulatory validation to evaluation:

COMPLIANCE_CRITERIA = {
    "risk_disclosure": "Must include clear risk warnings",
    "suitability": "Must consider investor profile",
    "conflicts_of_interest": "Must disclose any conflicts",
    "past_performance": "Must include past performance disclaimer",
    "regulatory_status": "Must include regulatory information"
}

async def compliance_evaluation(recommendation: str) -> dict:
    """Check regulatory compliance"""
    prompt = f"""
    Review this investment recommendation for regulatory compliance:

    {recommendation}

    Check each requirement:
    {json.dumps(COMPLIANCE_CRITERIA, indent=2)}

    For any failed requirement, specify exactly what needs to be added.
    """

    return await llm.complete(prompt)

Audit Trail

Track all iterations for regulatory purposes:

from dataclasses import dataclass
from datetime import datetime
from typing import List

@dataclass
class AuditEntry:
    timestamp: datetime
    iteration: int
    output_hash: str
    evaluation_score: float
    feedback: str
    changes_made: List[str]

class AuditedEvaluatorOptimizer:
    def __init__(self):
        self.audit_trail: List[AuditEntry] = []

    async def run(self, task: str) -> dict:
        # ... optimization loop: generate `output`, evaluate it,
        # keep the previous draft as `previous_output` ...

        for iteration in range(self.max_iterations):
            # Record each iteration
            self.audit_trail.append(AuditEntry(
                timestamp=datetime.now(),
                iteration=iteration,
                output_hash=hash_output(output),
                evaluation_score=evaluation["score"],
                feedback=evaluation.get("feedback", ""),
                changes_made=extract_changes(previous_output, output)
            ))

        return {
            "output": output,
            "audit_trail": self.audit_trail
        }
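
The audit sketch leaves hash_output and extract_changes undefined. A content hash over the serialized output is enough to make each recorded iteration tamper-evident; extract_changes could be a simple difflib comparison. A minimal hash_output sketch:

import hashlib
import json

def hash_output(output) -> str:
    """Stable SHA-256 fingerprint of an iteration's output for the audit trail."""
    serialized = json.dumps(output, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()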

Takeaways

  1. Parallelization requires independence - only parallelize tasks that don’t depend on each other’s outputs

  2. Three decomposition strategies serve different needs: sectioning for large data, aspect-based for multi-angle analysis, and identical tasks for diversity/voting

  3. Aggregation strategy matters - choose between concatenation, selection, voting, or synthesis based on the nature of outputs

  4. Evaluator-optimizer loops ensure quality through iterative refinement with clear criteria, actionable feedback, and defined stopping conditions

  5. Financial applications need special handling for conflicting analyses, compliance requirements, and audit trails

  6. Combine patterns - parallelization for speed, evaluation-optimization for quality, creating robust pipelines for production use


This is the sixth post in my Applied Agentic AI for Finance series. Next: Orchestrating Financial Operations, where we'll explore the orchestrator-worker pattern for coordinating complex financial workflows.
