# finance-bot / modules / analysis_pipeline.py
# tosanoob - feat: update report format (dec9e8e)
# modules/analysis_pipeline.py
import os
import asyncio
import pandas as pd
from datetime import datetime
import google.generativeai as genai
from dotenv import load_dotenv
from .api_clients import AlphaVantageClient, NewsAPIClient, MarketauxClient, get_price_history
import time
# Load environment variables and configure AI.
# load_dotenv() runs first so the os.getenv() lookups below can see whatever it loads.
load_dotenv()
# The Gemini API key is read from the GEMINI_API_KEY environment variable.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
# Model name used by StockAnalysisPipeline; GEMINI_MODEL overrides the default.
MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
# Define the analysis pipeline class
class StockAnalysisPipeline:
    """Pipeline for generating comprehensive stock analysis reports.

    Each data stage fetches raw data through the API clients and stores it
    in ``company_data``; each analysis stage sends that data to the Gemini
    model and stores the returned text in ``analysis_results``.
    ``run_analysis`` drives the stages in order.
    """

    # (key in price_data, label used in the prompt) for each chart window,
    # in the order they are fetched and described.
    _CHART_PERIODS = (
        ('1_month', 'last month'),
        ('3_months', 'last 3 months'),
        ('1_year', 'last year'),
    )
    # Daily close-to-close moves larger than this (in percent) are reported.
    _SIGNIFICANT_MOVE_PCT = 2.0
    # Upper bound on the number of daily moves listed per chart.
    _MAX_SIGNIFICANT_MOVES = 3
    # Pause before every Gemini request, to avoid rate limiting.
    _AI_CALL_DELAY_SECONDS = 1

    def __init__(self, symbol):
        """Initialize the pipeline with a stock symbol.

        Args:
            symbol: Ticker symbol; stored upper-cased.
        """
        self.symbol = symbol.upper()
        # Use the ticker as the display name until the overview provides a
        # real one, so the attribute exists whichever stage runs first.
        self.company_name = self.symbol
        self.company_data = {}
        self.analysis_results = {}
        self.ai_model = genai.GenerativeModel(model_name=MODEL_NAME)

    async def run_analysis(self):
        """Run the full analysis pipeline in an interleaved pattern.

        Returns:
            dict with keys ``symbol``, ``company_name``, ``analysis`` (the
            four generated texts), ``price_data`` and ``overview``.
        """
        print(f"Starting analysis pipeline for {self.symbol}...")

        # 1. Company overview (also resolves the display name)
        await self._get_company_overview()
        print(f"Analyzing {self.symbol} ({self.company_name})")

        # 2. Financial statements
        print("Getting financial data...")
        await self._get_financial_statements()

        # 3. Financial health analysis with Gemini
        print("Analyzing financial health...")
        self.analysis_results['financial_health'] = await self._analyze_financial_health()

        # 4. Market news and sentiment
        print("Getting news and sentiment data...")
        await self._get_market_sentiment_and_news()

        # 5. News sentiment analysis with Gemini
        print("Analyzing news and sentiment...")
        self.analysis_results['news_sentiment'] = await self._analyze_news_sentiment()

        # 6. Quote data and price history
        print("Getting quote and price data...")
        await self._get_analyst_ratings()
        await self._get_price_data()

        # 7. Market data analysis with Gemini
        print("Analyzing market data...")
        self.analysis_results['expert_opinion'] = await self._analyze_expert_opinion()

        # 8. Final summary and recommendation
        print("Creating final summary and recommendation...")
        self.analysis_results['summary'] = await self._create_summary()

        # 9. Everything the report generators need
        return {
            'symbol': self.symbol,
            'company_name': self.company_name,
            'analysis': self.analysis_results,
            'price_data': self.company_data.get('price_data', {}),
            'overview': self.company_data.get('overview', {}),
        }

    async def _generate_text(self, prompt):
        """Send ``prompt`` to the Gemini model and return the reply text.

        The call is preceded by a short pause to avoid rate limiting.
        ``generate_content`` is a blocking call, so it runs in the default
        executor instead of stalling the event loop for the whole request.
        """
        await asyncio.sleep(self._AI_CALL_DELAY_SECONDS)
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, self.ai_model.generate_content, prompt
        )
        return response.text

    async def _get_company_overview(self):
        """Get company overview information and derive the display name."""
        # A falsy response (e.g. None) is stored as {} so that later
        # ``overview.get(...)`` calls cannot fail on it.
        overview = await AlphaVantageClient.get_company_overview(self.symbol) or {}
        self.company_data['overview'] = overview
        self.company_name = overview['Name'] if 'Name' in overview else self.symbol
        print(f"Retrieved company overview for {self.symbol}")

    async def _get_financial_statements(self):
        """Get income statement, balance sheet and cash flow concurrently."""
        income_statement, balance_sheet, cash_flow = await asyncio.gather(
            AlphaVantageClient.get_income_statement(self.symbol),
            AlphaVantageClient.get_balance_sheet(self.symbol),
            AlphaVantageClient.get_cash_flow(self.symbol),
        )
        self.company_data['income_statement'] = income_statement
        self.company_data['balance_sheet'] = balance_sheet
        self.company_data['cash_flow'] = cash_flow
        print(f"Retrieved financial statements for {self.symbol}")

    async def _get_market_sentiment_and_news(self):
        """Get market sentiment and news from all three sources concurrently."""
        alpha_news, news_api, marketaux = await asyncio.gather(
            AlphaVantageClient.get_news_sentiment(self.symbol),
            # NewsAPI is queried by company name; the others by ticker.
            NewsAPIClient.get_company_news(self.company_name),
            MarketauxClient.get_company_news(self.symbol),
        )
        self.company_data['alpha_news'] = alpha_news
        self.company_data['news_api'] = news_api
        self.company_data['marketaux'] = marketaux
        print(f"Retrieved news and sentiment for {self.symbol}")

    async def _get_analyst_ratings(self):
        """Get the current stock quote (despite the name, not analyst ratings)."""
        self.company_data['quote_data'] = await AlphaVantageClient.get_global_quote(self.symbol)
        print(f"Retrieved quote data for {self.symbol}")

    async def _get_price_data(self):
        """Get historical price data for every chart period."""
        # get_price_history is a plain synchronous function (it is no longer
        # async), so it is called directly, one period at a time.
        self.company_data['price_data'] = {
            period: get_price_history(self.symbol, period)
            for period, _label in self._CHART_PERIODS
        }
        print(f"Retrieved price history for {self.symbol}")

    async def _analyze_financial_health(self):
        """Analyze the company's financial health using AI.

        Returns:
            The model's Markdown analysis as a string.
        """
        financial_data = {
            'overview': self.company_data.get('overview', {}),
            'income_statement': self.company_data.get('income_statement', {}),
            'balance_sheet': self.company_data.get('balance_sheet', {}),
            'cash_flow': self.company_data.get('cash_flow', {}),
        }
        prompt = f"""
You are a senior financial analyst. Analyze the financial health of {self.symbol} based on the following data:
{financial_data}
Provide a detailed analysis covering:
1. Overall financial condition overview
2. Key financial ratios analysis (P/E, ROE, Debt/Equity, etc.)
3. Revenue and profit growth assessment
4. Cash flow and liquidity assessment
5. Key financial strengths and weaknesses
Format requirements:
- Write in professional, concise financial reporting style
- Use Markdown formatting with appropriate headers and bullet points
- DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc.
- DO NOT include any concluding phrases
- Present only factual analysis based on the data
- Present the information directly and objectively
- Prefer using the correct currency text instead of the symbol. For example, use USD instead of $
"""
        return await self._generate_text(prompt)

    async def _analyze_news_sentiment(self):
        """Analyze news and market sentiment using AI.

        Returns:
            The model's Markdown analysis as a string.
        """
        news_data = {
            'alpha_news': self.company_data.get('alpha_news', {}),
            'news_api': self.company_data.get('news_api', {}),
            'marketaux': self.company_data.get('marketaux', {}),
        }
        prompt = f"""
You are a market analyst. Analyze news and market sentiment about {self.symbol} based on the following data:
{news_data}
Provide a detailed analysis covering:
1. Summary of key recent news about the company
2. Important events that could impact stock price
3. Overall market sentiment analysis (positive/negative/neutral)
4. Risk factors identified in news
Format requirements:
- Write in professional, concise financial reporting style
- Use Markdown formatting with appropriate headers and bullet points
- DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc.
- DO NOT include any concluding phrases
- Present only factual analysis based on the data
- Present the information directly and objectively
- Prefer using the correct currency text instead of the symbol. For example, use USD instead of $
"""
        return await self._generate_text(prompt)

    @staticmethod
    def _percent_change(old, new):
        """Return the change from ``old`` to ``new`` in percent (0.0 if ``old`` is 0)."""
        if old == 0:
            return 0.0
        return ((new - old) / old) * 100

    @staticmethod
    def _classify_trend(correlation):
        """Map a price/time correlation coefficient to a trend label.

        NaN (e.g. a perfectly constant price has no defined correlation) is
        reported as flat instead of falling through to "strong downward".
        """
        if pd.isna(correlation):
            return "relatively flat"
        if correlation > 0.7:
            return "strong upward"
        if correlation > 0.3:
            return "upward"
        if correlation > -0.3:
            return "relatively flat"
        if correlation > -0.7:
            return "downward"
        return "strong downward"

    @classmethod
    def _describe_price_period(cls, values, period_name):
        """Summarise one price series as text for the market-analysis prompt.

        Args:
            values: Non-empty list of rows with a 'close' entry (and a
                'datetime' entry for rows that get reported), ordered
                newest first.
            period_name: Human-readable label of the window.

        Returns:
            A multi-line description: trend, price change, volatility,
            price range and, when present, the largest daily moves.
        """
        # All statistics are computed on oldest-first data: correlating
        # newest-first prices with their index would flip the trend sign,
        # and pct_change() would not yield real day-over-day returns.
        ordered = list(reversed(values))
        closes = [float(day['close']) for day in ordered]

        first_price = closes[0]
        last_price = closes[-1]
        price_change = cls._percent_change(first_price, last_price)

        # Volatility: standard deviation of daily returns, in percent.
        volatility = 0.0
        if len(closes) > 1:
            measured = pd.Series(closes).pct_change().std() * 100
            # With a single return the sample deviation is NaN; report 0.
            if not pd.isna(measured):
                volatility = measured

        # Trend: correlation between price and time index (not a regression
        # slope; only its sign and strength are used).
        if len(closes) > 2:
            time_index = pd.Series(list(range(len(closes))))
            trend = cls._classify_trend(pd.Series(closes).corr(time_index))
        else:
            trend = "insufficient data to determine"

        min_price = min(closes)
        max_price = max(closes)
        price_range = max_price - min_price

        # Daily moves above the threshold, as (percent change, date) pairs.
        moves = []
        if len(closes) > 5:
            for idx in range(1, len(closes)):
                daily_change = cls._percent_change(closes[idx - 1], closes[idx])
                if abs(daily_change) > cls._SIGNIFICANT_MOVE_PCT:
                    moves.append((daily_change, ordered[idx]['datetime']))
            # Keep the largest moves. Starting from newest-first order and
            # using a stable sort lets the newest move win a tie.
            moves.reverse()
            moves.sort(key=lambda move: abs(move[0]), reverse=True)
            moves = moves[:cls._MAX_SIGNIFICANT_MOVES]
        significant_changes = [
            f"On {date}, there was a {change:.2f}% {'increase' if change > 0 else 'decrease'}"
            for change, date in moves
        ]

        description = f"""
Chart for {period_name}:
- Overall trend: {trend}
- Price change: {price_change:.2f}% ({first_price:.2f} to {last_price:.2f})
- Volatility: {volatility:.2f}%
- Price range: {min_price:.2f} to {max_price:.2f} (range: {price_range:.2f})
"""
        if significant_changes:
            description += "- Significant price movements:\n * " + "\n * ".join(significant_changes)
        return description

    async def _analyze_expert_opinion(self):
        """Analyze the current stock quote and price history using AI.

        Returns:
            The model's Markdown analysis as a string.
        """
        quote_data = self.company_data.get('quote_data', {})
        price_data = self.company_data.get('price_data') or {}
        overview = self.company_data.get('overview', {})

        # One textual chart description per period that actually has data.
        chart_descriptions = []
        for period, period_name in self._CHART_PERIODS:
            period_data = price_data.get(period)
            values = period_data.get('values') if isinstance(period_data, dict) else None
            if values:
                chart_descriptions.append(
                    self._describe_price_period(values, period_name)
                )
        chart_analysis = "\n".join(chart_descriptions)

        prompt = f"""
You are a stock market analyst. Analyze the current stock data for {self.symbol} based on the following information:
Current Quote Data: {quote_data}
Company Overview: {overview}
Chart Analysis:
{chart_analysis}
Provide a detailed analysis covering:
1. Current stock performance overview
2. Price trends and technical indicators based on the charts
3. Price comparison with sector averages and benchmarks
4. Potential price movement factors
5. Technical analysis of support and resistance levels
6. Trading volume patterns and their significance
Format requirements:
- Write in professional, concise financial reporting style
- Use Markdown formatting with appropriate headers and bullet points
- DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc.
- DO NOT include any concluding phrases
- Present only factual analysis based on the data
- Present the information directly and objectively
- Prefer using the correct currency text instead of the symbol. For example, use USD instead of $
"""
        return await self._generate_text(prompt)

    async def _create_summary(self):
        """Create a comprehensive summary and investment recommendation.

        Returns:
            The model's Markdown report as a string.
        """
        combined_analysis = {
            'financial_health': self.analysis_results.get('financial_health', ''),
            'news_sentiment': self.analysis_results.get('news_sentiment', ''),
            'expert_opinion': self.analysis_results.get('expert_opinion', ''),
        }
        # ``or {}`` keeps overview.get() below safe if the overview is None.
        overview = self.company_data.get('overview') or {}

        prompt = f"""
You are an investment advisor. Based on the detailed analyses below for {self.symbol} ({overview.get('Name', '')}),
synthesize a final report and investment recommendation:
=== Company Basic Information ===
{overview}
=== Financial Health Analysis ===
{combined_analysis['financial_health']}
=== News and Market Sentiment Analysis ===
{combined_analysis['news_sentiment']}
=== Market Analysis ===
{combined_analysis['expert_opinion']}
Provide:
1. Brief company and industry overview
2. Summary of key strengths and weaknesses from the analyses above
3. Risk and opportunity assessment
4. Investment recommendation (BULLISH/BEARISH/NEUTRAL) with rationale
5. Key factors to monitor going forward
Format requirements:
- Write in professional, concise financial reporting style
- Use Markdown formatting with appropriate headers and bullet points
- DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc.
- DO NOT include any concluding phrases or sign-offs
- Present the report directly and objectively
- The report should be comprehensive but concise
- Prefer using the correct currency text instead of the symbol. For example, use USD instead of $
"""
        return await self._generate_text(prompt)
