finance-bot / modules /news_pipeline.py
tosanoob's picture
feat: add news report
44f095f
import os
import asyncio
import aiohttp
import time
import re
import random
from datetime import datetime, timedelta
import google.generativeai as genai
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
from groq import Groq, AsyncGroq
from threading import Lock
# Tải biến môi trường
load_dotenv()
# Cấu hình Gemini AI
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
# Cấu hình Groq AI
GROQ_API_KEYS_STR = os.getenv("GROQ_API_KEY", "")
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct" # Model Llama4 qua Groq
# Quản lý nhiều API keys cho Groq
class GroqClientManager:
"""Quản lý pool các Groq clients với logic round-robin"""
def __init__(self, api_keys_str):
# Tách chuỗi API keys (key1,key2,key3) thành list
self.api_keys = [key.strip() for key in api_keys_str.split(",") if key.strip()]
if not self.api_keys:
raise ValueError("Không tìm thấy API key hợp lệ cho Groq")
print(f"Khởi tạo {len(self.api_keys)} Groq API clients")
# Tạo pool của các client
self.clients = [AsyncGroq(api_key=key) for key in self.api_keys]
self.current_index = 0
self.lock = Lock()
# Thống kê sử dụng
self.usage_stats = {i: 0 for i in range(len(self.api_keys))}
def get_next_client(self):
"""Lấy client tiếp theo theo cơ chế round-robin"""
with self.lock:
client = self.clients[self.current_index]
# Cập nhật thống kê
self.usage_stats[self.current_index] += 1
# Di chuyển đến key tiếp theo
self.current_index = (self.current_index + 1) % len(self.clients)
return client
def print_usage_stats(self):
"""In thống kê sử dụng của từng API key"""
with self.lock:
print("\n--- Thống kê sử dụng Groq API keys ---")
total_calls = sum(self.usage_stats.values())
for idx, count in self.usage_stats.items():
key_preview = f"{self.api_keys[idx][:8]}..." if len(self.api_keys[idx]) > 10 else self.api_keys[idx]
percentage = (count / total_calls * 100) if total_calls > 0 else 0
print(f"Key {idx+1} ({key_preview}): {count} lần gọi ({percentage:.1f}%)")
print(f"Tổng số lần gọi API: {total_calls}")
print("---------------------------------------\n")
# Khởi tạo singleton manager
groq_client_manager = GroqClientManager(GROQ_API_KEYS_STR)
# Giữ lại một client đơn cho compatibility
groq_client = Groq(api_key=groq_client_manager.api_keys[0] if groq_client_manager.api_keys else "")
MAX_ARTICLES = 30 # 5 for testing, 30 for production
# Các API keys
NEWS_API_KEY = os.getenv("NEWS_API_KEY")
MARKETAUX_API_KEY = os.getenv("MARKETAUX_API_KEY")
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
# Cấu hình kiểm soát tốc độ gọi API
MAX_REQUESTS_PER_MINUTE = 10 # Để an toàn, giữ dưới 10
BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc
DELAY_BETWEEN_BATCHES = 10 # Thời gian chờ giữa các batch (giây)
RETRY_DELAY_BASE = 5 # Thời gian cơ sở cho retry (giây)
MAX_RETRIES = 3 # Số lần thử lại tối đa
# Cấu hình riêng cho Groq API
GROQ_BATCH_SIZE = 10 # Số lượng bài báo được xử lý cùng lúc khi dùng Groq (cao hơn Gemini)
GROQ_DELAY_BETWEEN_REQUESTS = 2 # Thời gian chờ giữa các request riêng lẻ với Groq (giây)
GROQ_DELAY_BETWEEN_BATCHES = 5 # Thời gian chờ giữa các batch với Groq (giây)
class NewsArticle:
"""Lớp tiêu chuẩn hóa cho các bài báo từ các nguồn khác nhau"""
def __init__(self, title, description, content, source_name, url, published_at):
self.title = title
self.description = description
self.content = content
self.source_name = source_name
self.url = url
self.published_at = published_at
def __str__(self):
return f"{self.title} ({self.source_name})"
def __eq__(self, other):
if not isinstance(other, NewsArticle):
return False
# So sánh bằng URL hoặc tiêu đề
return self.url == other.url or self.title == other.title
def __hash__(self):
# Sử dụng URL làm hash
return hash(self.url)
async def fetch_from_newsapi():
"""Lấy tin tức từ NewsAPI"""
url = "https://newsapi.org/v2/everything"
yesterday = datetime.now() - timedelta(days=1)
yesterday_str = yesterday.strftime('%Y-%m-%d')
params = {
'q': 'finance OR economy OR stock market OR investing',
'from': yesterday_str,
'language': 'en',
'sortBy': 'publishedAt',
'pageSize': 50,
'apiKey': NEWS_API_KEY
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
articles = []
if data.get('status') == 'ok' and 'articles' in data:
for article in data['articles']:
articles.append(
NewsArticle(
title=article.get('title', ''),
description=article.get('description', ''),
content=article.get('content', ''),
source_name=article.get('source', {}).get('name', 'Unknown'),
url=article.get('url', ''),
published_at=article.get('publishedAt', '')
)
)
return articles
return []
async def fetch_from_marketaux():
"""Lấy tin tức từ Marketaux API"""
url = "https://api.marketaux.com/v1/news/all"
params = {
'symbols': 'AAPL,MSFT,TSLA,AMZN,NVDA,META', # Một số mã chứng khoán phổ biến
'filter_entities': 'true',
'language': 'en',
'published_after': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%dT%H:%M'),
'limit': 50,
'api_token': MARKETAUX_API_KEY
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
articles = []
if 'data' in data:
for article in data['data']:
articles.append(
NewsArticle(
title=article.get('title', ''),
description=article.get('description', ''),
content=article.get('snippet', ''),
source_name=article.get('source', ''),
url=article.get('url', ''),
published_at=article.get('published_at', '')
)
)
return articles
return []
def _normalize_and_deduplicate(articles_list):
"""Chuẩn hóa và loại bỏ các bài báo trùng lặp"""
# Loại bỏ trùng lặp bằng cách chuyển thành set
unique_articles = set()
for article in articles_list:
if article.title and article.content: # Bỏ qua bài viết không có tiêu đề hoặc nội dung
# Loại bỏ các bài quảng cáo và không liên quan
skip_keywords = ["advertisement", "sponsored", "promotion"]
if not any(keyword in article.title.lower() for keyword in skip_keywords):
unique_articles.add(article)
# Sắp xếp theo thời gian xuất bản (mới nhất trước)
sorted_articles = sorted(
unique_articles,
key=lambda x: datetime.fromisoformat(x.published_at.replace('Z', '+00:00').replace('T', ' ')),
reverse=True
)
# Giới hạn số lượng bài viết
max_articles = min(len(sorted_articles), MAX_ARTICLES) # Giảm số lượng bài từ 30 xuống 20
return sorted_articles[:max_articles]
async def _call_ai_with_retry(prompt, model, retries=MAX_RETRIES):
"""Gọi Gemini API với cơ chế retry và exponential backoff"""
attempt = 0
last_exception = None
while attempt <= retries:
try:
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu
jitter = random.uniform(0.5, 1.5)
if attempt > 0:
# Exponential backoff với jitter
delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter
print(f"Retry lần {attempt}, đợi {delay:.2f} giây...")
await asyncio.sleep(delay)
response = await model.generate_content_async(prompt)
return response.text
except Exception as e:
last_exception = e
print(f"Lỗi khi gọi AI (lần {attempt+1}/{retries+1}): {str(e)}")
# Nếu đây là lỗi quota, thêm thời gian chờ dài hơn
if "429" in str(e) or "quota" in str(e).lower():
# Thêm thời gian chờ dài hơn cho lỗi quota (60-90 giây)
quota_delay = random.uniform(60, 90)
print(f"Phát hiện lỗi quota, đợi {quota_delay:.2f} giây...")
await asyncio.sleep(quota_delay)
attempt += 1
# Nếu đã hết số lần thử lại, ném ngoại lệ
if last_exception:
raise last_exception
else:
raise Exception("Không thể gọi API Gemini sau nhiều lần thử lại")
async def _call_groq_with_retry(prompt, retries=MAX_RETRIES):
"""Gọi Groq API với cơ chế retry và round-robin API keys"""
attempt = 0
last_exception = None
while attempt <= retries:
try:
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu
jitter = random.uniform(0.5, 1.5)
if attempt > 0:
# Exponential backoff với jitter
delay = (RETRY_DELAY_BASE * (2 ** (attempt - 1))) * jitter
print(f"Retry Groq lần {attempt}, đợi {delay:.2f} giây...")
await asyncio.sleep(delay)
# Lấy client tiếp theo từ round-robin pool
client = groq_client_manager.get_next_client()
response = await client.chat.completions.create(
model=GROQ_MODEL,
messages=[
{"role": "user", "content": prompt}
],
temperature=0.3, # Thấp hơn để tóm tắt chính xác
max_tokens=500, # Giới hạn độ dài tóm tắt
top_p=0.95,
stream=False
)
return response.choices[0].message.content
except Exception as e:
last_exception = e
print(f"Lỗi khi gọi Groq API (lần {attempt+1}/{retries+1}): {str(e)}")
# Nếu đây là lỗi rate limit hoặc quota
if "429" in str(e) or "rate" in str(e).lower() or "limit" in str(e).lower():
# Thêm thời gian chờ dài hơn
rate_limit_delay = random.uniform(30, 45)
print(f"Phát hiện lỗi rate limit, đợi {rate_limit_delay:.2f} giây...")
await asyncio.sleep(rate_limit_delay)
attempt += 1
# Nếu đã hết số lần thử lại, ném ngoại lệ
if last_exception:
raise last_exception
else:
raise Exception("Không thể gọi API Groq sau nhiều lần thử lại")
async def _summarize_article_with_groq(article):
"""Tạo tóm tắt cho một bài báo sử dụng Groq API"""
prompt = f"""
Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences).
Focus on key events, figures, trends, or information valuable to investors.
Provide only the summary without any introduction or conclusion.
TITLE: {article.title}
DESCRIPTION: {article.description}
CONTENT: {article.content}
SOURCE: {article.source_name}
"""
try:
# Sử dụng hàm gọi Groq API có retry
summary = await _call_groq_with_retry(prompt)
# Loại bỏ các ký tự đặc biệt và chuẩn hóa
summary = re.sub(r'[\n\r]+', ' ', summary)
summary = re.sub(r'\s{2,}', ' ', summary).strip()
return {
'title': article.title,
'source': article.source_name,
'summary': summary,
'url': article.url
}
except Exception as e:
print(f"Lỗi khi tóm tắt bài báo với Groq: {str(e)}")
return {
'title': article.title,
'source': article.source_name,
'summary': "Unable to summarize this article due to API limitations.",
'url': article.url
}
async def _summarize_article(article, model):
"""Tạo tóm tắt cho một bài báo với Gemini (giữ lại để dự phòng)"""
prompt = f"""
Summarize the most important information from this financial article in a concise paragraph (no more than 2-3 sentences).
Focus on key events, figures, trends, or information valuable to investors.
Provide only the summary without any introduction or conclusion.
TITLE: {article.title}
DESCRIPTION: {article.description}
CONTENT: {article.content}
SOURCE: {article.source_name}
URL: {article.url}
"""
try:
# Sử dụng hàm gọi API có retry
summary = await _call_ai_with_retry(prompt, model)
# Loại bỏ các ký tự đặc biệt và chuẩn hóa
summary = re.sub(r'[\n\r]+', ' ', summary)
summary = re.sub(r'\s{2,}', ' ', summary).strip()
return {
'title': article.title,
'source': article.source_name,
'summary': summary,
'url': article.url
}
except Exception as e:
print(f"Lỗi khi tóm tắt bài báo với Gemini: {str(e)}")
return {
'title': article.title,
'source': article.source_name,
'summary': "Unable to summarize this article due to API limitations.",
'url': article.url
}
async def _summarize_articles_with_groq(articles):
"""Tóm tắt các bài báo bằng Groq API"""
all_summaries = []
# Chia bài viết thành các batch nhỏ
for i in range(0, len(articles), GROQ_BATCH_SIZE):
batch = articles[i:i+GROQ_BATCH_SIZE]
print(f"Đang xử lý batch {i//GROQ_BATCH_SIZE + 1}/{(len(articles) + GROQ_BATCH_SIZE - 1)//GROQ_BATCH_SIZE} ({len(batch)} bài) với Groq")
# Tạo danh sách các coroutines để xử lý song song
tasks = [_summarize_article_with_groq(article) for article in batch]
# Chờ tất cả các task hoàn thành
batch_summaries = await asyncio.gather(*tasks)
all_summaries.extend(batch_summaries)
# Nếu còn batch tiếp theo, đợi để tránh vượt quá quota
if i + GROQ_BATCH_SIZE < len(articles):
wait_time = random.uniform(GROQ_DELAY_BETWEEN_BATCHES * 0.9, GROQ_DELAY_BETWEEN_BATCHES * 1.1)
print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...")
await asyncio.sleep(wait_time)
# In thống kê sử dụng API keys
groq_client_manager.print_usage_stats()
return all_summaries
async def _summarize_articles_in_batches(articles, model):
"""Tóm tắt các bài báo theo batch với Gemini API (giữ lại để dự phòng)"""
all_summaries = []
# Chia bài viết thành các batch nhỏ
for i in range(0, len(articles), BATCH_SIZE):
batch = articles[i:i+BATCH_SIZE]
print(f"Đang xử lý batch {i//BATCH_SIZE + 1}/{(len(articles) + BATCH_SIZE - 1)//BATCH_SIZE} ({len(batch)} bài)")
# Xử lý các bài trong batch một cách tuần tự để tránh quá tải API
batch_summaries = []
for article in batch:
# Thêm jitter vào delay để tránh đồng bộ hóa các yêu cầu
delay = random.uniform(1.0, 3.0)
await asyncio.sleep(delay)
summary = await _summarize_article(article, model)
batch_summaries.append(summary)
all_summaries.extend(batch_summaries)
# Nếu còn batch tiếp theo, đợi để tránh vượt quá quota
if i + BATCH_SIZE < len(articles):
wait_time = random.uniform(DELAY_BETWEEN_BATCHES * 0.9, DELAY_BETWEEN_BATCHES * 1.1)
print(f"Hoàn thành batch. Đang đợi {wait_time:.2f}s trước batch tiếp theo để tránh quá tải quota...")
await asyncio.sleep(wait_time)
return all_summaries
async def _synthesize_newsletter(summaries, model):
"""Reduce phase: Tổng hợp các tóm tắt thành một bản tin hoàn chỉnh sử dụng Gemini API"""
# Giới hạn số lượng tóm tắt để đảm bảo không vượt quá token limit
# if len(summaries) > 15:
# print(f"Giới hạn số lượng tóm tắt từ {len(summaries)} xuống 15 để phù hợp với giới hạn token")
# summaries = summaries[:15]
# Chuẩn bị dữ liệu đầu vào cho prompt
summaries_text = ""
for i, s in enumerate(summaries, 1):
summaries_text += f"{i}. **{s['title']}** ({s['source']}): {s['summary']} [Link]({s['url']})\n\n"
# Prompt để tạo bản tin
prompt = f"""
Below are summaries from {len(summaries)} financial articles published in the last 24 hours:
{summaries_text}
Write a comprehensive market report based on these summaries. The report should:
1. Have an overall headline for the entire report
2. Be organized into clear sections by topic, such as:
- Macroeconomic Developments
- Corporate News
- Stock Market Performance
- Cryptocurrency and Fintech
- Expert Opinions and Forecasts
3. Each section should have at least 3-5 concise news points, with sources cited
4. End with a brief conclusion about the overall market trends
Format the report in Markdown. Ensure clear numbering and proper Markdown syntax.
"""
try:
# Thêm jitter vào delay trước khi gọi API tổng hợp
await asyncio.sleep(random.uniform(3.0, 5.0))
# Sử dụng hàm gọi API có retry
newsletter = await _call_ai_with_retry(prompt, model)
# Thêm phần thông tin về ngày tạo
today = datetime.now().strftime("%d/%m/%Y")
newsletter = f"# DAILY MARKET REPORT - {today}\n\n" + newsletter
# Thêm phần footer
footer = """
---
*This report was automatically generated by AI Financial Dashboard based on data from multiple reliable financial news sources.*
*Note: This is not investment advice.*
"""
newsletter += footer
return newsletter
except Exception as e:
print(f"Lỗi khi tạo bản tin: {str(e)}")
return """# DAILY MARKET REPORT
We're sorry, but the market report could not be automatically generated due to API limitations. Please try again later.
---
*Note: The system has collected data but could not generate the report due to API quota limitations.*
"""
async def run_news_summary_pipeline():
"""Hàm chính để chạy toàn bộ pipeline tạo bản tin"""
start_time = time.time()
# 1. Khởi tạo model
model = genai.GenerativeModel(MODEL_NAME)
# 2. Thu thập tin tức từ nhiều nguồn song song
print("Đang thu thập tin tức từ các nguồn...")
tasks = [
fetch_from_newsapi(),
fetch_from_marketaux()
]
# Chờ tất cả các task hoàn thành
results = await asyncio.gather(*tasks)
# Gộp kết quả từ các nguồn
all_articles = []
for articles in results:
if articles:
all_articles.extend(articles)
print(f"Đã thu thập {len(all_articles)} bài báo từ các nguồn.")
# 3. Chuẩn hóa và loại bỏ trùng lặp
articles = _normalize_and_deduplicate(all_articles)
print(f"Sau khi lọc: {len(articles)} bài báo duy nhất.")
# 4. Tóm tắt từng bài báo sử dụng Groq API (nhanh hơn, quota cao hơn)
print("Bắt đầu tóm tắt các bài báo với Groq API...")
try:
summaries = await _summarize_articles_with_groq(articles)
print(f"Đã tóm tắt {len(summaries)} bài báo với Groq API.")
except Exception as e:
print(f"Lỗi khi sử dụng Groq API: {str(e)}. Chuyển sang sử dụng Gemini API...")
# Fallback sang Gemini nếu có lỗi với Groq
summaries = await _summarize_articles_in_batches(articles, model)
print(f"Đã tóm tắt {len(summaries)} bài báo với Gemini API.")
# 5. Tổng hợp bản tin sử dụng Gemini API (tốt hơn với việc viết nội dung dài)
print("Đang tạo bản tin tổng hợp với Gemini API...")
newsletter = await _synthesize_newsletter(summaries, model)
# Thống kê thời gian thực hiện
end_time = time.time()
duration = end_time - start_time
print(f"Pipeline hoàn tất trong {duration:.2f} giây.")
return newsletter