# modules/analysis_pipeline.py import os import asyncio import pandas as pd from datetime import datetime import google.generativeai as genai from dotenv import load_dotenv from .api_clients import AlphaVantageClient, NewsAPIClient, MarketauxClient, get_price_history import time # Load environment variables and configure AI load_dotenv() genai.configure(api_key=os.getenv("GEMINI_API_KEY")) MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") # Define the analysis pipeline class class StockAnalysisPipeline: """Pipeline for generating comprehensive stock analysis reports""" def __init__(self, symbol): """Initialize the pipeline with a stock symbol""" self.symbol = symbol.upper() # Convert to uppercase self.company_data = {} self.analysis_results = {} self.ai_model = genai.GenerativeModel(model_name=MODEL_NAME) async def run_analysis(self): """Run the full analysis pipeline in an interleaved pattern""" print(f"Starting analysis pipeline for {self.symbol}...") # 1. Get company overview and financial statements first await self._get_company_overview() if hasattr(self, 'company_name'): print(f"Analyzing {self.symbol} ({self.company_name})") else: self.company_name = self.symbol print(f"Analyzing {self.symbol}") # 2. Get and analyze financial statements print("Getting financial data...") await self._get_financial_statements() # 3. Run financial health analysis with Gemini print("Analyzing financial health...") self.analysis_results['financial_health'] = await self._analyze_financial_health() # 4. Get and analyze market news and sentiment print("Getting news and sentiment data...") await self._get_market_sentiment_and_news() # 5. Run news sentiment analysis with Gemini print("Analyzing news and sentiment...") self.analysis_results['news_sentiment'] = await self._analyze_news_sentiment() # 6. Get quote data and price history print("Getting quote and price data...") await self._get_analyst_ratings() await self._get_price_data() # 7. Run expert opinion analysis with Gemini print("Analyzing market data...") self.analysis_results['expert_opinion'] = await self._analyze_expert_opinion() # 8. Create final summary and recommendation print("Creating final summary and recommendation...") self.analysis_results['summary'] = await self._create_summary() # 9. Return the complete analysis return { 'symbol': self.symbol, 'company_name': self.company_name, 'analysis': self.analysis_results, 'price_data': self.company_data.get('price_data', {}), 'overview': self.company_data.get('overview', {}) } async def _get_company_overview(self): """Get company overview information""" self.company_data['overview'] = await AlphaVantageClient.get_company_overview(self.symbol) if self.company_data['overview'] and 'Name' in self.company_data['overview']: self.company_name = self.company_data['overview']['Name'] else: self.company_name = self.symbol print(f"Retrieved company overview for {self.symbol}") async def _get_financial_statements(self): """Get company financial statements""" # Run these in parallel income_stmt_task = AlphaVantageClient.get_income_statement(self.symbol) balance_sheet_task = AlphaVantageClient.get_balance_sheet(self.symbol) cash_flow_task = AlphaVantageClient.get_cash_flow(self.symbol) # Wait for all tasks to complete results = await asyncio.gather( income_stmt_task, balance_sheet_task, cash_flow_task ) # Store results self.company_data['income_statement'] = results[0] self.company_data['balance_sheet'] = results[1] self.company_data['cash_flow'] = results[2] print(f"Retrieved financial statements for {self.symbol}") async def _get_market_sentiment_and_news(self): """Get market sentiment and news about the company""" # Get news from multiple sources in parallel alpha_news_task = AlphaVantageClient.get_news_sentiment(self.symbol) news_api_task = NewsAPIClient.get_company_news(self.company_name if hasattr(self, 'company_name') else self.symbol) marketaux_task = MarketauxClient.get_company_news(self.symbol) # Wait for all tasks to complete results = await asyncio.gather( alpha_news_task, news_api_task, marketaux_task ) # Store results self.company_data['alpha_news'] = results[0] self.company_data['news_api'] = results[1] self.company_data['marketaux'] = results[2] print(f"Retrieved news and sentiment for {self.symbol}") async def _get_analyst_ratings(self): """Get current stock quotes instead of analyst ratings""" self.company_data['quote_data'] = await AlphaVantageClient.get_global_quote(self.symbol) print(f"Retrieved quote data for {self.symbol}") async def _get_price_data(self): """Get historical price data""" # Get price data for different time periods periods = ['1_month', '3_months', '1_year'] price_data = {} # Sử dụng phương thức đồng bộ thông thường vì get_price_history không còn async for period in periods: price_data[period] = get_price_history(self.symbol, period) self.company_data['price_data'] = price_data print(f"Retrieved price history for {self.symbol}") async def _analyze_financial_health(self): """Analyze company's financial health using AI""" # Add a small delay before API call to Gemini to avoid rate limiting await asyncio.sleep(1) # Prepare financial data for the AI financial_data = { 'overview': self.company_data.get('overview', {}), 'income_statement': self.company_data.get('income_statement', {}), 'balance_sheet': self.company_data.get('balance_sheet', {}), 'cash_flow': self.company_data.get('cash_flow', {}) } # Create prompt for financial analysis prompt = f""" You are a senior financial analyst. Analyze the financial health of {self.symbol} based on the following data: {financial_data} Provide a detailed analysis covering: 1. Overall financial condition overview 2. Key financial ratios analysis (P/E, ROE, Debt/Equity, etc.) 3. Revenue and profit growth assessment 4. Cash flow and liquidity assessment 5. Key financial strengths and weaknesses Format requirements: - Write in professional, concise financial reporting style - Use Markdown formatting with appropriate headers and bullet points - DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc. - DO NOT include any concluding phrases - Present only factual analysis based on the data - Present the information directly and objectively - Prefer using the correct currency text instead of the symbol. For example, use USD instead of $ """ # Get AI response response = self.ai_model.generate_content(prompt) return response.text async def _analyze_news_sentiment(self): """Analyze news and market sentiment using AI""" # Add a small delay before API call to Gemini to avoid rate limiting await asyncio.sleep(1) # Prepare news data for the AI news_data = { 'alpha_news': self.company_data.get('alpha_news', {}), 'news_api': self.company_data.get('news_api', {}), 'marketaux': self.company_data.get('marketaux', {}) } # Create prompt for news analysis prompt = f""" You are a market analyst. Analyze news and market sentiment about {self.symbol} based on the following data: {news_data} Provide a detailed analysis covering: 1. Summary of key recent news about the company 2. Important events that could impact stock price 3. Overall market sentiment analysis (positive/negative/neutral) 4. Risk factors identified in news Format requirements: - Write in professional, concise financial reporting style - Use Markdown formatting with appropriate headers and bullet points - DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc. - DO NOT include any concluding phrases - Present only factual analysis based on the data - Present the information directly and objectively - Prefer using the correct currency text instead of the symbol. For example, use USD instead of $ """ # Get AI response response = self.ai_model.generate_content(prompt) return response.text async def _analyze_expert_opinion(self): """Analyze current stock quote and price data""" # Add a small delay before API call to Gemini to avoid rate limiting await asyncio.sleep(1) # Prepare data for the AI quote_data = self.company_data.get('quote_data', {}) price_data = self.company_data.get('price_data', {}) overview = self.company_data.get('overview', {}) # Create prompt for market analysis with chart descriptions chart_descriptions = [] # Add descriptions for each timeframe chart for period, period_name in [('1_month', 'last month'), ('3_months', 'last 3 months'), ('1_year', 'last year')]: if period in price_data and 'values' in price_data[period] and price_data[period]['values']: values = price_data[period]['values'] # Get first and last price for the period first_price = float(values[-1]['close']) # Reversed order in the API last_price = float(values[0]['close']) price_change = ((last_price - first_price) / first_price) * 100 # Calculate volatility (standard deviation) if len(values) > 1: closes = [float(day['close']) for day in values] volatility = pd.Series(closes).pct_change().std() * 100 # Convert to percentage else: volatility = 0.0 # Detect trend (simple linear regression slope) if len(values) > 2: closes = [float(day['close']) for day in values] dates = list(range(len(closes))) slope = pd.Series(closes).corr(pd.Series(dates)) trend = "strong upward" if slope > 0.7 else \ "upward" if slope > 0.3 else \ "relatively flat" if slope > -0.3 else \ "downward" if slope > -0.7 else \ "strong downward" else: trend = "insufficient data to determine" # Get price range prices = [float(day['close']) for day in values] min_price = min(prices) if prices else 0 max_price = max(prices) if prices else 0 price_range = max_price - min_price # Find significant price movements significant_changes = [] if len(values) > 5: for i in range(1, len(values)): prev_close = float(values[i]['close']) curr_close = float(values[i-1]['close']) daily_change = ((curr_close - prev_close) / prev_close) * 100 if abs(daily_change) > 2.0: # More than 2% daily change date = values[i-1]['datetime'] significant_changes.append(f"On {date}, there was a {daily_change:.2f}% {'increase' if daily_change > 0 else 'decrease'}") # Limit to 3 most significant changes significant_changes = significant_changes[:3] # Create chart description description = f""" Chart for {period_name}: - Overall trend: {trend} - Price change: {price_change:.2f}% ({first_price:.2f} to {last_price:.2f}) - Volatility: {volatility:.2f}% - Price range: {min_price:.2f} to {max_price:.2f} (range: {price_range:.2f}) """ # Add significant changes if any if significant_changes: description += "- Significant price movements:\n * " + "\n * ".join(significant_changes) chart_descriptions.append(description) # Create prompt for market analysis prompt = f""" You are a stock market analyst. Analyze the current stock data for {self.symbol} based on the following information: Current Quote Data: {quote_data} Company Overview: {overview} Chart Analysis: {chr(10).join(chart_descriptions)} Provide a detailed analysis covering: 1. Current stock performance overview 2. Price trends and technical indicators based on the charts 3. Price comparison with sector averages and benchmarks 4. Potential price movement factors 5. Technical analysis of support and resistance levels 6. Trading volume patterns and their significance Format requirements: - Write in professional, concise financial reporting style - Use Markdown formatting with appropriate headers and bullet points - DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc. - DO NOT include any concluding phrases - Present only factual analysis based on the data - Present the information directly and objectively - Prefer using the correct currency text instead of the symbol. For example, use USD instead of $ """ # Get AI response response = self.ai_model.generate_content(prompt) return response.text async def _create_summary(self): """Create a comprehensive summary and investment recommendation""" # Add a small delay before API call to Gemini to avoid rate limiting await asyncio.sleep(1) # Combine all analyses combined_analysis = { 'financial_health': self.analysis_results.get('financial_health', ''), 'news_sentiment': self.analysis_results.get('news_sentiment', ''), 'expert_opinion': self.analysis_results.get('expert_opinion', '') } # Add overview data overview = self.company_data.get('overview', {}) # Create prompt for final summary prompt = f""" You are an investment advisor. Based on the detailed analyses below for {self.symbol} ({overview.get('Name', '')}), synthesize a final report and investment recommendation: === Company Basic Information === {overview} === Financial Health Analysis === {combined_analysis['financial_health']} === News and Market Sentiment Analysis === {combined_analysis['news_sentiment']} === Market Analysis === {combined_analysis['expert_opinion']} Provide: 1. Brief company and industry overview 2. Summary of key strengths and weaknesses from the analyses above 3. Risk and opportunity assessment 4. Investment recommendation (BULLISH/BEARISH/NEUTRAL) with rationale 5. Key factors to monitor going forward Format requirements: - Write in professional, concise financial reporting style - Use Markdown formatting with appropriate headers and bullet points - DO NOT include any introductory phrases like "Hello," "I'm happy to provide," etc. - DO NOT include any concluding phrases or sign-offs - Present the report directly and objectively - The report should be comprehensive but concise - Prefer using the correct currency text instead of the symbol. For example, use USD instead of $ """ # Get AI response response = self.ai_model.generate_content(prompt) return response.text # Main function to run the pipeline async def run_analysis_pipeline(symbol): """Run the complete stock analysis pipeline for a given symbol""" pipeline = StockAnalysisPipeline(symbol) return await pipeline.run_analysis() # Function to generate HTML report from analysis results import altair as alt import base64 import io from PIL import Image # Function to convert Altair chart to base64 image def chart_to_base64(chart): """Convert Altair chart to base64-encoded PNG image""" # Save chart as PNG import io import base64 from PIL import Image try: # Sử dụng Altair's save method import tempfile # Tạo file tạm thời để lưu chart with tempfile.NamedTemporaryFile(suffix='.png') as tmpfile: # Lưu biểu đồ dưới dạng PNG chart.save(tmpfile.name) # Đọc file PNG và mã hóa base64 with open(tmpfile.name, 'rb') as f: image_bytes = f.read() base64_image = base64.b64encode(image_bytes).decode('utf-8') return base64_image except Exception as e: # Backup method - tạo hình ảnh đơn giản với thông tin chart try: print(f"Chart rendering failed: {str(e)}") # Tạo một hình ảnh thay thế đơn giản width, height = 800, 400 # Tạo hình ảnh trắng image = Image.new("RGB", (width, height), (255, 255, 255)) # Lưu hình ảnh vào buffer buffer = io.BytesIO() image.save(buffer, format="PNG") image_bytes = buffer.getvalue() # Mã hóa base64 base64_image = base64.b64encode(image_bytes).decode('utf-8') return base64_image except: return None # Function to create price chart from price data def create_price_chart(price_data, period, symbol): """Create a price chart from the price data""" if 'values' not in price_data: return None df = pd.DataFrame(price_data['values']) if df.empty: return None df['datetime'] = pd.to_datetime(df['datetime']) df['close'] = pd.to_numeric(df['close']) # Map period to title title_map = { '1_month': f'{symbol} - Price over the last month', '3_months': f'{symbol} - Price over the last 3 months', '1_year': f'{symbol} - Price over the last year' } # Create the Altair chart chart = alt.Chart(df).mark_line(color='#3498db').encode( x=alt.X('datetime:T', title='Time'), y=alt.Y('close:Q', title='Closing Price', scale=alt.Scale(zero=False)), ).properties( title=title_map.get(period, f'Stock price ({period})'), width=800, height=400 ) # Add a point for the last day last_point = alt.Chart(df.iloc[[-1]]).mark_circle(size=100, color='red').encode( x='datetime:T', y='close:Q', tooltip=[ alt.Tooltip('datetime:T', title='Date', format='%d/%m/%Y'), alt.Tooltip('close:Q', title='Closing Price', format=',.2f'), alt.Tooltip('volume:Q', title='Volume', format=',.0f') ] ) # Combine the line and point charts final_chart = chart + last_point return final_chart # Sửa function generate_html_report để thêm biểu đồ def generate_html_report(analysis_results): """Generate HTML report from analysis results""" # Import markdown module import markdown import re from markdown.extensions.tables import TableExtension from markdown.extensions.fenced_code import FencedCodeExtension # Get current date for the report current_date = datetime.now().strftime("%d/%m/%Y") symbol = analysis_results['symbol'] company_name = analysis_results['company_name'] import json json.dump(analysis_results['analysis'], open('analysis_results_before.json', 'w'), ensure_ascii=False, indent=4) # Pre-process markdown text to fix bullet point styling def process_markdown_text(text): # First, properly format bullet points with '*' # Pattern: "\n* Item" -> "\n\n- Item" text = re.sub(r'\n\*\s+(.*?)$', r'\n\n- \1', text, flags=re.MULTILINE) # Pattern: Replace $ with USD text = text.replace('$', 'USD ') return text # Process and convert markdown to HTML summary_text = process_markdown_text(analysis_results['analysis']['summary']) financial_text = process_markdown_text(analysis_results['analysis']['financial_health']) news_text = process_markdown_text(analysis_results['analysis']['news_sentiment']) expert_text = process_markdown_text(analysis_results['analysis']['expert_opinion']) import json json.dump({'summary': summary_text, 'financial': financial_text, 'news': news_text, 'expert': expert_text}, open('analysis_results.json', 'w'), ensure_ascii=False, indent=4) # Convert to HTML summary_html = markdown.markdown( summary_text, extensions=['tables', 'fenced_code'] ) financial_html = markdown.markdown( financial_text, extensions=['tables', 'fenced_code'] ) news_html = markdown.markdown( news_text, extensions=['tables', 'fenced_code'] ) expert_html = markdown.markdown( expert_text, extensions=['tables', 'fenced_code'] ) # Generate chart images price_charts_html = "" if 'price_data' in analysis_results: price_data = analysis_results['price_data'] periods = ['1_month', '3_months', '1_year'] for period in periods: if period in price_data: chart = create_price_chart(price_data[period], period, symbol) if chart: try: base64_image = chart_to_base64(chart) if base64_image: price_charts_html += f"""

Price Chart - {period.replace('_', ' ').title()}

{symbol} {period} chart
""" except Exception as e: print(f"Error generating chart image: {e}") # Create HTML content html_content = f""" Stock Analysis Report {symbol}
Date: {current_date}

Stock Analysis Report: {symbol}

{company_name}

Summary & Recommendation

{summary_html}

Financial Health Analysis

{financial_html}

News & Market Sentiment Analysis

{news_html}

Market Analysis

{expert_html}

Price Charts

{price_charts_html}
""" return html_content # Function to generate and save PDF report def generate_pdf_report(analysis_results, output_path): """Generate and save PDF report directly""" from weasyprint import HTML # Generate HTML content html_content = generate_html_report(analysis_results) # Save HTML preview for debugging with open("report_preview.html", "w", encoding="utf-8") as f: f.write(html_content) # Generate PDF try: HTML(string=html_content).write_pdf(output_path) print(f"PDF report saved successfully at: {output_path}") return True except Exception as e: print(f"Error generating PDF report: {e}") return False