# Main function to run the pipeline
async def run_analysis_pipeline(symbol):
    """Build a StockAnalysisPipeline for ``symbol`` and await its full run.

    Returns:
        Whatever ``StockAnalysisPipeline.run_analysis`` returns.
    """
    return await StockAnalysisPipeline(symbol).run_analysis()
# Imports used by the chart rendering and report generation helpers below
import altair as alt
import base64
import io
from PIL import Image
# Function to convert Altair chart to base64 image
def chart_to_base64(chart):
    """Convert an Altair chart to a base64-encoded PNG image.

    Args:
        chart: Object whose ``save(path)`` writes a PNG to ``path``
            (expected to be an Altair chart).

    Returns:
        The PNG bytes as a base64 ``str``. If rendering fails, a blank
        white 800x400 PNG is returned instead, or ``None`` when even that
        placeholder cannot be produced.
    """
    import tempfile

    try:
        # Render into a private temporary directory instead of an open
        # NamedTemporaryFile: a file that is still open cannot be reopened
        # by name on Windows, so chart.save() would fail there.
        with tempfile.TemporaryDirectory() as tmp_dir:
            png_path = os.path.join(tmp_dir, 'chart.png')
            chart.save(png_path)
            with open(png_path, 'rb') as png_file:
                image_bytes = png_file.read()
        return base64.b64encode(image_bytes).decode('utf-8')
    except Exception as e:
        # Rendering depends on optional backends; degrade to a placeholder
        # rather than failing the whole report.
        print(f"Chart rendering failed: {str(e)}")

    try:
        from PIL import Image

        # Blank white image of the same size as a rendered chart.
        width, height = 800, 400
        image = Image.new("RGB", (width, height), (255, 255, 255))
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode('utf-8')
    except Exception as placeholder_error:
        print(f"Placeholder image generation failed: {placeholder_error}")
        return None
