Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import logging
import time
import pandas as pd
from typing import Dict, Any, Optional
from .data_ingestion import load_data, validate_data
from .strategy_agent import StrategyAgent
from .execution_agent import ExecutionAgent
logger = logging.getLogger(__name__)
def run(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Main orchestration function that coordinates the trading workflow.
Args:
config: Configuration dictionary
Returns:
Dictionary containing execution results and statistics
"""
start_time = time.time()
logger.info("Starting trading system orchestration")
try:
# Initialize workflow results
workflow_result = {
'success': False,
'data_loaded': False,
'signal_generated': False,
'order_executed': False,
'execution_result': None,
'errors': [],
'execution_time': 0
}
# Step 1: Load market data
logger.info("Step 1: Loading market data")
data = load_data(config)
if data is not None and not data.empty:
workflow_result['data_loaded'] = True
logger.info(f"Successfully loaded {len(data)} data points")
# Validate data quality
if validate_data(data):
logger.info("Data validation passed")
else:
logger.warning("Data validation failed, but continuing with workflow")
else:
logger.error("Failed to load market data")
workflow_result['errors'].append("Failed to load market data")
return workflow_result
# Step 2: Generate trading signal
logger.info("Step 2: Generating trading signal")
strategy_agent = StrategyAgent(config)
signal = strategy_agent.act(data)
if signal and signal.get('action') != 'hold':
workflow_result['signal_generated'] = True
logger.info(f"Generated signal: {signal['action']} {signal['quantity']} {signal['symbol']}")
else:
logger.info("No actionable signal generated (hold)")
workflow_result['signal_generated'] = True # Hold is still a valid signal
# Step 3: Execute order
logger.info("Step 3: Executing order")
execution_agent = ExecutionAgent(config)
execution_result = execution_agent.act(signal)
if execution_result['success']:
workflow_result['order_executed'] = True
workflow_result['execution_result'] = execution_result
logger.info("Order executed successfully")
else:
logger.error(f"Order execution failed: {execution_result.get('error', 'Unknown error')}")
workflow_result['errors'].append(f"Order execution failed: {execution_result.get('error')}")
# Calculate execution time
workflow_result['execution_time'] = time.time() - start_time
workflow_result['success'] = workflow_result['data_loaded'] and workflow_result['signal_generated']
logger.info(f"Trading workflow completed in {workflow_result['execution_time']:.2f} seconds")
return workflow_result
except Exception as e:
logger.error(f"Error in trading workflow: {e}", exc_info=True)
workflow_result = {
'success': False,
'data_loaded': False,
'signal_generated': False,
'order_executed': False,
'execution_result': None,
'errors': [str(e)],
'execution_time': time.time() - start_time
}
return workflow_result
def run_backtest(config: Dict[str, Any], start_date: str = '2024-01-01', end_date: str = '2024-12-31') -> Dict[str, Any]:
"""
Run backtesting simulation over historical data.
Args:
config: Configuration dictionary
start_date: Start date for backtest
end_date: End date for backtest
Returns:
Dictionary containing backtest results
"""
logger.info(f"Starting backtest from {start_date} to {end_date}")
try:
# Load historical data
data = load_data(config)
if data is None or data.empty:
logger.error("No data available for backtest")
return {'success': False, 'error': 'No data available'}
# Filter data for backtest period
data['timestamp'] = pd.to_datetime(data['timestamp'])
mask = (data['timestamp'] >= start_date) & (data['timestamp'] <= end_date)
backtest_data = data.loc[mask]
if backtest_data.empty:
logger.error("No data available for specified backtest period")
return {'success': False, 'error': 'No data for backtest period'}
logger.info(f"Running backtest on {len(backtest_data)} data points")
# Initialize agents
strategy_agent = StrategyAgent(config)
execution_agent = ExecutionAgent(config)
# Track backtest results
trades = []
portfolio_value = config['trading']['capital']
positions = {}
# Run simulation
for i in range(len(backtest_data)):
current_data = backtest_data.iloc[:i+1]
if len(current_data) < 50: # Need minimum data for indicators
continue
# Generate signal
signal = strategy_agent.act(current_data)
# Execute if not hold
if signal['action'] != 'hold':
execution_result = execution_agent.act(signal)
trades.append({
'timestamp': current_data.index[-1],
'signal': signal,
'execution': execution_result
})
# Update portfolio (simplified)
if execution_result['success']:
symbol = signal['symbol']
if signal['action'] == 'buy':
positions[symbol] = positions.get(symbol, 0) + signal['quantity']
portfolio_value -= execution_result['total_value']
elif signal['action'] == 'sell':
positions[symbol] = positions.get(symbol, 0) - signal['quantity']
portfolio_value += execution_result['total_value']
# Calculate final portfolio value
final_value = portfolio_value
for symbol, quantity in positions.items():
if quantity > 0:
final_price = backtest_data['close'].iloc[-1]
final_value += quantity * final_price
# Calculate performance metrics
total_return = (final_value - config['trading']['capital']) / config['trading']['capital']
backtest_results = {
'success': True,
'start_date': start_date,
'end_date': end_date,
'initial_capital': config['trading']['capital'],
'final_value': final_value,
'total_return': total_return,
'total_trades': len(trades),
'trades': trades,
'positions': positions
}
logger.info(f"Backtest completed: {total_return:.2%} return over {len(trades)} trades")
return backtest_results
except Exception as e:
logger.error(f"Error in backtest: {e}", exc_info=True)
return {'success': False, 'error': str(e)}
def run_live_trading(config: Dict[str, Any], duration_minutes: int = 60) -> Dict[str, Any]:
"""
Run live trading simulation for a specified duration.
Args:
config: Configuration dictionary
duration_minutes: Duration to run live trading in minutes
Returns:
Dictionary containing live trading results
"""
logger.info(f"Starting live trading simulation for {duration_minutes} minutes")
try:
import time
from datetime import datetime, timedelta
end_time = datetime.now() + timedelta(minutes=duration_minutes)
trades = []
while datetime.now() < end_time:
# Run single trading cycle
result = run(config)
if result['order_executed'] and result['execution_result']['success']:
trades.append(result['execution_result'])
# Wait before next cycle
time.sleep(60) # Wait 1 minute between cycles
live_results = {
'success': True,
'duration_minutes': duration_minutes,
'total_trades': len(trades),
'trades': trades,
'start_time': datetime.now() - timedelta(minutes=duration_minutes),
'end_time': datetime.now()
}
logger.info(f"Live trading completed: {len(trades)} trades executed")
return live_results
except Exception as e:
logger.error(f"Error in live trading: {e}", exc_info=True)
return {'success': False, 'error': str(e)}