import os import asyncio import aiohttp import time import re import random from datetime import datetime, timedelta import google.generativeai as genai from dotenv import load_dotenv from concurrent.futures import ThreadPoolExecutor from groq import Groq, AsyncGroq from threading import Lock # Tải biến môi trường load_dotenv() # Cấu hình Gemini AI GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") genai.configure(api_key=GEMINI_API_KEY) MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") # Cấu hình Groq AI GROQ_API_KEYS_STR = os.getenv("GROQ_API_KEY", "") GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct" # Model Llama4 qua Groq # Quản lý nhiều API keys cho Groq class GroqClientManager: """Quản lý pool các Groq clients với logic round-robin""" def __init__(self, api_keys_str): # Tách chuỗi API keys (key1,key2,key3) thành list self.api_keys = [key.strip() for key in api_keys_str.split(",") if key.strip()] if not self.api_keys: raise ValueError("Không tìm thấy API key hợp lệ cho Groq") print(f"Khởi tạo {len(self.api_keys)} Groq API clients") # Tạo pool của các client self.clients = [AsyncGroq(api_key=key) for key in self.api_keys] self.current_index = 0 self.lock = Lock() # Thống kê sử dụng self.usage_stats = {i: 0 for i in range(len(self.api_keys))} def get_next_client(self): """Lấy client tiếp theo theo cơ chế round-robin""" with self.lock: client = self.clients[self.current_index] # Cập nhật thống kê self.usage_stats[self.current_index] += 1 # Di chuyển đến key tiếp theo self.current_index = (self.current_index + 1) % len(self.clients) return client def print_usage_stats(self): """In thống kê sử dụng của từng API key""" with self.lock: print("\n--- Thống kê sử dụng Groq API keys ---") total_calls = sum(self.usage_stats.values()) for idx, count in self.usage_stats.items(): key_preview = f"{self.api_keys[idx][:8]}..." if len(self.api_keys[idx]) > 10 else self.api_keys[idx] percentage = (count / total_calls * 100) if total_calls > 0 else 0 print(f"Key {idx+1} ({key_preview}): {count} lần gọi ({percentage:.1f}%)") print(f"Tổng số lần gọi API: {total_calls}") print("---------------------------------------\n") # Khởi tạo singleton manager groq_client_manager = GroqClientManager(GROQ_API_KEYS_STR) # Giữ lại một client đơn cho compatibility groq_client = Groq(api_key=groq_client_manager.api_keys[0] if groq_client_manager.api_keys else "") MAX_ARTICLES = 30 # 5 for testing, 30 for production # Các API keys NEWS_API_KEY = os.getenv("NEWS_API_KEY") MARKETAUX_API_KEY = os.getenv("MARKETAUX_API_KEY") ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY") # Cấu hình kiểm soát tốc độ gọi API MAX_REQUESTS_PER_MINUTE = 10 # Để an toàn, giữ dưới 10 BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc DELAY_BETWEEN_BATCHES = 10 # Thời gian chờ giữa các batch (giây) RETRY_DELAY_BASE = 5 # Thời gian cơ sở cho retry (giây) MAX_RETRIES = 3 # Số lần thử lại tối đa # Cấu hình riêng cho Groq API GROQ_BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc khi dùng Groq (cao hơn Gemini) GROQ_DELAY_BETWEEN_REQUESTS = 2 # Thời gian chờ giữa các request riêng lẻ với Groq (giây) GROQ_DELAY_BETWEEN_BATCHES = 5 # Thời gian chờ giữa các batch với Groq (giây) class NewsArticle: """Lớp tiêu chuẩn hóa cho các bài báo từ các nguồn khác nhau""" def __init__(self, title, description, content, source_name, url, published_at): self.title = title self.description = description self.content = content self.source_name = source_name self.url = url self.published_at = published_at def __str__(self): return f"{self.title} ({self.source_name})" def __eq__(self, other): if not isinstance(other, NewsArticle): return False # So sánh bằng URL hoặc tiêu đề return self.url == other.url or self.title == other.title def __hash__(self): # Sử dụng URL làm hash return hash(self.url) async def fetch_from_newsapi(): """Lấy tin tức từ NewsAPI""" url = "https://newsapi.org/v2/everything" yesterday = datetime.now() - timedelta(days=1) yesterday_str = yesterday.strftime('%Y-%m-%d') params = { 'q': 'finance OR economy OR stock market OR investing', 'from': yesterday_str, 'language': 'en', 'sortBy': 'publishedAt', 'pageSize': 50, 'apiKey': NEWS_API_KEY } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() articles = [] if data.get('status') == 'ok' and 'articles' in data: for article in data['articles']: articles.append( NewsArticle( title=article.get('title', ''), description=article.get('description', ''), content=article.get('content', ''), source_name=article.get('source', {}).get('name', 'Unknown'), url=article.get('url', ''), published_at=article.get('publishedAt', '') ) ) return articles return [] async def fetch_from_marketaux(): """Lấy tin tức từ Marketaux API""" url = "https://api.marketaux.com/v1/news/all" params = { 'symbols': 'AAPL,MSFT,TSLA,AMZN,NVDA,META', # Một số mã chứng khoán phổ biến 'filter_entities': 'true', 'language': 'en', 'published_after': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%dT%H:%M'), 'limit': 50, 'api_token': MARKETAUX_API_KEY } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() articles = [] if 'data' in data: for article in data['data']: articles.append( NewsArticle( title=article.get('title', ''), description=article.get('description', ''), content=article.get('snippet', ''), source_name=article.get('source', ''), url=article.get('url', ''), published_at=article.get('published_at', '') ) ) return articles return [] def _normalize_and_deduplicate(articles_list): """Chuẩn hóa và loại bỏ các bài báo trùng lặp""" # Loại bỏ trùng lặp bằng cách chuyển thành set unique_articles = set() for article in articles_list: if article.title and article.content: # Bỏ qua bài viết không có tiêu đề hoặc nội dung # Loại bỏ các bài quảng cáo và không liên quan skip_keywords = ["advertisement", "sponsored", "promotion"] if not any(keyword in article.title.lower() for keyword in skip_keywords): unique_articles.add(article) # Sắp xếp theo thời gian xuất bản (mới nhất trước) sorted_articles = sorted( unique_articles, key=lambda x: datetime.fromisoformat(x.published_at.replace('Z', '+00:00').replace('T', ' ')), reverse=True ) # Giới hạn số lượng bài viết max_articles = min(len(sorted_articles), MAX_ARTICLES) # Giảm số lượng bài từ 30 xuống 20 return sorted_articles[:max_articles] async def _call_ai_with_retry(prompt, model, retries=MAX_RETRIES): """Gọi Gemini API với cơ chế retry và exponential backoff""" attempt = 0 last_exception = None while attempt <= retries: try: # Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu jitter = random.uniform(0.5, 1.5) if attempt > 0: # Exponential backoff với jitter delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter print(f"Retry lần {attempt}, đợi {delay:.2f} giây...") await asyncio.sleep(delay) response = await model.generate_content_async(prompt) return response.text except Exception as e: last_exception = e print(f"Lỗi khi gọi AI (lần {attempt+1}/{retries+1}): {str(e)}") # Nếu đây là lỗi quota, thêm thời gian chờ dài hơn if "429" in str(e) or "quota" in str(e).lower(): # Thêm thời gian chờ dài hơn cho lỗi quota (60-90 giây) quota_delay = random.uniform(60, 90) print(f"Phát hiện lỗi quota, đợi {quota_delay:.2f} giây...") await asyncio.sleep(quota_delay) attempt += 1 # Nếu đã hết số lần thử lại, ném ngoại lệ if last_exception: raise last_exception else: raise Exception("Không thể gọi API Gemini sau nhiều lần thử lại") async def _call_groq_with_retry(prompt, retries=MAX_RETRIES): """Gọi Groq API với cơ chế retry và round-robin API keys""" attempt = 0 last_exception = None while attempt <= retries: try: # Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu jitter = random.uniform(0.5, 1.5) if attempt > 0: # Exponential backoff với jitter delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter print(f"Retry Groq lần {attempt}, đợi {delay:.2f} giây...") await asyncio.sleep(delay) # Lấy client tiếp theo từ round-robin pool client = groq_client_manager.get_next_client() response = await client.chat.completions.create( model=GROQ_MODEL, messages=[ {"role": "user", "content": prompt} ], temperature=0.3, # Thấp hơn để tóm tắt chính xác max_tokens=500, # Giới hạn độ dài tóm tắt top_p=0.95, stream=False ) return response.choices[0].message.content except Exception as e: last_exception = e print(f"Lỗi khi gọi Groq API (lần {attempt+1}/{retries+1}): {str(e)}") # Nếu đây là lỗi rate limit hoặc quota if "429" in str(e) or "rate" in str(e).lower() or "limit" in str(e).lower(): # Thêm thời gian chờ dài hơn rate_limit_delay = random.uniform(30, 45) print(f"Phát hiện lỗi rate limit, đợi {rate_limit_delay:.2f} giây...") await asyncio.sleep(rate_limit_delay) attempt += 1 # Nếu đã hết số lần thử lại, ném ngoại lệ if last_exception: raise last_exception else: raise Exception("Không thể gọi API Groq sau nhiều lần thử lại") async def _summarize_article_with_groq(article): """Tạo tóm tắt cho một bài báo sử dụng Groq API""" prompt = f""" Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences). Focus on key events, figures, trends, or information valuable to investors. Provide only the summary without any introduction or conclusion. TITLE: {article.title} DESCRIPTION: {article.description} CONTENT: {article.content} SOURCE: {article.source_name} """ try: # Sử dụng hàm gọi Groq API có retry summary = await _call_groq_with_retry(prompt) # Loại bỏ các ký tự đặc biệt và chuẩn hóa summary = re.sub(r'[\n\r]+', ' ', summary) summary = re.sub(r'\s{2,}', ' ', summary).strip() return { 'title': article.title, 'source': article.source_name, 'summary': summary, 'url': article.url } except Exception as e: print(f"Lỗi khi tóm tắt bài báo với Groq: {str(e)}") return { 'title': article.title, 'source': article.source_name, 'summary': "Unable to summarize this article due to API limitations.", 'url': article.url } async def _summarize_article(article, model): """Tạo tóm tắt cho một bài báo với Gemini (giữ lại để dự phòng)""" prompt = f""" Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences). Focus on key events, figures, trends, or information valuable to investors. Provide only the summary without any introduction or conclusion. TITLE: {article.title} DESCRIPTION: {article.description} CONTENT: {article.content} SOURCE: {article.source_name} URL: {article.url} """ try: # Sử dụng hàm gọi API có retry summary = await _call_ai_with_retry(prompt, model) # Loại bỏ các ký tự đặc biệt và chuẩn hóa summary = re.sub(r'[\n\r]+', ' ', summary) summary = re.sub(r'\s{2,}', ' ', summary).strip() return { 'title': article.title, 'source': article.source_name, 'summary': summary, 'url': article.url } except Exception as e: print(f"Lỗi khi tóm tắt bài báo với Gemini: {str(e)}") return { 'title': article.title, 'source': article.source_name, 'summary': "Unable to summarize this article due to API limitations.", 'url': article.url } async def _summarize_articles_with_groq(articles): """Tóm tắt các bài báo bằng Groq API""" all_summaries = [] # Chia bài viết thành các batch nhỏ for i in range(0, len(articles), GROQ_BATCH_SIZE): batch = articles[i:i+GROQ_BATCH_SIZE] print(f"Đang xử lý batch {i//GROQ_BATCH_SIZE + 1}/{(len(articles) + GROQ_BATCH_SIZE - 1)//GROQ_BATCH_SIZE} ({len(batch)} bài) với Groq") # Tạo danh sách các coroutines để xử lý song song tasks = [_summarize_article_with_groq(article) for article in batch] # Chờ tất cả các task hoàn thành batch_summaries = await asyncio.gather(*tasks) all_summaries.extend(batch_summaries) # Nếu còn batch tiếp theo, đợi để tránh vượt quá quota if i + GROQ_BATCH_SIZE < len(articles): wait_time = random.uniform(GROQ_DELAY_BETWEEN_BATCHES * 0.9, GROQ_DELAY_BETWEEN_BATCHES * 1.1) print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...") await asyncio.sleep(wait_time) # In thống kê sử dụng API keys groq_client_manager.print_usage_stats() return all_summaries async def _summarize_articles_in_batches(articles, model): """Tóm tắt các bài báo theo batch với Gemini API (giữ lại để dự phòng)""" all_summaries = [] # Chia bài viết thành các batch nhỏ for i in range(0, len(articles), BATCH_SIZE): batch = articles[i:i+BATCH_SIZE] print(f"Đang xử lý batch {i//BATCH_SIZE + 1}/{(len(articles) + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} bài)") # Xử lý các bài trong batch một cách tuần tự để tránh quá tải API batch_summaries = [] for article in batch: # Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu delay = random.uniform(1.0, 3.0) await asyncio.sleep(delay) summary = await _summarize_article(article, model) batch_summaries.append(summary) all_summaries.extend(batch_summaries) # Nếu còn batch tiếp theo, đợi để tránh vượt quá quota if i + BATCH_SIZE < len(articles): wait_time = random.uniform(DELAY_BETWEEN_BATCHES * 0.9, DELAY_BETWEEN_BATCHES * 1.1) print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...") await asyncio.sleep(wait_time) return all_summaries async def _synthesize_newsletter(summaries, model): """Reduce phase: Tổng hợp các tóm tắt thành một bản tin hoàn chỉnh sử dụng Gemini API""" # Giới hạn số lượng tóm tắt để đảm bảo không vượt quá token limit # if len(summaries) > 15: # print(f"Giới hạn số lượng tóm tắt từ {len(summaries)} xuống 15 để phù hợp với giới hạn token") # summaries = summaries[:15] # Chuẩn bị dữ liệu đầu vào cho prompt summaries_text = "" for i, s in enumerate(summaries, 1): summaries_text += f"{i}. **{s['title']}** ({s['source']}): {s['summary']} [Link]({s['url']})\n\n" # Prompt để tạo bản tin prompt = f""" Below are summaries from {len(summaries)} financial articles published in the last 24 hours: {summaries_text} Write a comprehensive market report based on these summaries. The report should: 1. Have an overall headline for the entire report 2. Be organized into clear sections by topic, such as: - Macroeconomic Developments - Corporate News - Stock Market Performance - Cryptocurrency and Fintech - Expert Opinions and Forecasts 3. Each section should have at least 3-5 concise news points, with sources cited 4. End with a brief conclusion about the overall market trends Format the report in Markdown. Ensure clear numbering and proper Markdown syntax. """ try: # Thêm jitter vào delay trước khi gọi API tổng hợp await asyncio.sleep(random.uniform(3.0, 5.0)) # Sử dụng hàm gọi API có retry newsletter = await _call_ai_with_retry(prompt, model) # Thêm phần thông tin về ngày tạo today = datetime.now().strftime("%d/%m/%Y") newsletter = f"# DAILY MARKET REPORT - {today}\n\n" + newsletter # Thêm phần footer footer = """ --- *This report was automatically generated by AI Financial Dashboard based on data from multiple reliable financial news sources.* *Note: This is not investment advice.* """ newsletter += footer return newsletter except Exception as e: print(f"Lỗi khi tạo bản tin: {str(e)}") return """# DAILY MARKET REPORT We're sorry, but the market report could not be automatically generated due to API limitations. Please try again later. --- *Note: The system has collected data but could not generate the report due to API quota limitations.* """ async def run_news_summary_pipeline(): """Hàm chính để chạy toàn bộ pipeline tạo bản tin""" start_time = time.time() # 1. Khởi tạo model model = genai.GenerativeModel(MODEL_NAME) # 2. Thu thập tin tức từ nhiều nguồn song song print("Đang thu thập tin tức từ các nguồn...") tasks = [ fetch_from_newsapi(), fetch_from_marketaux() ] # Chờ tất cả các task hoàn thành results = await asyncio.gather(*tasks) # Gộp kết quả từ các nguồn all_articles = [] for articles in results: if articles: all_articles.extend(articles) print(f"Đã thu thập {len(all_articles)} bài báo từ các nguồn.") # 3. Chuẩn hóa và loại bỏ trùng lặp articles = _normalize_and_deduplicate(all_articles) print(f"Sau khi lọc: {len(articles)} bài báo duy nhất.") # 4. Tóm tắt từng bài báo sử dụng Groq API (nhanh hơn, quota cao hơn) print("Bắt đầu tóm tắt các bài báo với Groq API...") try: summaries = await _summarize_articles_with_groq(articles) print(f"Đã tóm tắt {len(summaries)} bài báo với Groq API.") except Exception as e: print(f"Lỗi khi sử dụng Groq API: {str(e)}. Chuyển sang sử dụng Gemini API...") # Fallback sang Gemini nếu có lỗi với Groq summaries = await _summarize_articles_in_batches(articles, model) print(f"Đã tóm tắt {len(summaries)} bài báo với Gemini API.") # 5. Tổng hợp bản tin sử dụng Gemini API (tốt hơn với việc viết nội dung dài) print("Đang tạo bản tin tổng hợp với Gemini API...") newsletter = await _synthesize_newsletter(summaries, model) # Thống kê thời gian thực hiện end_time = time.time() duration = end_time - start_time print(f"Pipeline hoàn tất trong {duration:.2f} giây.") return newsletter