# Function to create price chart from price data
def create_price_chart(price_data, period, symbol):
    """Create a price chart from the price data.

    Args:
        price_data: Mapping with a 'values' list of rows that carry at least
            'datetime' and 'close' (and optionally 'volume').
        period: Period key ('1_month', '3_months' or '1_year'); any other
            value gets a generic chart title.
        symbol: Ticker symbol shown in the chart title.

    Returns:
        A layered Altair chart (closing-price line plus a marker on the most
        recent day), or ``None`` when there is no data to plot.
    """
    if not price_data or 'values' not in price_data:
        return None
    df = pd.DataFrame(price_data['values'])
    if df.empty:
        return None

    df['datetime'] = pd.to_datetime(df['datetime'])
    df['close'] = pd.to_numeric(df['close'])
    has_volume = 'volume' in df.columns
    if has_volume:
        df['volume'] = pd.to_numeric(df['volume'], errors='coerce')
    # Rows may arrive newest first; sort so that the final row really is the
    # most recent day, whatever the input order.
    df = df.sort_values('datetime').reset_index(drop=True)

    title_map = {
        '1_month': f'{symbol} - Price over the last month',
        '3_months': f'{symbol} - Price over the last 3 months',
        '1_year': f'{symbol} - Price over the last year',
    }

    # Closing-price line
    chart = alt.Chart(df).mark_line(color='#3498db').encode(
        x=alt.X('datetime:T', title='Time'),
        y=alt.Y('close:Q', title='Closing Price', scale=alt.Scale(zero=False)),
    ).properties(
        title=title_map.get(period, f'Stock price ({period})'),
        width=800,
        height=400,
    )

    # Marker on the most recent day
    tooltip = [
        alt.Tooltip('datetime:T', title='Date', format='%d/%m/%Y'),
        alt.Tooltip('close:Q', title='Closing Price', format=',.2f'),
    ]
    if has_volume:
        tooltip.append(alt.Tooltip('volume:Q', title='Volume', format=',.0f'))
    last_point = alt.Chart(df.iloc[[-1]]).mark_circle(size=100, color='red').encode(
        x='datetime:T',
        y='close:Q',
        tooltip=tooltip,
    )

    return chart + last_point
