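# NOTE: the original first lines here read "Spaces: / Sleeping / Sleeping".
# They appear to be status-banner text captured from the hosting web page
# rather than part of this module, so they are kept only as a comment.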
# pages/stock_report.py | |
import os | |
import asyncio | |
import streamlit as st | |
import pandas as pd | |
import altair as alt | |
from io import BytesIO | |
import base64 | |
import tempfile | |
import weasyprint | |
import markdown | |
import json | |
from datetime import datetime | |
from modules.analysis_pipeline import run_analysis_pipeline, generate_html_report | |
from twelvedata_api import TwelveDataAPI | |
# Initialize session state for this page | |
# Seed the per-session flags used by this page. A flag that an earlier run
# already stored is left untouched.
for _flag_name, _flag_default in (
    ("stock_report_initialized", True),
    ("analysis_requested", False),
    ("analysis_complete", False),
):
    if _flag_name not in st.session_state:
        st.session_state[_flag_name] = _flag_default

# Page setup - keep these values consistent with Home.py
st.set_page_config(
    page_title="Stock Analysis Report",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Container that hosts the page heading and the introductory text
main_container = st.container()
with main_container:
    st.title("📄 In-depth Stock Analysis Report")
    st.markdown("""
This application generates a comprehensive analysis report for a stock symbol, combining data from multiple sources
and using AI to synthesize information, helping you make better investment decisions.
""")
# Function to create price chart | |
def create_price_chart(price_data, period): | |
"""Create price chart from data""" | |
if 'values' not in price_data: | |
return None | |
df = pd.DataFrame(price_data['values']) | |
if df.empty: | |
return None | |
df['datetime'] = pd.to_datetime(df['datetime']) | |
df['close'] = pd.to_numeric(df['close']) | |
# Determine chart title based on time period | |
title_map = { | |
'1_month': 'Stock price over the last month', | |
'3_months': 'Stock price over the last 3 months', | |
'1_year': 'Stock price over the last year' | |
} | |
# Create chart with Altair | |
chart = alt.Chart(df).mark_line().encode( | |
x=alt.X('datetime:T', title='Time'), | |
y=alt.Y('close:Q', title='Closing Price', scale=alt.Scale(zero=False)), | |
tooltip=[ | |
alt.Tooltip('datetime:T', title='Date', format='%d/%m/%Y'), | |
alt.Tooltip('close:Q', title='Closing Price', format=',.2f'), | |
alt.Tooltip('volume:Q', title='Volume', format=',.0f') | |
] | |
).properties( | |
title=title_map.get(period, f'Stock price ({period})'), | |
height=350 | |
).interactive() | |
return chart | |
# Function to convert analysis results to PDF | |
def convert_html_to_pdf(html_content):
    """Render an HTML string to a PDF and return the PDF as bytes.

    The HTML is written to a temporary ``.html`` file which WeasyPrint then
    reads by filename. The temporary file is removed before returning, even
    when writing or rendering raises.

    Args:
        html_content: The HTML document as a ``str``.

    Returns:
        The value returned by ``weasyprint.HTML(...).write_pdf()``.
    """
    # delete=False: the file is closed first and then re-opened by path, so
    # its removal is handled explicitly in the ``finally`` clause below.
    handle = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp_html = handle.name
    try:
        with handle:
            handle.write(html_content.encode('utf-8'))
        return weasyprint.HTML(filename=temp_html).write_pdf()
    finally:
        # Always clean up, otherwise a failed render leaves the file behind.
        os.unlink(temp_html)
# Function to create PDF download link | |
def get_download_link(pdf_bytes, filename):
    """Return an HTML anchor that embeds the PDF as a base64 data URI.

    Args:
        pdf_bytes: Raw PDF content.
        filename: Value placed in the anchor's ``download`` attribute.

    Returns:
        The ``<a>`` element as a string.
    """
    encoded_pdf = base64.b64encode(pdf_bytes).decode()
    return (
        f'<a href="data:application/pdf;base64,{encoded_pdf}" '
        f'download="{filename}">Download Report (PDF)</a>'
    )
# List of popular stock symbols and information | |
def load_stock_symbols():
    """Return the selectable stocks as a list of ``{"symbol", "name"}`` dicts.

    Sources are tried in this order:

    1. The JSON cache ``static/stock_symbols_cache.json``, when it holds a
       non-empty list of entries that each have ``symbol`` and ``name``.
    2. The NASDAQ listing from ``TwelveDataAPI.get_all_stocks`` when the
       ``TWELVEDATA_API_KEY`` environment variable is set. A non-empty
       result is written back to the cache.
    3. A built-in list of 30 popular symbols.

    Returns:
        A non-empty list of dicts with the keys ``"symbol"`` and ``"name"``.
    """
    cache_file = "static/stock_symbols_cache.json"

    def _is_usable(entries):
        # The UI indexes every entry with ['symbol'] and ['name'], so an
        # empty or malformed cache must not be handed back to it.
        return (
            isinstance(entries, list)
            and bool(entries)
            and all(
                isinstance(entry, dict) and "symbol" in entry and "name" in entry
                for entry in entries
            )
        )

    # 1) Cached list
    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cached = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers invalid JSON and undecodable bytes.
            print(f"Error loading cache: {e}")
        else:
            if _is_usable(cached):
                return cached
            print(f"Ignoring unusable stock symbol cache: {cache_file}")

    # Default list if cache doesn't exist or fails to load
    default_symbols = [
        {"symbol": "AAPL", "name": "Apple Inc."},
        {"symbol": "MSFT", "name": "Microsoft Corporation"},
        {"symbol": "GOOGL", "name": "Alphabet Inc."},
        {"symbol": "AMZN", "name": "Amazon.com Inc."},
        {"symbol": "TSLA", "name": "Tesla, Inc."},
        {"symbol": "META", "name": "Meta Platforms, Inc."},
        {"symbol": "NVDA", "name": "NVIDIA Corporation"},
        {"symbol": "JPM", "name": "JPMorgan Chase & Co."},
        {"symbol": "V", "name": "Visa Inc."},
        {"symbol": "JNJ", "name": "Johnson & Johnson"},
        {"symbol": "WMT", "name": "Walmart Inc."},
        {"symbol": "MA", "name": "Mastercard Incorporated"},
        {"symbol": "PG", "name": "Procter & Gamble Co."},
        {"symbol": "UNH", "name": "UnitedHealth Group Inc."},
        {"symbol": "HD", "name": "Home Depot Inc."},
        {"symbol": "BAC", "name": "Bank of America Corp."},
        {"symbol": "XOM", "name": "Exxon Mobil Corporation"},
        {"symbol": "DIS", "name": "Walt Disney Co."},
        {"symbol": "CSCO", "name": "Cisco Systems, Inc."},
        {"symbol": "VZ", "name": "Verizon Communications Inc."},
        {"symbol": "ADBE", "name": "Adobe Inc."},
        {"symbol": "NFLX", "name": "Netflix, Inc."},
        {"symbol": "CMCSA", "name": "Comcast Corporation"},
        {"symbol": "PFE", "name": "Pfizer Inc."},
        {"symbol": "KO", "name": "Coca-Cola Company"},
        {"symbol": "INTC", "name": "Intel Corporation"},
        {"symbol": "PYPL", "name": "PayPal Holdings, Inc."},
        {"symbol": "T", "name": "AT&T Inc."},
        {"symbol": "PEP", "name": "PepsiCo, Inc."},
        {"symbol": "MRK", "name": "Merck & Co., Inc."},
    ]

    # 2) Try to fetch a more comprehensive list if an API key is available
    try:
        from dotenv import load_dotenv
        load_dotenv()
        api_key = os.getenv("TWELVEDATA_API_KEY")
        if api_key:
            td_api = TwelveDataAPI(api_key)
            stocks_data = td_api.get_all_stocks(exchange="NASDAQ")
            if stocks_data and 'data' in stocks_data:
                # NOTE(review): an earlier comment here said "take first 1000
                # stocks", but no cap has ever been applied - confirm whether
                # one is wanted.
                # Records without a symbol are skipped so that one bad entry
                # does not discard the whole listing.
                symbols = [
                    {"symbol": stock["symbol"], "name": stock.get("name", "Unknown")}
                    for stock in stocks_data['data']
                    if stock.get("symbol")
                ]
                if symbols:
                    # Caching is best-effort: a read-only disk must not
                    # throw away the list that was just fetched.
                    try:
                        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
                        with open(cache_file, 'w', encoding='utf-8') as f:
                            json.dump(symbols, f)
                    except OSError as e:
                        print(f"Error writing stock symbol cache: {e}")
                    return symbols
    except Exception as e:
        # Deliberately broad: a missing python-dotenv, a network failure or
        # an unexpected payload all fall back to the built-in list.
        print(f"Error fetching stock symbols from API: {e}")

    # 3) If everything fails, return default list
    return default_symbols
# Resolve the list of selectable symbols once per script run; the select box
# and the popular-symbols grid further down both read from it.
STOCK_SYMBOLS = load_stock_symbols()
# Function to format stock options for display | |
def format_stock_option(stock):
    """Return the ``"SYMBOL - Name"`` label for one stock entry.

    Args:
        stock: Mapping with the keys ``'symbol'`` and ``'name'``.
    """
    symbol, name = stock['symbol'], stock['name']
    return "{} - {}".format(symbol, name)
# Create interface | |
# Two-column layout: col1 is the wide report area, col2 the narrow panel
# holding the inputs.
# NOTE(review): the nesting below was reconstructed from a copy of this file
# that had lost its indentation; the PDF section is assumed to live inside
# the input column - confirm against the intended layout.
col1, col2 = st.columns([3, 1])

# Information input section
with col2:
    st.subheader("Enter Information")

    # One "SYMBOL - Name" label per known stock
    stock_options = list(map(format_stock_option, STOCK_SYMBOLS))
    default_index = 0 if stock_options else None

    # Select box with built-in search
    selected_stock = st.selectbox(
        "Select a stock symbol",
        options=stock_options,
        index=default_index,
        placeholder="Search for a stock symbol...",
    )

    # Labels look like "SYMBOL - Name"; keep only the part before " - "
    stock_symbol = selected_stock.split(" - ")[0] if selected_stock else ""

    generate_clicked = st.button("Generate Report", use_container_width=True, type="primary")
    if generate_clicked:
        if stock_symbol:
            # Stored in session state so the request survives the rerun
            st.session_state.stock_symbol = stock_symbol
            st.session_state.analysis_requested = True
            st.rerun()
        else:
            st.error("Please select a stock symbol to continue.")

    # PDF report generation section
    if st.session_state.get("analysis_complete", False):
        st.divider()
        st.subheader("PDF Report")

        # Results stored by the analysis step
        analysis_results = st.session_state.analysis_results

        # The PDF is written below ./static
        os.makedirs("static", exist_ok=True)
        report_date = datetime.now().strftime('%d%m%Y')
        filename = f"Report_{analysis_results['symbol']}_{report_date}.pdf"
        pdf_path = os.path.join("static", filename)

        st.markdown("Get a complete PDF report with price charts:")

        # Imported here, right before it is needed
        from modules.analysis_pipeline import generate_pdf_report

        # Generate and download PDF button (combined)
        if st.button("📊 Generate & Download PDF Report", use_container_width=True, key="pdf_btn", type="primary"):
            # NOTE(review): only the file's existence is checked, and the
            # name depends on symbol + date alone, so a PDF produced earlier
            # the same day for this symbol is reused as-is - confirm that
            # this is intended.
            if not os.path.exists(pdf_path):
                with st.spinner("Creating PDF report with charts..."):
                    generate_pdf_report(analysis_results, pdf_path)
                if not os.path.exists(pdf_path):
                    st.error("Failed to create PDF report.")
                    st.stop()

            # Read PDF file for download
            with open(pdf_path, "rb") as pdf_file:
                pdf_bytes = pdf_file.read()

            # Display success message and download widget
            st.success("PDF report generated successfully!")
            st.download_button(
                label="⬇️ Download Report",
                data=pdf_bytes,
                file_name=filename,
                mime="application/pdf",
                use_container_width=True,
                key="download_pdf_btn",
            )
# Report display section | |
# Report display section (rendered in the wide column).
# NOTE(review): the nesting below was reconstructed from a copy of this file
# that had lost its indentation - confirm against the intended layout.
with col1:
    # --- Run a pending analysis request ---------------------------------
    if st.session_state.get("analysis_requested", False):
        symbol = st.session_state.stock_symbol
        with st.spinner(f"🔍 Collecting data and analyzing {symbol} stock... (this may take a few minutes)"):
            try:
                analysis_results = asyncio.run(run_analysis_pipeline(symbol))
            except Exception as e:
                # UI boundary: show any pipeline failure on the page
                # instead of letting the script crash.
                st.error(f"An error occurred during analysis: {str(e)}")
                st.session_state.analysis_requested = False
            else:
                # Success path kept outside the ``try`` so that only the
                # pipeline call itself is guarded.
                st.session_state.analysis_results = analysis_results
                st.session_state.analysis_complete = True
                st.session_state.analysis_requested = False
                # Rerun so the results are rendered by the branch below
                st.rerun()

    # --- Show results, or the landing content ---------------------------
    if st.session_state.get("analysis_complete", False):
        analysis_results = st.session_state.analysis_results
        report_symbol = analysis_results.get('symbol', 'N/A')
        # Tolerate a partial result: a missing section shows a placeholder
        # instead of raising KeyError and breaking the whole page.
        analysis_sections = analysis_results.get('analysis') or {}
        missing_section = "_This section is not available for this report._"

        tab1, tab2, tab3, tab4, tab5 = st.tabs([
            "📋 Overview",
            "💰 Financial Health",
            "📰 News & Sentiment",
            "👨💼 Market Analysis",
            "📊 Price Charts",
        ])

        with tab1:
            # Basic company information
            overview = analysis_results.get('overview', {})
            if overview:
                # Distinct names: do not rebind the page-level col1/col2
                info_left, info_right = st.columns([1, 1])
                with info_left:
                    st.subheader(f"{report_symbol} - {overview.get('Name', 'N/A')}")
                    st.write(f"**Industry:** {overview.get('Industry', 'N/A')}")
                    st.write(f"**Sector:** {overview.get('Sector', 'N/A')}")
                with info_right:
                    st.write(f"**Market Cap:** {overview.get('MarketCapitalization', 'N/A')}")
                    st.write(f"**P/E Ratio:** {overview.get('PERatio', 'N/A')}")
                    # Append '%' only to a real value so a missing yield
                    # reads "N/A" rather than "N/A%".
                    # 'None' as a string is presumably what some providers
                    # send for a missing value - verify against the data source.
                    dividend_yield = overview.get('DividendYield')
                    if dividend_yield in (None, '', 'None', 'N/A'):
                        dividend_text = 'N/A'
                    else:
                        dividend_text = f"{dividend_yield}%"
                    st.write(f"**Dividend Yield:** {dividend_text}")

            st.markdown("### Summary & Recommendation")
            st.markdown(analysis_sections.get('summary') or missing_section)

        with tab2:
            st.markdown("### Financial Health Analysis")
            st.markdown(analysis_sections.get('financial_health') or missing_section)

        with tab3:
            st.markdown("### News & Market Sentiment Analysis")
            st.markdown(analysis_sections.get('news_sentiment') or missing_section)

        with tab4:
            st.markdown("### Market Analysis")
            st.markdown(analysis_sections.get('expert_opinion') or missing_section)

        with tab5:
            st.markdown("### Stock Price Charts")
            price_data = analysis_results.get('price_data', {})
            if price_data:
                periods = ['1_month', '3_months', '1_year']
                period_tabs = st.tabs(['1 Month', '3 Months', '1 Year'])
                for period_tab, period in zip(period_tabs, periods):
                    with period_tab:
                        if period in price_data:
                            chart = create_price_chart(price_data[period], period)
                            if chart:
                                st.altair_chart(chart, use_container_width=True)
                            else:
                                st.info(f"Insufficient data to display chart for {period} timeframe.")
                        else:
                            st.info(f"No chart data available for {period} timeframe.")
            else:
                st.info("No price chart data available for this stock.")
    else:
        # Landing content shown while no analysis is available
        st.info("👈 Enter a stock symbol and click 'Generate Report' to begin.")
        # Blank lines separate heading, intro, list and closing sentence so
        # the closing sentence is not rendered as part of list item 5.
        st.markdown("""
### About Stock Analysis Reports

The stock analysis report includes the following information:

1. **Overview & Investment Recommendation**: Summary of the company and general investment potential assessment.
2. **Financial Health Analysis**: Evaluation of financial metrics, revenue growth, and profitability.
3. **News & Market Sentiment Analysis**: Summary of notable news related to the company.
4. **Market Analysis**: Analysis of current stock performance and market trends.
5. **Price Charts**: Stock price charts for various timeframes.

Reports are generated based on data from multiple sources and analyzed by AI.
""")

        st.markdown("### Popular Stock Symbols")
        # Only the first 12 symbols, laid out on a 4-column grid, to avoid
        # cluttering the interface
        display_stocks = STOCK_SYMBOLS[:12]
        cols = st.columns(4)
        for i, stock in enumerate(display_stocks):
            col = cols[i % 4]
            # Same label format as the select box
            if col.button(format_stock_option(stock), key=f"pop_stock_{i}", use_container_width=True):
                st.session_state.stock_symbol = stock['symbol']
                st.session_state.analysis_requested = True
                st.rerun()