Spaces:
Sleeping
Sleeping
import os | |
import asyncio | |
import aiohttp | |
import time | |
import re | |
import random | |
from datetime import datetime, timedelta | |
import google.generativeai as genai | |
from dotenv import load_dotenv | |
from concurrent.futures import ThreadPoolExecutor | |
from groq import Groq, AsyncGroq | |
from threading import Lock | |
# Tải biến môi trường | |
load_dotenv() | |
# Cấu hình Gemini AI | |
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
genai.configure(api_key=GEMINI_API_KEY) | |
MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") | |
# Cấu hình Groq AI | |
GROQ_API_KEYS_STR = os.getenv("GROQ_API_KEY", "") | |
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct" # Model Llama4 qua Groq | |
# Quản lý nhiều API keys cho Groq | |
class GroqClientManager: | |
"""Quản lý pool các Groq clients với logic round-robin""" | |
def __init__(self, api_keys_str): | |
# Tách chuỗi API keys (key1,key2,key3) thành list | |
self.api_keys = [key.strip() for key in api_keys_str.split(",") if key.strip()] | |
if not self.api_keys: | |
raise ValueError("Không tìm thấy API key hợp lệ cho Groq") | |
print(f"Khởi tạo {len(self.api_keys)} Groq API clients") | |
# Tạo pool của các client | |
self.clients = [AsyncGroq(api_key=key) for key in self.api_keys] | |
self.current_index = 0 | |
self.lock = Lock() | |
# Thống kê sử dụng | |
self.usage_stats = {i: 0 for i in range(len(self.api_keys))} | |
def get_next_client(self): | |
"""Lấy client tiếp theo theo cơ chế round-robin""" | |
with self.lock: | |
client = self.clients[self.current_index] | |
# Cập nhật thống kê | |
self.usage_stats[self.current_index] += 1 | |
# Di chuyển đến key tiếp theo | |
self.current_index = (self.current_index + 1) % len(self.clients) | |
return client | |
def print_usage_stats(self): | |
"""In thống kê sử dụng của từng API key""" | |
with self.lock: | |
print("\n--- Thống kê sử dụng Groq API keys ---") | |
total_calls = sum(self.usage_stats.values()) | |
for idx, count in self.usage_stats.items(): | |
key_preview = f"{self.api_keys[idx][:8]}..." if len(self.api_keys[idx]) > 10 else self.api_keys[idx] | |
percentage = (count / total_calls * 100) if total_calls > 0 else 0 | |
print(f"Key {idx+1} ({key_preview}): {count} lần gọi ({percentage:.1f}%)") | |
print(f"Tổng số lần gọi API: {total_calls}") | |
print("---------------------------------------\n") | |
# Khởi tạo singleton manager | |
groq_client_manager = GroqClientManager(GROQ_API_KEYS_STR) | |
# Giữ lại một client đơn cho compatibility | |
groq_client = Groq(api_key=groq_client_manager.api_keys[0] if groq_client_manager.api_keys else "") | |
MAX_ARTICLES = 30 # 5 for testing, 30 for production | |
# Các API keys | |
NEWS_API_KEY = os.getenv("NEWS_API_KEY") | |
MARKETAUX_API_KEY = os.getenv("MARKETAUX_API_KEY") | |
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY") | |
# Cấu hình kiểm soát tốc độ gọi API | |
MAX_REQUESTS_PER_MINUTE = 10 # Để an toàn, giữ dưới 10 | |
BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc | |
DELAY_BETWEEN_BATCHES = 10 # Thời gian chờ giữa các batch (giây) | |
RETRY_DELAY_BASE = 5 # Thời gian cơ sở cho retry (giây) | |
MAX_RETRIES = 3 # Số lần thử lại tối đa | |
# Cấu hình riêng cho Groq API | |
GROQ_BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc khi dùng Groq (cao hơn Gemini) | |
GROQ_DELAY_BETWEEN_REQUESTS = 2 # Thời gian chờ giữa các request riêng lẻ với Groq (giây) | |
GROQ_DELAY_BETWEEN_BATCHES = 5 # Thời gian chờ giữa các batch với Groq (giây) | |
class NewsArticle: | |
"""Lớp tiêu chuẩn hóa cho các bài báo từ các nguồn khác nhau""" | |
def __init__(self, title, description, content, source_name, url, published_at): | |
self.title = title | |
self.description = description | |
self.content = content | |
self.source_name = source_name | |
self.url = url | |
self.published_at = published_at | |
def __str__(self): | |
return f"{self.title} ({self.source_name})" | |
def __eq__(self, other): | |
if not isinstance(other, NewsArticle): | |
return False | |
# So sánh bằng URL hoặc tiêu đề | |
return self.url == other.url or self.title == other.title | |
def __hash__(self): | |
# Sử dụng URL làm hash | |
return hash(self.url) | |
async def fetch_from_newsapi(): | |
"""Lấy tin tức từ NewsAPI""" | |
url = "https://newsapi.org/v2/everything" | |
yesterday = datetime.now() - timedelta(days=1) | |
yesterday_str = yesterday.strftime('%Y-%m-%d') | |
params = { | |
'q': 'finance OR economy OR stock market OR investing', | |
'from': yesterday_str, | |
'language': 'en', | |
'sortBy': 'publishedAt', | |
'pageSize': 50, | |
'apiKey': NEWS_API_KEY | |
} | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params) as response: | |
if response.status == 200: | |
data = await response.json() | |
articles = [] | |
if data.get('status') == 'ok' and 'articles' in data: | |
for article in data['articles']: | |
articles.append( | |
NewsArticle( | |
title=article.get('title', ''), | |
description=article.get('description', ''), | |
content=article.get('content', ''), | |
source_name=article.get('source', {}).get('name', 'Unknown'), | |
url=article.get('url', ''), | |
published_at=article.get('publishedAt', '') | |
) | |
) | |
return articles | |
return [] | |
async def fetch_from_marketaux(): | |
"""Lấy tin tức từ Marketaux API""" | |
url = "https://api.marketaux.com/v1/news/all" | |
params = { | |
'symbols': 'AAPL,MSFT,TSLA,AMZN,NVDA,META', # Một số mã chứng khoán phổ biến | |
'filter_entities': 'true', | |
'language': 'en', | |
'published_after': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%dT%H:%M'), | |
'limit': 50, | |
'api_token': MARKETAUX_API_KEY | |
} | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params) as response: | |
if response.status == 200: | |
data = await response.json() | |
articles = [] | |
if 'data' in data: | |
for article in data['data']: | |
articles.append( | |
NewsArticle( | |
title=article.get('title', ''), | |
description=article.get('description', ''), | |
content=article.get('snippet', ''), | |
source_name=article.get('source', ''), | |
url=article.get('url', ''), | |
published_at=article.get('published_at', '') | |
) | |
) | |
return articles | |
return [] | |
def _normalize_and_deduplicate(articles_list): | |
"""Chuẩn hóa và loại bỏ các bài báo trùng lặp""" | |
# Loại bỏ trùng lặp bằng cách chuyển thành set | |
unique_articles = set() | |
for article in articles_list: | |
if article.title and article.content: # Bỏ qua bài viết không có tiêu đề hoặc nội dung | |
# Loại bỏ các bài quảng cáo và không liên quan | |
skip_keywords = ["advertisement", "sponsored", "promotion"] | |
if not any(keyword in article.title.lower() for keyword in skip_keywords): | |
unique_articles.add(article) | |
# Sắp xếp theo thời gian xuất bản (mới nhất trước) | |
sorted_articles = sorted( | |
unique_articles, | |
key=lambda x: datetime.fromisoformat(x.published_at.replace('Z', '+00:00').replace('T', ' ')), | |
reverse=True | |
) | |
# Giới hạn số lượng bài viết | |
max_articles = min(len(sorted_articles), MAX_ARTICLES) # Giảm số lượng bài từ 30 xuống 20 | |
return sorted_articles[:max_articles] | |
async def _call_ai_with_retry(prompt, model, retries=MAX_RETRIES): | |
"""Gọi Gemini API với cơ chế retry và exponential backoff""" | |
attempt = 0 | |
last_exception = None | |
while attempt <= retries: | |
try: | |
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu | |
jitter = random.uniform(0.5, 1.5) | |
if attempt > 0: | |
# Exponential backoff với jitter | |
delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter | |
print(f"Retry lần {attempt}, đợi {delay:.2f} giây...") | |
await asyncio.sleep(delay) | |
response = await model.generate_content_async(prompt) | |
return response.text | |
except Exception as e: | |
last_exception = e | |
print(f"Lỗi khi gọi AI (lần {attempt+1}/{retries+1}): {str(e)}") | |
# Nếu đây là lỗi quota, thêm thời gian chờ dài hơn | |
if "429" in str(e) or "quota" in str(e).lower(): | |
# Thêm thời gian chờ dài hơn cho lỗi quota (60-90 giây) | |
quota_delay = random.uniform(60, 90) | |
print(f"Phát hiện lỗi quota, đợi {quota_delay:.2f} giây...") | |
await asyncio.sleep(quota_delay) | |
attempt += 1 | |
# Nếu đã hết số lần thử lại, ném ngoại lệ | |
if last_exception: | |
raise last_exception | |
else: | |
raise Exception("Không thể gọi API Gemini sau nhiều lần thử lại") | |
async def _call_groq_with_retry(prompt, retries=MAX_RETRIES): | |
"""Gọi Groq API với cơ chế retry và round-robin API keys""" | |
attempt = 0 | |
last_exception = None | |
while attempt <= retries: | |
try: | |
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu | |
jitter = random.uniform(0.5, 1.5) | |
if attempt > 0: | |
# Exponential backoff với jitter | |
delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter | |
print(f"Retry Groq lần {attempt}, đợi {delay:.2f} giây...") | |
await asyncio.sleep(delay) | |
# Lấy client tiếp theo từ round-robin pool | |
client = groq_client_manager.get_next_client() | |
response = await client.chat.completions.create( | |
model=GROQ_MODEL, | |
messages=[ | |
{"role": "user", "content": prompt} | |
], | |
temperature=0.3, # Thấp hơn để tóm tắt chính xác | |
max_tokens=500, # Giới hạn độ dài tóm tắt | |
top_p=0.95, | |
stream=False | |
) | |
return response.choices[0].message.content | |
except Exception as e: | |
last_exception = e | |
print(f"Lỗi khi gọi Groq API (lần {attempt+1}/{retries+1}): {str(e)}") | |
# Nếu đây là lỗi rate limit hoặc quota | |
if "429" in str(e) or "rate" in str(e).lower() or "limit" in str(e).lower(): | |
# Thêm thời gian chờ dài hơn | |
rate_limit_delay = random.uniform(30, 45) | |
print(f"Phát hiện lỗi rate limit, đợi {rate_limit_delay:.2f} giây...") | |
await asyncio.sleep(rate_limit_delay) | |
attempt += 1 | |
# Nếu đã hết số lần thử lại, ném ngoại lệ | |
if last_exception: | |
raise last_exception | |
else: | |
raise Exception("Không thể gọi API Groq sau nhiều lần thử lại") | |
async def _summarize_article_with_groq(article): | |
"""Tạo tóm tắt cho một bài báo sử dụng Groq API""" | |
prompt = f""" | |
Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences). | |
Focus on key events, figures, trends, or information valuable to investors. | |
Provide only the summary without any introduction or conclusion. | |
TITLE: {article.title} | |
DESCRIPTION: {article.description} | |
CONTENT: {article.content} | |
SOURCE: {article.source_name} | |
""" | |
try: | |
# Sử dụng hàm gọi Groq API có retry | |
summary = await _call_groq_with_retry(prompt) | |
# Loại bỏ các ký tự đặc biệt và chuẩn hóa | |
summary = re.sub(r'[\n\r]+', ' ', summary) | |
summary = re.sub(r'\s{2,}', ' ', summary).strip() | |
return { | |
'title': article.title, | |
'source': article.source_name, | |
'summary': summary, | |
'url': article.url | |
} | |
except Exception as e: | |
print(f"Lỗi khi tóm tắt bài báo với Groq: {str(e)}") | |
return { | |
'title': article.title, | |
'source': article.source_name, | |
'summary': "Unable to summarize this article due to API limitations.", | |
'url': article.url | |
} | |
async def _summarize_article(article, model): | |
"""Tạo tóm tắt cho một bài báo với Gemini (giữ lại để dự phòng)""" | |
prompt = f""" | |
Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences). | |
Focus on key events, figures, trends, or information valuable to investors. | |
Provide only the summary without any introduction or conclusion. | |
TITLE: {article.title} | |
DESCRIPTION: {article.description} | |
CONTENT: {article.content} | |
SOURCE: {article.source_name} | |
URL: {article.url} | |
""" | |
try: | |
# Sử dụng hàm gọi API có retry | |
summary = await _call_ai_with_retry(prompt, model) | |
# Loại bỏ các ký tự đặc biệt và chuẩn hóa | |
summary = re.sub(r'[\n\r]+', ' ', summary) | |
summary = re.sub(r'\s{2,}', ' ', summary).strip() | |
return { | |
'title': article.title, | |
'source': article.source_name, | |
'summary': summary, | |
'url': article.url | |
} | |
except Exception as e: | |
print(f"Lỗi khi tóm tắt bài báo với Gemini: {str(e)}") | |
return { | |
'title': article.title, | |
'source': article.source_name, | |
'summary': "Unable to summarize this article due to API limitations.", | |
'url': article.url | |
} | |
async def _summarize_articles_with_groq(articles): | |
"""Tóm tắt các bài báo bằng Groq API""" | |
all_summaries = [] | |
# Chia bài viết thành các batch nhỏ | |
for i in range(0, len(articles), GROQ_BATCH_SIZE): | |
batch = articles[i:i+GROQ_BATCH_SIZE] | |
print(f"Đang xử lý batch {i//GROQ_BATCH_SIZE + 1}/{(len(articles) + GROQ_BATCH_SIZE - 1)//GROQ_BATCH_SIZE} ({len(batch)} bài) với Groq") | |
# Tạo danh sách các coroutines để xử lý song song | |
tasks = [_summarize_article_with_groq(article) for article in batch] | |
# Chờ tất cả các task hoàn thành | |
batch_summaries = await asyncio.gather(*tasks) | |
all_summaries.extend(batch_summaries) | |
# Nếu còn batch tiếp theo, đợi để tránh vượt quá quota | |
if i + GROQ_BATCH_SIZE < len(articles): | |
wait_time = random.uniform(GROQ_DELAY_BETWEEN_BATCHES * 0.9, GROQ_DELAY_BETWEEN_BATCHES * 1.1) | |
print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...") | |
await asyncio.sleep(wait_time) | |
# In thống kê sử dụng API keys | |
groq_client_manager.print_usage_stats() | |
return all_summaries | |
async def _summarize_articles_in_batches(articles, model): | |
"""Tóm tắt các bài báo theo batch với Gemini API (giữ lại để dự phòng)""" | |
all_summaries = [] | |
# Chia bài viết thành các batch nhỏ | |
for i in range(0, len(articles), BATCH_SIZE): | |
batch = articles[i:i+BATCH_SIZE] | |
print(f"Đang xử lý batch {i//BATCH_SIZE + 1}/{(len(articles) + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} bài)") | |
# Xử lý các bài trong batch một cách tuần tự để tránh quá tải API | |
batch_summaries = [] | |
for article in batch: | |
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu | |
delay = random.uniform(1.0, 3.0) | |
await asyncio.sleep(delay) | |
summary = await _summarize_article(article, model) | |
batch_summaries.append(summary) | |
all_summaries.extend(batch_summaries) | |
# Nếu còn batch tiếp theo, đợi để tránh vượt quá quota | |
if i + BATCH_SIZE < len(articles): | |
wait_time = random.uniform(DELAY_BETWEEN_BATCHES * 0.9, DELAY_BETWEEN_BATCHES * 1.1) | |
print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...") | |
await asyncio.sleep(wait_time) | |
return all_summaries | |
async def _synthesize_newsletter(summaries, model): | |
"""Reduce phase: Tổng hợp các tóm tắt thành một bản tin hoàn chỉnh sử dụng Gemini API""" | |
# Giới hạn số lượng tóm tắt để đảm bảo không vượt quá token limit | |
# if len(summaries) > 15: | |
# print(f"Giới hạn số lượng tóm tắt từ {len(summaries)} xuống 15 để phù hợp với giới hạn token") | |
# summaries = summaries[:15] | |
# Chuẩn bị dữ liệu đầu vào cho prompt | |
summaries_text = "" | |
for i, s in enumerate(summaries, 1): | |
summaries_text += f"{i}. **{s['title']}** ({s['source']}): {s['summary']} [Link]({s['url']})\n\n" | |
# Prompt để tạo bản tin | |
prompt = f""" | |
Below are summaries from {len(summaries)} financial articles published in the last 24 hours: | |
{summaries_text} | |
Write a comprehensive market report based on these summaries. The report should: | |
1. Have an overall headline for the entire report | |
2. Be organized into clear sections by topic, such as: | |
- Macroeconomic Developments | |
- Corporate News | |
- Stock Market Performance | |
- Cryptocurrency and Fintech | |
- Expert Opinions and Forecasts | |
3. Each section should have at least 3-5 concise news points, with sources cited | |
4. End with a brief conclusion about the overall market trends | |
Format the report in Markdown. Ensure clear numbering and proper Markdown syntax. | |
""" | |
try: | |
# Thêm jitter vào delay trước khi gọi API tổng hợp | |
await asyncio.sleep(random.uniform(3.0, 5.0)) | |
# Sử dụng hàm gọi API có retry | |
newsletter = await _call_ai_with_retry(prompt, model) | |
# Thêm phần thông tin về ngày tạo | |
today = datetime.now().strftime("%d/%m/%Y") | |
newsletter = f"# DAILY MARKET REPORT - {today}\n\n" + newsletter | |
# Thêm phần footer | |
footer = """ | |
--- | |
*This report was automatically generated by AI Financial Dashboard based on data from multiple reliable financial news sources.* | |
*Note: This is not investment advice.* | |
""" | |
newsletter += footer | |
return newsletter | |
except Exception as e: | |
print(f"Lỗi khi tạo bản tin: {str(e)}") | |
return """# DAILY MARKET REPORT | |
We're sorry, but the market report could not be automatically generated due to API limitations. Please try again later. | |
--- | |
*Note: The system has collected data but could not generate the report due to API quota limitations.* | |
""" | |
async def run_news_summary_pipeline(): | |
"""Hàm chính để chạy toàn bộ pipeline tạo bản tin""" | |
start_time = time.time() | |
# 1. Khởi tạo model | |
model = genai.GenerativeModel(MODEL_NAME) | |
# 2. Thu thập tin tức từ nhiều nguồn song song | |
print("Đang thu thập tin tức từ các nguồn...") | |
tasks = [ | |
fetch_from_newsapi(), | |
fetch_from_marketaux() | |
] | |
# Chờ tất cả các task hoàn thành | |
results = await asyncio.gather(*tasks) | |
# Gộp kết quả từ các nguồn | |
all_articles = [] | |
for articles in results: | |
if articles: | |
all_articles.extend(articles) | |
print(f"Đã thu thập {len(all_articles)} bài báo từ các nguồn.") | |
# 3. Chuẩn hóa và loại bỏ trùng lặp | |
articles = _normalize_and_deduplicate(all_articles) | |
print(f"Sau khi lọc: {len(articles)} bài báo duy nhất.") | |
# 4. Tóm tắt từng bài báo sử dụng Groq API (nhanh hơn, quota cao hơn) | |
print("Bắt đầu tóm tắt các bài báo với Groq API...") | |
try: | |
summaries = await _summarize_articles_with_groq(articles) | |
print(f"Đã tóm tắt {len(summaries)} bài báo với Groq API.") | |
except Exception as e: | |
print(f"Lỗi khi sử dụng Groq API: {str(e)}. Chuyển sang sử dụng Gemini API...") | |
# Fallback sang Gemini nếu có lỗi với Groq | |
summaries = await _summarize_articles_in_batches(articles, model) | |
print(f"Đã tóm tắt {len(summaries)} bài báo với Gemini API.") | |
# 5. Tổng hợp bản tin sử dụng Gemini API (tốt hơn với việc viết nội dung dài) | |
print("Đang tạo bản tin tổng hợp với Gemini API...") | |
newsletter = await _synthesize_newsletter(summaries, model) | |
# Thống kê thời gian thực hiện | |
end_time = time.time() | |
duration = end_time - start_time | |
print(f"Pipeline hoàn tất trong {duration:.2f} giây.") | |
return newsletter |