Connecting Agents to Financial Data Sources

Agents become truly useful when they stop just reasoning and start acting - interacting with real systems like market data feeds, financial news sources, and internal databases. This shift happens through three key integration patterns: external APIs for real-time data, web search for unstructured information, and database connections for structured internal data. Mastering these patterns transforms agents from conversational tools into operational financial systems.

Why External Data Matters

An LLM alone can’t answer “What’s Apple’s current stock price?” or “What are today’s best credit card offers?” - this information isn’t in its training data and changes constantly. Financial agents need access to:

  • Real-time market data: Stock prices, FX rates, bond yields
  • Current news and research: Earnings reports, regulatory filings, market analysis
  • Internal systems: Transaction records, customer data, compliance logs

flowchart LR
    USER[User Query] --> AGENT[Financial Agent]
    AGENT --> API[Market APIs]
    AGENT --> SEARCH[Web Search]
    AGENT --> DB[Databases]
    API --> AGENT
    SEARCH --> AGENT
    DB --> AGENT
    AGENT --> RESPONSE[Informed Response]

    classDef blueClass fill:#4A90E2,stroke:#333,stroke-width:2px,color:#fff

    class AGENT blueClass

External APIs: Real-Time Market Data

APIs (Application Programming Interfaces) are structured ways for software systems to communicate. For financial agents, APIs provide access to live market data, pricing information, and execution capabilities.

The API Pattern

flowchart LR
    A[Agent] --> |HTTP Request| B[API Endpoint]
    B --> |JSON Response| A
    A --> C[Process & Respond]

Think of it like placing an order at a restaurant: the agent formats a request, sends it to the API (the kitchen), and receives a response (the meal).
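
In code, the pattern is a single HTTP request and a parsed response. Here is a minimal sketch of that loop; the endpoint and response field are hypothetical placeholders, not a real API:

import requests

# Hypothetical quote endpoint - for illustrating the request/response pattern
resp = requests.get(
    "https://api.example.com/quote",
    params={"symbol": "AAPL"},
    timeout=10,
)
resp.raise_for_status()        # surface HTTP errors early
price = resp.json()["price"]   # parse the structured JSON response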

Building a Stock Price Agent

Let’s build an agent that fetches live stock prices and calculates investment costs:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import requests
import json

@dataclass
class InvestmentRequest:
    """Data model for a stock purchase request"""
    ticker: Optional[str] = None
    shares: Optional[int] = None
    broker_fee_pct: Optional[float] = None
    commission_per_trade: Optional[float] = None

    def get_missing_fields(self) -> list:
        missing = []
        if not self.ticker:
            missing.append("ticker")
        if not self.shares:
            missing.append("shares")
        if self.broker_fee_pct is None:
            missing.append("broker_fee_pct")
        if self.commission_per_trade is None:
            missing.append("commission_per_trade")
        return missing

@dataclass
class InvestmentResult:
    """Data model for investment calculation results"""
    ticker: str
    shares: int
    current_price: float
    base_cost: float
    broker_fee: float
    commission: float
    total_cost: float
    timestamp: str


class StockPriceAgent:
    """Agent for fetching stock prices and calculating investment costs"""

    def __init__(self, api_key: str = "demo"):
        # Alpha Vantage's "demo" key only works for a few sample tickers;
        # use a real key in practice
        self.api_key = api_key
        self.api_url = "https://www.alphavantage.co/query"

    def get_stock_price(self, ticker: str) -> Optional[float]:
        """Fetch the current stock price from the Alpha Vantage API"""
        try:
            params = {
                "function": "GLOBAL_QUOTE",
                "symbol": ticker.upper(),
                "apikey": self.api_key
            }
            response = requests.get(self.api_url, params=params, timeout=10)
            response.raise_for_status()

            data = response.json()
            quote = data.get("Global Quote", {})
            price_str = quote.get("05. price")

            if price_str:
                return float(price_str)
            return None

        except Exception as e:
            print(f"API Error: {e}")
            return None

    def calculate_total_cost(
        self,
        ticker: str,
        shares: int,
        price: float,
        broker_fee_pct: float,
        commission: float
    ) -> InvestmentResult:
        """Calculate total investment cost including fees"""
        base_cost = shares * price
        broker_fee = base_cost * (broker_fee_pct / 100)
        total_cost = base_cost + broker_fee + commission

        return InvestmentResult(
            ticker=ticker,
            shares=shares,
            current_price=price,
            base_cost=base_cost,
            broker_fee=broker_fee,
            commission=commission,
            total_cost=total_cost,
            timestamp=datetime.now().isoformat()
        )

    def extract_investment_info(self, user_input: str) -> InvestmentRequest:
        """Extract investment details from natural language using an LLM"""

        prompt = f"""Extract investment information from this input:
"{user_input}"

Return JSON with these fields:
- ticker: stock symbol (string or null)
- shares: number of shares (integer or null)
- broker_fee_pct: broker fee percentage (number or null)
- commission_per_trade: flat commission (number or null)

Return only valid JSON."""

        # `llm` is a placeholder for whichever LLM client you use; any
        # client that can return a JSON string works here
        response = llm.complete(
            prompt,
            response_format={"type": "json_object"}
        )

        data = json.loads(response)
        return InvestmentRequest(**data)

    def process_request(self, user_input: str) -> str:
        """Main processing function"""

        # 1. Extract investment information
        request = self.extract_investment_info(user_input)

        # 2. Check for missing information
        missing = request.get_missing_fields()
        if missing:
            return f"Please provide: {', '.join(missing)}"

        # 3. Fetch current stock price
        price = self.get_stock_price(request.ticker)
        if price is None:
            return f"Unable to fetch price for {request.ticker}"

        # 4. Calculate total cost
        result = self.calculate_total_cost(
            ticker=request.ticker,
            shares=request.shares,
            price=price,
            broker_fee_pct=request.broker_fee_pct,
            commission=request.commission_per_trade
        )

        # 5. Format response
        return f"""
Investment Summary for {result.ticker}:
- Shares: {result.shares}
- Current Price: ${result.current_price:,.2f}
- Base Cost: ${result.base_cost:,.2f}
- Broker Fee ({request.broker_fee_pct}%): ${result.broker_fee:,.2f}
- Commission: ${result.commission:,.2f}
- Total Cost: ${result.total_cost:,.2f}
"""

Using the Agent

agent = StockPriceAgent()

# Complete request
response = agent.process_request(
    "Buy 50 shares of MSFT with 0.5% broker fee and $10 commission"
)
print(response)

# Output:
# Investment Summary for MSFT:
# - Shares: 50
# - Current Price: $378.45
# - Base Cost: $18,922.50
# - Broker Fee (0.5%): $94.61
# - Commission: $10.00
# - Total Cost: $19,027.11

Handling API Challenges

Authentication: Most APIs require credentials:

headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}
response = requests.get(url, headers=headers)

Rate Limiting: Implement retries with backoff:

from tenacity import retry, wait_exponential, stop_after_attempt

@retry(wait=wait_exponential(min=1, max=10), stop=stop_after_attempt(3))
def fetch_with_retry(url: str) -> dict:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()

Error Handling: Always handle failures gracefully:

def get_price_with_fallback(self, ticker: str) -> Optional[float]:
    try:
        return self.get_stock_price(ticker)
    except requests.Timeout:
        return self.get_cached_price(ticker)
    except Exception:
        return None

Web Search: Unstructured Financial Research

Not all information lives in structured APIs. Credit card offers, analyst opinions, regulatory news - these exist in unstructured web content. Web search agents retrieve and synthesize this information.

When asked “What are the best no-fee credit cards right now?”, the answer exists across news sites, comparison sites, and financial blogs. Web search enables:

  • Grounding responses in evidence
  • Accessing real-time information
  • Citing verifiable sources

Building a Financial Research Agent
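
The agent below leans on two pieces of plumbing that aren't shown inside the class: a Tavily search client and the same generic `llm.complete` helper used earlier. A minimal setup sketch, assuming a Tavily API key in the environment:

import os
from tavily import TavilyClient

# Assumes TAVILY_API_KEY is set in the environment
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])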

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from urllib.parse import urlparse
import json

@dataclass
class SearchResult:
    """Single search result"""
    title: str
    url: str
    snippet: str
    domain: str

@dataclass
class CreditCardRecord:
    """Structured credit card data"""
    card_name: str
    issuer: str
    rewards_rate: str
    annual_fee: str
    bonus_offer: Optional[str] = None
    best_for: Optional[str] = None
    source_url: str = ""

@dataclass
class CardComparison:
    """Final comparison output"""
    intro: str
    top_cards: List[CreditCardRecord]
    takeaway: str
    sources: List[Dict[str, str]]
    disclaimer: str


class CreditCardSearchAgent:
    """Agent for searching and analyzing credit card offers"""

    def __init__(self):
        self.reputable_domains = {
            'nerdwallet.com',
            'bankrate.com',
            'creditkarma.com',
            'thepointsguy.com',
            'forbes.com'
        }

    def search_web(self, query: str, num_results: int = 5) -> List[SearchResult]:
        """Search the web using the Tavily API"""
        # `tavily_client` is the initialized client from the setup sketch above
        response = tavily_client.search(
            query=query,
            search_depth="advanced",
            max_results=num_results
        )

        results = []
        for item in response.get("results", []):
            results.append(SearchResult(
                title=item["title"],
                url=item["url"],
                snippet=item["content"],
                domain=self._extract_domain(item["url"])
            ))
        return results

    def _extract_domain(self, url: str) -> str:
        """Normalize a URL down to its bare domain, e.g. 'nerdwallet.com'"""
        return urlparse(url).netloc.removeprefix("www.")

    def filter_reputable_sources(
        self,
        results: List[SearchResult]
    ) -> List[SearchResult]:
        """Keep only results from trusted financial sources"""
        return [
            r for r in results
            if r.domain in self.reputable_domains
        ]

    def extract_card_data(
        self,
        results: List[SearchResult]
    ) -> List[CreditCardRecord]:
        """Use the LLM to extract structured card data from snippets"""

        combined_text = "\n\n".join([
            f"Source: {r.title}\nContent: {r.snippet}"
            for r in results
        ])

        prompt = f"""Extract credit card information from these sources:

{combined_text}

For each card mentioned, extract:
- card_name: Full name of the card
- issuer: Bank or company
- rewards_rate: Main rewards rate
- annual_fee: Annual fee amount
- bonus_offer: Sign-up bonus if mentioned
- best_for: What spending category it's best for

Return a JSON object with a "cards" array."""

        response = llm.complete(
            prompt,
            response_format={"type": "json_object"}
        )

        cards = json.loads(response)
        return [CreditCardRecord(**card) for card in cards.get("cards", [])]

    def deduplicate_cards(
        self,
        cards: List[CreditCardRecord]
    ) -> List[CreditCardRecord]:
        """Remove duplicate card entries"""
        seen = set()
        unique = []
        for card in cards:
            key = card.card_name.lower()
            if key not in seen:
                seen.add(key)
                unique.append(card)
        return unique

    def synthesize_comparison(
        self,
        cards: List[CreditCardRecord],
        user_query: str
    ) -> CardComparison:
        """Generate a user-friendly comparison summary"""

        cards_json = json.dumps([c.__dict__ for c in cards[:5]], indent=2)

        prompt = f"""Create a credit card comparison based on:

User query: {user_query}

Cards found:
{cards_json}

Provide:
1. Brief intro (2-3 sentences)
2. Key takeaway about best choice
3. Standard disclaimer about rates changing

Return as JSON with fields: intro, takeaway, disclaimer"""

        response = llm.complete(
            prompt,
            response_format={"type": "json_object"}
        )

        summary = json.loads(response)

        return CardComparison(
            intro=summary["intro"],
            top_cards=cards[:5],
            takeaway=summary["takeaway"],
            sources=[{"url": c.source_url} for c in cards[:5]],
            disclaimer=summary["disclaimer"]
        )

    def find_best_cards(self, user_query: str) -> CardComparison:
        """Main method: search, filter, extract, synthesize"""

        print("Searching for current credit card offers...")

        # Multi-query search for broader coverage
        queries = [
            f"{user_query} 2025",
            "best rewards credit cards today",
            "top cash back cards site:nerdwallet.com"
        ]

        all_results = []
        for query in queries:
            results = self.search_web(query)
            all_results.extend(results)

        print(f"Found {len(all_results)} results")

        # Filter to trusted sources
        filtered = self.filter_reputable_sources(all_results)
        print(f"Filtered to {len(filtered)} reputable sources")

        # Extract structured data
        cards = self.extract_card_data(filtered)
        print(f"Extracted {len(cards)} card records")

        # Deduplicate
        unique = self.deduplicate_cards(cards)
        print(f"Deduplicated to {len(unique)} unique cards")

        # Synthesize comparison
        return self.synthesize_comparison(unique, user_query)
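
A quick usage sketch (assuming the Tavily setup above; the printed fields follow the CardComparison dataclass):

agent = CreditCardSearchAgent()
comparison = agent.find_best_cards("best no-fee credit cards right now")

print(comparison.intro)
for card in comparison.top_cards:
    print(f"- {card.card_name} ({card.issuer}): "
          f"{card.rewards_rate}, annual fee {card.annual_fee}")
print(comparison.takeaway)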

Web Search Best Practices

Source Validation: Only trust reputable domains

TRUSTED_FINANCE_DOMAINS = {
    'nerdwallet.com',
    'bankrate.com',
    'wsj.com',
    'bloomberg.com',
    'reuters.com'
}

Citation Requirement: Always cite sources

response = f"""Based on {source.title}:
{summary}

Source: {source.url}"""

Recency Filtering: Prefer recent content

from datetime import datetime, timedelta

def filter_recent(results: List[SearchResult], days: int = 30):
    # Assumes SearchResult also carries a `published_date` datetime field
    cutoff = datetime.now() - timedelta(days=days)
    return [r for r in results if r.published_date > cutoff]

Database Agents: Structured Internal Data

For questions about internal systems (“What’s the status of ticket #3042?” or “How many users signed up this week?”), agents need database access. Text-to-SQL enables natural language queries against structured data.
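
As a concrete example of what the pattern produces, a question like "How many users signed up this week?" might be translated, against the schema defined further down, into something like this (illustrative SQLite syntax):

SELECT COUNT(*) AS signups_this_week
FROM customers
WHERE created_at >= DATE('now', '-7 days');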

The Text-to-SQL Pattern

flowchart LR
    Q[Natural Language Question] --> GEN[Generate SQL]
    GEN --> VAL[Validate & Sanitize]
    VAL --> EXEC[Execute Query]
    EXEC --> SUM[Summarize Results]
    SUM --> R[User Response]

    classDef orangeClass fill:#F39C12,stroke:#333,stroke-width:2px,color:#fff

    class VAL orangeClass

Building a Text-to-SQL Agent

from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional
import re
import pandas as pd
from sqlalchemy import create_engine, text

@dataclass
class DatabaseSchema:
    """Database schema for validation"""
    tables: Dict[str, List[str]]   # table -> columns
    relationships: Dict[str, str]  # FK relationships

@dataclass
class QueryResult:
    """Result of a text-to-SQL operation"""
    original_question: str
    generated_sql: str
    executed_sql: str
    data: pd.DataFrame
    row_count: int
    summary: str
    assumptions_made: List[str] = field(default_factory=list)


# Define schema for validation
FINANCE_SCHEMA = DatabaseSchema(
    tables={
        'transactions': [
            'transaction_id', 'account_id', 'amount',
            'transaction_type', 'created_at', 'status'
        ],
        'accounts': [
            'account_id', 'customer_id', 'account_type',
            'balance', 'created_at'
        ],
        'customers': [
            'customer_id', 'name', 'email',
            'created_at', 'status'
        ]
    },
    relationships={
        'transactions.account_id': 'accounts.account_id',
        'accounts.customer_id': 'customers.customer_id'
    }
)


class TextToSQLAgent:
    """Agent for converting natural language to safe SQL queries"""

    def __init__(self, engine, schema: DatabaseSchema):
        self.engine = engine
        self.schema = schema
        self.query_history = []

    def get_schema_description(self) -> str:
        """Format schema for LLM context"""
        desc = "Available Tables:\n\n"
        for table, columns in self.schema.tables.items():
            desc += f"Table: {table}\n"
            desc += f"Columns: {', '.join(columns)}\n\n"

        desc += "Relationships:\n"
        for fk, pk in self.schema.relationships.items():
            desc += f"- {fk} -> {pk}\n"

        return desc

    def generate_sql(
        self,
        question: str,
        previous_error: Optional[str] = None
    ) -> str:
        """Generate SQL from natural language"""

        error_context = ""
        if previous_error:
            error_context = f"\nPrevious attempt failed: {previous_error}\nPlease correct the error."

        prompt = f"""Convert this question to SQL:

Question: {question}

Database Schema:
{self.get_schema_description()}

Rules:
1. Use only SELECT statements
2. Always include LIMIT unless counting
3. Use proper JOINs based on relationships
4. Handle date/time with appropriate functions
{error_context}

Return ONLY the SQL query, no explanation."""

        # `llm` is again a placeholder LLM client; temperature 0 keeps
        # SQL generation deterministic
        return llm.complete(prompt, temperature=0)

    def apply_safety_checks(
        self,
        sql: str,
        question: str
    ) -> Tuple[str, List[str]]:
        """Apply safety checks and modifications"""

        assumptions = []
        sql_upper = sql.upper().strip()

        # 1. Only allow SELECT
        if not sql_upper.startswith('SELECT'):
            return (
                "SELECT 'Error: Only SELECT queries allowed' as error;",
                ["Query rejected - only SELECT allowed"]
            )

        # 2. Block dangerous keywords (word-boundary match, so column
        # names like created_at don't trip the CREATE check)
        forbidden = ['INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER']
        for keyword in forbidden:
            if re.search(rf'\b{keyword}\b', sql_upper):
                return (
                    f"SELECT 'Error: {keyword} not allowed' as error;",
                    [f"Query rejected - {keyword} not allowed"]
                )

        # 3. Ensure LIMIT is present
        if 'LIMIT' not in sql_upper:
            sql = sql.rstrip(';') + ' LIMIT 100;'
            assumptions.append("Added LIMIT 100 for performance")

        return sql, assumptions

    def execute_query(self, sql: str) -> Tuple[pd.DataFrame, int]:
        """Execute query and return results"""
        with self.engine.connect() as conn:
            result = conn.execute(text(sql))
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
            return df, len(df)

    def generate_summary(
        self,
        question: str,
        sql: str,
        data: pd.DataFrame,
        assumptions: List[str]
    ) -> str:
        """Generate natural language summary of results"""

        data_preview = data.head(10).to_string()

        prompt = f"""Summarize these database query results:

Original Question: {question}
SQL Executed: {sql}
Assumptions Made: {assumptions}

Results ({len(data)} rows):
{data_preview}

Provide a 2-3 sentence summary answering the original question."""

        return llm.complete(prompt)

    def process_question(self, question: str) -> QueryResult:
        """Main processing pipeline"""

        # 1. Generate SQL with retry
        sql = None
        last_error = None

        for attempt in range(3):
            try:
                sql = self.generate_sql(question, last_error)

                # Basic syntax validation
                if not sql.strip().upper().startswith('SELECT'):
                    last_error = "Query must start with SELECT"
                    continue

                break
            except Exception as e:
                last_error = str(e)

        if sql is None:
            raise ValueError("Failed to generate SQL after 3 attempts")

        # 2. Apply safety checks
        safe_sql, assumptions = self.apply_safety_checks(sql, question)

        # 3. Execute query
        data, row_count = self.execute_query(safe_sql)

        # 4. Generate summary
        summary = self.generate_summary(question, safe_sql, data, assumptions)

        # 5. Create result
        result = QueryResult(
            original_question=question,
            generated_sql=sql,
            executed_sql=safe_sql,
            data=data,
            row_count=row_count,
            summary=summary,
            assumptions_made=assumptions
        )

        self.query_history.append(result)
        return result

Using the Database Agent

# Initialize
engine = create_engine("sqlite:///finance.db")
agent = TextToSQLAgent(engine, FINANCE_SCHEMA)

# Query 1: Aggregation
result = agent.process_question(
    "How many transactions were processed this week?"
)
print(result.summary)
# "This week saw 1,247 transactions processed, with a total
#  value of $2.3M. Most were standard debits and credits."

# Query 2: Joining tables
result = agent.process_question(
    "Show me the top 5 customers by transaction volume"
)
print(result.data)

# Safety test: Blocked operation
result = agent.process_question(
    "Delete all failed transactions"
)
print(result.summary)
# "Error: DELETE operations not allowed"

Database Security Essentials

Read-Only Access: Use restricted database credentials

engine = create_engine(
    "postgresql://readonly_user:password@host/db",
    connect_args={"options": "-c statement_timeout=30000"}
)
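
On the database side, the restricted account itself is created with standard SQL grants. A PostgreSQL sketch (role, database, and schema names are illustrative):

-- Read-only role for the agent
CREATE ROLE readonly_user LOGIN PASSWORD 'change-me';
GRANT CONNECT ON DATABASE finance TO readonly_user;
GRANT USAGE ON SCHEMA public TO readonly_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_user;
-- Cover tables created later, too
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO readonly_user;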

Query Limits: Always enforce row limits

def enforce_limit(sql: str, max_rows: int = 1000) -> str:
    if 'LIMIT' not in sql.upper():
        return sql.rstrip(';') + f' LIMIT {max_rows};'
    return sql

Audit Logging: Track all queries

from datetime import datetime

def log_query(question: str, sql: str, user_id: str):
    # `audit_log` is a placeholder for your audit store
    audit_log.insert({
        "timestamp": datetime.now(),
        "user_id": user_id,
        "question": question,
        "sql": sql
    })

Combining Data Sources

The most powerful agents combine multiple data sources:

class FinancialResearchAgent:
    """Agent combining APIs, web search, and databases"""

    def __init__(self):
        # `engine` and `schema` come from the database setup above;
        # `_format_comparison` (not shown) renders a CardComparison as text
        self.stock_api = StockPriceAgent()
        self.search_agent = CreditCardSearchAgent()
        self.db_agent = TextToSQLAgent(engine, schema)

    def research(self, query: str) -> str:
        """Route the query to the appropriate data source"""

        # Classify query type
        if self._is_market_data_query(query):
            return self.stock_api.process_request(query)

        elif self._is_web_research_query(query):
            comparison = self.search_agent.find_best_cards(query)
            return self._format_comparison(comparison)

        elif self._is_internal_data_query(query):
            result = self.db_agent.process_question(query)
            return result.summary

        else:
            return "I can help with market data, financial research, or internal reports."

    def _is_market_data_query(self, query: str) -> bool:
        keywords = ['price', 'stock', 'shares', 'buy', 'sell']
        return any(kw in query.lower() for kw in keywords)

    def _is_web_research_query(self, query: str) -> bool:
        keywords = ['best', 'compare', 'recommend', 'credit card', 'rates']
        return any(kw in query.lower() for kw in keywords)

    def _is_internal_data_query(self, query: str) -> bool:
        keywords = ['how many', 'show me', 'list', 'report', 'this week']
        return any(kw in query.lower() for kw in keywords)
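
A usage sketch of the router (the keyword heuristics are deliberately simple; swapping in an LLM-based classifier is a natural upgrade):

agent = FinancialResearchAgent()

# Routed to the stock API ("shares", "buy")
print(agent.research("Buy 10 shares of AAPL with 0.5% fee and $5 commission"))

# Routed to web search ("best", "credit card")
print(agent.research("What are the best no-fee credit cards?"))

# Routed to the database agent ("how many", "this week")
print(agent.research("How many transactions failed this week?"))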

Takeaways

  1. External APIs provide real-time market data - handle authentication, rate limits, and failures gracefully

  2. Web search enables grounded responses from unstructured content - always validate sources and cite evidence

  3. Text-to-SQL unlocks internal databases - implement strict safety checks to prevent dangerous operations

  4. Combine data sources for comprehensive financial intelligence - route queries to appropriate sources

  5. Security is paramount - use read-only credentials, enforce limits, and log all queries

  6. Handle failures gracefully - implement retries, fallbacks, and clear error messages


This is the tenth post in my Applied Agentic AI for Finance series. Next: RAG and Evaluation for Financial Agents, where we’ll explore retrieval-augmented generation and performance metrics.
