Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import logging | |
import time | |
import pandas as pd | |
from typing import Dict, Any, Optional | |
from .data_ingestion import load_data, validate_data | |
from .strategy_agent import StrategyAgent | |
from .execution_agent import ExecutionAgent | |
logger = logging.getLogger(__name__) | |
def run(config: Dict[str, Any]) -> Dict[str, Any]: | |
""" | |
Main orchestration function that coordinates the trading workflow. | |
Args: | |
config: Configuration dictionary | |
Returns: | |
Dictionary containing execution results and statistics | |
""" | |
start_time = time.time() | |
logger.info("Starting trading system orchestration") | |
try: | |
# Initialize workflow results | |
workflow_result = { | |
'success': False, | |
'data_loaded': False, | |
'signal_generated': False, | |
'order_executed': False, | |
'execution_result': None, | |
'errors': [], | |
'execution_time': 0 | |
} | |
# Step 1: Load market data | |
logger.info("Step 1: Loading market data") | |
data = load_data(config) | |
if data is not None and not data.empty: | |
workflow_result['data_loaded'] = True | |
logger.info(f"Successfully loaded {len(data)} data points") | |
# Validate data quality | |
if validate_data(data): | |
logger.info("Data validation passed") | |
else: | |
logger.warning("Data validation failed, but continuing with workflow") | |
else: | |
logger.error("Failed to load market data") | |
workflow_result['errors'].append("Failed to load market data") | |
return workflow_result | |
# Step 2: Generate trading signal | |
logger.info("Step 2: Generating trading signal") | |
strategy_agent = StrategyAgent(config) | |
signal = strategy_agent.act(data) | |
if signal and signal.get('action') != 'hold': | |
workflow_result['signal_generated'] = True | |
logger.info(f"Generated signal: {signal['action']} {signal['quantity']} {signal['symbol']}") | |
else: | |
logger.info("No actionable signal generated (hold)") | |
workflow_result['signal_generated'] = True # Hold is still a valid signal | |
# Step 3: Execute order | |
logger.info("Step 3: Executing order") | |
execution_agent = ExecutionAgent(config) | |
execution_result = execution_agent.act(signal) | |
if execution_result['success']: | |
workflow_result['order_executed'] = True | |
workflow_result['execution_result'] = execution_result | |
logger.info("Order executed successfully") | |
else: | |
logger.error(f"Order execution failed: {execution_result.get('error', 'Unknown error')}") | |
workflow_result['errors'].append(f"Order execution failed: {execution_result.get('error')}") | |
# Calculate execution time | |
workflow_result['execution_time'] = time.time() - start_time | |
workflow_result['success'] = workflow_result['data_loaded'] and workflow_result['signal_generated'] | |
logger.info(f"Trading workflow completed in {workflow_result['execution_time']:.2f} seconds") | |
return workflow_result | |
except Exception as e: | |
logger.error(f"Error in trading workflow: {e}", exc_info=True) | |
workflow_result = { | |
'success': False, | |
'data_loaded': False, | |
'signal_generated': False, | |
'order_executed': False, | |
'execution_result': None, | |
'errors': [str(e)], | |
'execution_time': time.time() - start_time | |
} | |
return workflow_result | |
def run_backtest(config: Dict[str, Any], start_date: str = '2024-01-01', end_date: str = '2024-12-31') -> Dict[str, Any]: | |
""" | |
Run backtesting simulation over historical data. | |
Args: | |
config: Configuration dictionary | |
start_date: Start date for backtest | |
end_date: End date for backtest | |
Returns: | |
Dictionary containing backtest results | |
""" | |
logger.info(f"Starting backtest from {start_date} to {end_date}") | |
try: | |
# Load historical data | |
data = load_data(config) | |
if data is None or data.empty: | |
logger.error("No data available for backtest") | |
return {'success': False, 'error': 'No data available'} | |
# Filter data for backtest period | |
data['timestamp'] = pd.to_datetime(data['timestamp']) | |
mask = (data['timestamp'] >= start_date) & (data['timestamp'] <= end_date) | |
backtest_data = data.loc[mask] | |
if backtest_data.empty: | |
logger.error("No data available for specified backtest period") | |
return {'success': False, 'error': 'No data for backtest period'} | |
logger.info(f"Running backtest on {len(backtest_data)} data points") | |
# Initialize agents | |
strategy_agent = StrategyAgent(config) | |
execution_agent = ExecutionAgent(config) | |
# Track backtest results | |
trades = [] | |
portfolio_value = config['trading']['capital'] | |
positions = {} | |
# Run simulation | |
for i in range(len(backtest_data)): | |
current_data = backtest_data.iloc[:i+1] | |
if len(current_data) < 50: # Need minimum data for indicators | |
continue | |
# Generate signal | |
signal = strategy_agent.act(current_data) | |
# Execute if not hold | |
if signal['action'] != 'hold': | |
execution_result = execution_agent.act(signal) | |
trades.append({ | |
'timestamp': current_data.index[-1], | |
'signal': signal, | |
'execution': execution_result | |
}) | |
# Update portfolio (simplified) | |
if execution_result['success']: | |
symbol = signal['symbol'] | |
if signal['action'] == 'buy': | |
positions[symbol] = positions.get(symbol, 0) + signal['quantity'] | |
portfolio_value -= execution_result['total_value'] | |
elif signal['action'] == 'sell': | |
positions[symbol] = positions.get(symbol, 0) - signal['quantity'] | |
portfolio_value += execution_result['total_value'] | |
# Calculate final portfolio value | |
final_value = portfolio_value | |
for symbol, quantity in positions.items(): | |
if quantity > 0: | |
final_price = backtest_data['close'].iloc[-1] | |
final_value += quantity * final_price | |
# Calculate performance metrics | |
total_return = (final_value - config['trading']['capital']) / config['trading']['capital'] | |
backtest_results = { | |
'success': True, | |
'start_date': start_date, | |
'end_date': end_date, | |
'initial_capital': config['trading']['capital'], | |
'final_value': final_value, | |
'total_return': total_return, | |
'total_trades': len(trades), | |
'trades': trades, | |
'positions': positions | |
} | |
logger.info(f"Backtest completed: {total_return:.2%} return over {len(trades)} trades") | |
return backtest_results | |
except Exception as e: | |
logger.error(f"Error in backtest: {e}", exc_info=True) | |
return {'success': False, 'error': str(e)} | |
def run_live_trading(config: Dict[str, Any], duration_minutes: int = 60) -> Dict[str, Any]: | |
""" | |
Run live trading simulation for a specified duration. | |
Args: | |
config: Configuration dictionary | |
duration_minutes: Duration to run live trading in minutes | |
Returns: | |
Dictionary containing live trading results | |
""" | |
logger.info(f"Starting live trading simulation for {duration_minutes} minutes") | |
try: | |
import time | |
from datetime import datetime, timedelta | |
end_time = datetime.now() + timedelta(minutes=duration_minutes) | |
trades = [] | |
while datetime.now() < end_time: | |
# Run single trading cycle | |
result = run(config) | |
if result['order_executed'] and result['execution_result']['success']: | |
trades.append(result['execution_result']) | |
# Wait before next cycle | |
time.sleep(60) # Wait 1 minute between cycles | |
live_results = { | |
'success': True, | |
'duration_minutes': duration_minutes, | |
'total_trades': len(trades), | |
'trades': trades, | |
'start_time': datetime.now() - timedelta(minutes=duration_minutes), | |
'end_time': datetime.now() | |
} | |
logger.info(f"Live trading completed: {len(trades)} trades executed") | |
return live_results | |
except Exception as e: | |
logger.error(f"Error in live trading: {e}", exc_info=True) | |
return {'success': False, 'error': str(e)} | |