# generate_html_report, modified to embed price charts
def generate_html_report(analysis_results):
    """Generate an HTML report from analysis results.

    Args:
        analysis_results: dict with 'symbol', 'company_name' and 'analysis'
            (itself holding 'summary', 'financial_health', 'news_sentiment'
            and 'expert_opinion' Markdown texts); an optional 'price_data'
            mapping of period key to price data adds embedded charts.

    Returns:
        The complete HTML document as a string.

    Side effects:
        Writes 'analysis_results_before.json' and 'analysis_results.json'
        to the current working directory (raw and pre-processed texts).
    """
    import html
    import json
    import re

    import markdown

    current_date = datetime.now().strftime("%d/%m/%Y")
    symbol = analysis_results['symbol']
    # Escaped copies for interpolation into markup: a name such as
    # "A<B Corp" or one containing quotes must not break the document.
    symbol_html = html.escape(str(symbol))
    company_html = html.escape(str(analysis_results['company_name']))

    def dump_debug_json(payload, path):
        # Explicit UTF-8 because ensure_ascii=False writes raw non-ASCII
        # text; the context manager closes the file handle.
        with open(path, 'w', encoding='utf-8') as debug_file:
            json.dump(payload, debug_file, ensure_ascii=False, indent=4)

    def process_markdown_text(text):
        # "\n* Item" -> "\n\n- Item": the blank line makes the Markdown
        # converter start a proper list.
        text = re.sub(r'\n\*\s+(.*?)$', r'\n\n- \1', text, flags=re.MULTILINE)
        # Replace the currency symbol with its text form
        text = text.replace('$', 'USD ')
        return text

    def to_html(text):
        return markdown.markdown(text, extensions=['tables', 'fenced_code'])

    sections = analysis_results['analysis']
    dump_debug_json(sections, 'analysis_results_before.json')

    summary_text = process_markdown_text(sections['summary'])
    financial_text = process_markdown_text(sections['financial_health'])
    news_text = process_markdown_text(sections['news_sentiment'])
    expert_text = process_markdown_text(sections['expert_opinion'])
    dump_debug_json(
        {'summary': summary_text, 'financial': financial_text, 'news': news_text, 'expert': expert_text},
        'analysis_results.json',
    )

    summary_html = to_html(summary_text)
    financial_html = to_html(financial_text)
    news_html = to_html(news_text)
    expert_html = to_html(expert_text)

    # Render each available price chart as an inline base64 image.
    chart_blocks = []
    price_data = analysis_results.get('price_data') or {}
    for period in ['1_month', '3_months', '1_year']:
        if period not in price_data:
            continue
        chart = create_price_chart(price_data[period], period, symbol)
        if chart is None:
            continue
        try:
            base64_image = chart_to_base64(chart)
            if base64_image:
                chart_blocks.append(f"""
<div class="chart-container">
<h3>Price Chart - {period.replace('_', ' ').title()}</h3>
<img src="data:image/png;base64,{base64_image}" alt="{symbol_html} {period} chart"
style="width: 100%; max-width: 800px; margin: 0 auto; display: block;">
</div>
""")
        except Exception as e:
            print(f"Error generating chart image: {e}")
    price_charts_html = "".join(chart_blocks)

    # Assemble the document
    html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Stock Analysis Report {symbol_html}</title>
