# modules/api_clients.py
"""Async HTTP clients for Alpha Vantage, NewsAPI and Marketaux.

Also exposes ``get_price_history``, a synchronous helper that delegates to
the shared ``TwelveDataAPI`` instance created at import time.
"""
import os
import aiohttp
import asyncio
from datetime import datetime, timedelta, timezone

from dotenv import load_dotenv
from twelvedata_api import TwelveDataAPI

# Load environment variables
load_dotenv()

# API Keys
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
NEWS_API_KEY = os.getenv("NEWS_API_KEY")
MARKETAUX_API_KEY = os.getenv("MARKETAUX_API_KEY")
TWELVEDATA_API_KEY = os.getenv("TWELVEDATA_API_KEY")

# Initialize TwelveDataAPI for reuse
td_api = TwelveDataAPI(TWELVEDATA_API_KEY)


async def _get_json(url, params):
    """Issue a GET request to *url* with *params* and return the decoded JSON body.

    A fresh ``aiohttp.ClientSession`` is opened for every call. No exception
    is caught here, so any error raised by aiohttp propagates to the caller.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            return await response.json()


def _oldest_allowed_date(days):
    """Return the UTC calendar date *days* days ago as a ``YYYY-MM-DD`` string."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    return cutoff.strftime("%Y-%m-%d")


class AlphaVantageClient:
    """Client for interacting with the Alpha Vantage API"""

    BASE_URL = "https://www.alphavantage.co/query"

    @staticmethod
    async def _query(function, symbol, symbol_key='symbol'):
        """Call the Alpha Vantage query endpoint for one *function*.

        Args:
            function: Value sent as the ``function`` query parameter.
            symbol: Ticker sent under *symbol_key*.
            symbol_key: Name of the query parameter that carries the ticker
                (``'symbol'`` for most functions, ``'tickers'`` for
                ``NEWS_SENTIMENT``).

        Returns:
            The decoded JSON response.
        """
        params = {
            'function': function,
            symbol_key: symbol,
            'apikey': ALPHA_VANTAGE_API_KEY,
        }
        return await _get_json(AlphaVantageClient.BASE_URL, params)

    @staticmethod
    async def get_company_overview(symbol):
        """Get company overview information"""
        return await AlphaVantageClient._query('OVERVIEW', symbol)

    @staticmethod
    async def get_income_statement(symbol):
        """Get company income statement"""
        return await AlphaVantageClient._query('INCOME_STATEMENT', symbol)

    @staticmethod
    async def get_balance_sheet(symbol):
        """Get company balance sheet"""
        return await AlphaVantageClient._query('BALANCE_SHEET', symbol)

    @staticmethod
    async def get_cash_flow(symbol):
        """Get company cash flow statement"""
        return await AlphaVantageClient._query('CASH_FLOW', symbol)

    @staticmethod
    async def get_news_sentiment(symbol):
        """Get news sentiment for a company"""
        # This function takes the ticker under 'tickers' rather than 'symbol'.
        return await AlphaVantageClient._query(
            'NEWS_SENTIMENT', symbol, symbol_key='tickers'
        )

    @staticmethod
    async def get_global_quote(symbol):
        """Get real-time quote information for a company"""
        return await AlphaVantageClient._query('GLOBAL_QUOTE', symbol)


class NewsAPIClient:
    """Client for interacting with the NewsAPI"""

    BASE_URL = "https://newsapi.org/v2/everything"

    @staticmethod
    async def _search(query, page_size, days):
        """Search NewsAPI for *query*, newest first, English only.

        Args:
            query: Value sent as the ``q`` parameter.
            page_size: Value sent as ``pageSize``.
            days: Look-back window in days; ``None`` sends no date filter.

        Returns:
            The decoded JSON response.
        """
        params = {
            'q': query,
            'sortBy': 'publishedAt',
            'language': 'en',
            'pageSize': page_size,
            'apiKey': NEWS_API_KEY,
        }
        if days is not None:
            # Previously `days` was accepted but never sent, so the window
            # promised by the public docstrings was not applied.
            params['from'] = _oldest_allowed_date(days)
        return await _get_json(NewsAPIClient.BASE_URL, params)

    @staticmethod
    async def get_company_news(company_name, days=7):
        """Get news about a specific company from the last N days"""
        return await NewsAPIClient._search(company_name, 25, days)

    @staticmethod
    async def get_market_news(days=1):
        """Get general financial market news from the last N days"""
        return await NewsAPIClient._search(
            'stock market OR finance OR investing OR economy', 30, days
        )


class MarketauxClient:
    """Client for interacting with the Marketaux Financial News API"""

    BASE_URL = "https://api.marketaux.com/v1/news/all"

    @staticmethod
    async def _fetch(params, days):
        """Send *params* to Marketaux, restricted to the last *days* days.

        Args:
            params: Query parameters for the request; not mutated.
            days: Look-back window in days; ``None`` sends no date filter.

        Returns:
            The decoded JSON response.
        """
        query = dict(params)
        if days is not None:
            # Previously `days` was accepted but never sent to the API.
            query['published_after'] = _oldest_allowed_date(days)
        return await _get_json(MarketauxClient.BASE_URL, query)

    @staticmethod
    async def get_company_news(symbol, days=7):
        """Get news about a specific company symbol from the last N days"""
        params = {
            'symbols': symbol,
            'filter_entities': 'true',
            'language': 'en',
            'api_token': MARKETAUX_API_KEY,
        }
        return await MarketauxClient._fetch(params, days)

    @staticmethod
    async def get_market_news(days=1):
        """Get general financial market news from the last N days"""
        params = {
            'industries': 'Financial Services,Technology',
            'language': 'en',
            'limit': 30,
            'api_token': MARKETAUX_API_KEY,
        }
        return await MarketauxClient._fetch(params, days)


# Helper functions that utilize TwelveDataAPI for price data
def get_price_history(symbol, time_period='1_year'):
    """
    Get price history using TwelveDataAPI (non-async function)
    This reuses the existing TwelveDataAPI implementation

    Args:
        symbol: Ticker passed through to ``td_api.get_time_series``.
        time_period: One of the keys of ``logic_map`` below.

    Returns:
        Whatever ``td_api.get_time_series`` returns, or a dict with an
        ``"error"`` key when *time_period* is not a known key.
    """
    # Map of time periods to appropriate parameters for TwelveDataAPI
    logic_map = {
        'intraday': {'interval': '15min', 'outputsize': 120},
        '1_week': {'interval': '1h', 'outputsize': 40},
        '1_month': {'interval': '1day', 'outputsize': 22},
        '3_months': {'interval': '1day', 'outputsize': 66},
        '6_months': {'interval': '1day', 'outputsize': 120},
        # NOTE(review): fixed at 120 points regardless of the current date,
        # so this matches '6_months' rather than a true year-to-date range.
        'year_to_date': {'interval': '1day', 'outputsize': 120},
        '1_year': {'interval': '1week', 'outputsize': 52},
        '5_years': {'interval': '1month', 'outputsize': 60},
        'max': {'interval': '1month', 'outputsize': 120},
    }

    params = logic_map.get(time_period)
    if not params:
        return {"error": f"Khoảng thời gian '{time_period}' không hợp lệ."}

    # Call TwelveDataAPI synchronously
    return td_api.get_time_series(symbol=symbol, **params)