<style>
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
line-height: 1.6;
color: #333;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background-color: #f9f9f9;
}}
.report-header {{
background-color: #2c3e50;
color: white;
padding: 20px;
border-radius: 5px 5px 0 0;
position: relative;
}}
.report-date {{
position: absolute;
top: 20px;
right: 20px;
font-size: 14px;
}}
.report-title {{
margin: 0;
padding: 0;
font-size: 24px;
color: white;
}}
.report-subtitle {{
margin: 5px 0 0;
padding: 0;
font-size: 16px;
font-weight: normal;
color: white;
}}
.report-body {{
background-color: white;
padding: 20px;
border-radius: 0 0 5px 5px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}}
.section {{
margin-bottom: 20px;
border-bottom: 1px solid #eee;
padding-bottom: 20px;
}}
h1, h2, h3, h4, h5, h6 {{
color: #2c3e50;
margin-top: 1.5em;
margin-bottom: 0.5em;
}}
h1 {{ font-size: 24px; }}
h2 {{
font-size: 20px;
border-bottom: 2px solid #3498db;
padding-bottom: 5px;
color: #2c3e50 !important;
}}
h3 {{ font-size: 18px; color: #3498db; }}
h4 {{ font-size: 16px; }}
p {{ margin: 0.8em 0; }}
ul, ol {{
margin: 1em 0 1em 2em;
padding-left: 0;
}}
li {{
margin-bottom: 0.8em;
line-height: 1.5;
}}
li strong {{
color: #2c3e50;
}}
table {{
width: 100%;
border-collapse: collapse;
margin: 15px 0;
}}
th, td {{
padding: 12px;
border: 1px solid #ddd;
text-align: left;
}}
th {{
background-color: #f2f2f2;
font-weight: bold;
}}
tr:nth-child(even) {{
background-color: #f9f9f9;
}}
.bullish {{
color: #27ae60;
font-weight: bold;
}}
.bearish {{
color: #e74c3c;
font-weight: bold;
}}
.neutral {{
color: #f39c12;
font-weight: bold;
}}
code {{
background: #f8f8f8;
border: 1px solid #ddd;
border-radius: 3px;
padding: 0 3px;
font-family: Consolas, monospace;
}}
pre {{
background: #f8f8f8;
border: 1px solid #ddd;
border-radius: 3px;
padding: 10px;
overflow-x: auto;
}}
blockquote {{
margin: 1em 0;
padding: 0 1em;
color: #666;
border-left: 4px solid #ddd;
}}
hr {{
border: 0;
border-top: 1px solid #eee;
margin: 20px 0;
}}
.footer {{
text-align: center;
margin-top: 40px;
padding-top: 20px;
font-size: 12px;
color: #777;
border-top: 1px solid #eee;
}}
/* Custom styling for bullet points */
ul {{
list-style-type: disc;
}}
ul ul {{
list-style-type: circle;
}}
ul ul ul {{
list-style-type: square;
}}
/* Fix for section headers to ensure they're black */
.section h2 {{
color: #2c3e50 !important;
}}
/* Fix for investment report headers */
strong {{
color: inherit;
}}
/* Chart container styling */
.chart-container {{
margin: 30px 0;
text-align: center;
}}
.chart-container h3 {{
text-align: center;
}}
</style>
</head>
<body>
<div class="report-header">
<div class="report-date">Date: {current_date}</div>
<h1 class="report-title">Stock Analysis Report: {symbol_html}</h1>
<h2 class="report-subtitle">{company_html}</h2>
</div>
<div class="report-body">
<div class="section">
<h2>Summary & Recommendation</h2>
{summary_html}
</div>
<div class="section">
<h2>Financial Health Analysis</h2>
{financial_html}
</div>
<div class="section">
<h2>News & Market Sentiment Analysis</h2>
{news_html}
</div>
<div class="section">
<h2>Market Analysis</h2>
{expert_html}
</div>
<div class="section">
<h2>Price Charts</h2>
{price_charts_html}
</div>
<div class="footer">
This report was automatically generated by AI Financial Dashboard. Information is for reference only.
</div>
</div>
</body>
</html>
"""
    return html_content
# Function to generate and save PDF report
def generate_pdf_report(analysis_results, output_path):
    """Generate and save a PDF report directly.

    Args:
        analysis_results: Analysis results accepted by
            ``generate_html_report``.
        output_path: Destination path of the PDF file.

    Returns:
        ``True`` if the PDF was written, ``False`` if PDF generation failed.

    Side effects:
        Also writes the intermediate HTML to 'report_preview.html' in the
        current working directory, for debugging.
    """
    from weasyprint import HTML

    html_content = generate_html_report(analysis_results)

    # The preview is only a debugging aid: failing to write it (e.g. in a
    # read-only working directory) must not prevent the PDF from being made.
    try:
        with open("report_preview.html", "w", encoding="utf-8") as f:
            f.write(html_content)
    except OSError as e:
        print(f"Could not write HTML preview: {e}")

    try:
        HTML(string=html_content).write_pdf(output_path)
        print(f"PDF report saved successfully at: {output_path}")
        return True
    except Exception as e:
        print(f"Error generating PDF report: {e}")
        return False