Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import pytest | |
import pandas as pd | |
import numpy as np | |
import tempfile | |
import os | |
from unittest.mock import patch, MagicMock | |
from agentic_ai_system.orchestrator import run, run_backtest, run_live_trading | |
from agentic_ai_system.main import load_config | |
class TestIntegration: | |
"""Integration tests for the entire trading system""" | |
def config(self): | |
"""Sample configuration for integration testing""" | |
return { | |
'data_source': { | |
'type': 'synthetic', | |
'path': 'data/market_data.csv' | |
}, | |
'trading': { | |
'symbol': 'AAPL', | |
'timeframe': '1min', | |
'capital': 100000 | |
}, | |
'risk': { | |
'max_position': 100, | |
'max_drawdown': 0.05 | |
}, | |
'execution': { | |
'broker_api': 'paper', | |
'order_size': 10, | |
'delay_ms': 10, # Fast for testing | |
'success_rate': 1.0 # Always succeed for testing | |
}, | |
'synthetic_data': { | |
'base_price': 150.0, | |
'volatility': 0.02, | |
'trend': 0.001, | |
'noise_level': 0.005, | |
'data_path': 'data/synthetic_market_data.csv' | |
}, | |
'logging': { | |
'log_level': 'INFO', | |
'log_dir': 'logs', | |
'enable_console': True, | |
'enable_file': True | |
} | |
} | |
def test_full_workflow(self, config): | |
"""Test the complete trading workflow""" | |
result = run(config) | |
# Check result structure | |
assert isinstance(result, dict) | |
assert 'success' in result | |
assert 'data_loaded' in result | |
assert 'signal_generated' in result | |
assert 'order_executed' in result | |
assert 'execution_time' in result | |
assert 'errors' in result | |
# Check that data was loaded | |
assert result['data_loaded'] == True | |
# Check that signal was generated | |
assert result['signal_generated'] == True | |
# Check execution time is reasonable | |
assert result['execution_time'] > 0 | |
assert result['execution_time'] < 60 # Should complete within 60 seconds | |
def test_backtest_workflow(self, config): | |
"""Test the backtest workflow""" | |
result = run_backtest(config, '2024-01-01', '2024-01-02') | |
# Check result structure | |
assert isinstance(result, dict) | |
assert 'success' in result | |
if result['success']: | |
assert 'start_date' in result | |
assert 'end_date' in result | |
assert 'initial_capital' in result | |
assert 'final_value' in result | |
assert 'total_return' in result | |
assert 'total_trades' in result | |
assert 'trades' in result | |
assert 'positions' in result | |
# Check that backtest completed | |
assert result['initial_capital'] == config['trading']['capital'] | |
assert result['final_value'] >= 0 | |
assert isinstance(result['total_return'], float) | |
assert result['total_trades'] >= 0 | |
assert isinstance(result['trades'], list) | |
assert isinstance(result['positions'], dict) | |
def test_live_trading_workflow(self, config): | |
"""Test the live trading workflow (short duration)""" | |
# Test with very short duration to avoid long test times | |
result = run_live_trading(config, duration_minutes=1) | |
# Check result structure | |
assert isinstance(result, dict) | |
assert 'success' in result | |
if result['success']: | |
assert 'duration_minutes' in result | |
assert 'total_trades' in result | |
assert 'trades' in result | |
assert 'start_time' in result | |
assert 'end_time' in result | |
# Check that live trading completed | |
assert result['duration_minutes'] == 1 | |
assert result['total_trades'] >= 0 | |
assert isinstance(result['trades'], list) | |
def test_workflow_with_csv_data(self, config): | |
"""Test workflow with CSV data source""" | |
# Create temporary CSV file | |
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file: | |
# Generate sample data | |
dates = pd.date_range(start='2024-01-01', periods=100, freq='1min') | |
data = [] | |
for i, date in enumerate(dates): | |
base_price = 150.0 + (i * 0.1) | |
data.append({ | |
'timestamp': date, | |
'open': base_price + np.random.normal(0, 1), | |
'high': base_price + abs(np.random.normal(0, 2)), | |
'low': base_price - abs(np.random.normal(0, 2)), | |
'close': base_price + np.random.normal(0, 1), | |
'volume': np.random.randint(1000, 100000) | |
}) | |
df = pd.DataFrame(data) | |
df.to_csv(tmp_file.name, index=False) | |
config['data_source']['type'] = 'csv' | |
config['data_source']['path'] = tmp_file.name | |
try: | |
result = run(config) | |
assert result['success'] == True | |
assert result['data_loaded'] == True | |
assert result['signal_generated'] == True | |
finally: | |
os.unlink(tmp_file.name) | |
def test_workflow_error_handling(self, config): | |
"""Test workflow error handling""" | |
# Test with invalid configuration | |
invalid_config = config.copy() | |
invalid_config['data_source']['type'] = 'invalid_type' | |
result = run(invalid_config) | |
assert result['success'] == False | |
assert len(result['errors']) > 0 | |
def test_backtest_with_different_periods(self, config): | |
"""Test backtest with different time periods""" | |
# Test short period | |
short_result = run_backtest(config, '2024-01-01', '2024-01-01') | |
assert isinstance(short_result, dict) | |
# Test longer period | |
long_result = run_backtest(config, '2024-01-01', '2024-01-07') | |
assert isinstance(long_result, dict) | |
# Both should be valid results (success or failure) | |
assert 'success' in short_result | |
assert 'success' in long_result | |
def test_system_with_different_symbols(self, config): | |
"""Test system with different trading symbols""" | |
symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA'] | |
for symbol in symbols: | |
test_config = config.copy() | |
test_config['trading']['symbol'] = symbol | |
result = run(test_config) | |
assert result['success'] == True | |
assert result['data_loaded'] == True | |
assert result['signal_generated'] == True | |
def test_system_with_different_capital_amounts(self, config): | |
"""Test system with different capital amounts""" | |
capital_amounts = [10000, 50000, 100000, 500000] | |
for capital in capital_amounts: | |
test_config = config.copy() | |
test_config['trading']['capital'] = capital | |
result = run(test_config) | |
assert result['success'] == True | |
assert result['data_loaded'] == True | |
assert result['signal_generated'] == True | |
def test_execution_failure_simulation(self, config): | |
"""Test system behavior with execution failures""" | |
# Set success rate to 0 to simulate all failures | |
test_config = config.copy() | |
test_config['execution']['success_rate'] = 0.0 | |
result = run(test_config) | |
# System should still complete workflow | |
assert result['success'] == True | |
assert result['data_loaded'] == True | |
assert result['signal_generated'] == True | |
# But order execution should fail | |
if result['order_executed']: | |
assert result['execution_result']['success'] == False | |
def test_data_validation_integration(self, config): | |
"""Test data validation integration""" | |
# Create invalid data | |
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file: | |
invalid_data = pd.DataFrame({ | |
'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'), | |
'open': [150] * 10, | |
'high': [145] * 10, # Invalid: high < open | |
'low': [145] * 10, | |
'close': [152] * 10, | |
'volume': [1000] * 10 | |
}) | |
invalid_data.to_csv(tmp_file.name, index=False) | |
config['data_source']['type'] = 'csv' | |
config['data_source']['path'] = tmp_file.name | |
try: | |
result = run(config) | |
# System should still work (fallback to synthetic data) | |
assert result['success'] == True | |
finally: | |
os.unlink(tmp_file.name) | |
def test_performance_metrics(self, config): | |
"""Test that performance metrics are calculated correctly""" | |
result = run_backtest(config, '2024-01-01', '2024-01-03') | |
if result['success']: | |
# Check that return is calculated correctly | |
initial_capital = result['initial_capital'] | |
final_value = result['final_value'] | |
calculated_return = (final_value - initial_capital) / initial_capital | |
assert abs(result['total_return'] - calculated_return) < 0.001 | |
# Check that trade count is reasonable | |
assert result['total_trades'] >= 0 | |
def test_config_loading(self): | |
"""Test configuration loading functionality""" | |
# Test with valid config | |
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as tmp_file: | |
config_content = """ | |
data_source: | |
type: 'synthetic' | |
path: 'data/market_data.csv' | |
trading: | |
symbol: 'AAPL' | |
timeframe: '1min' | |
capital: 100000 | |
risk: | |
max_position: 100 | |
max_drawdown: 0.05 | |
execution: | |
broker_api: 'paper' | |
order_size: 10 | |
""" | |
tmp_file.write(config_content) | |
tmp_file.flush() | |
try: | |
config = load_config(tmp_file.name) | |
assert config['data_source']['type'] == 'synthetic' | |
assert config['trading']['symbol'] == 'AAPL' | |
assert config['trading']['capital'] == 100000 | |
finally: | |
os.unlink(tmp_file.name) | |
def test_system_scalability(self, config): | |
"""Test system scalability with larger datasets""" | |
# Test with larger synthetic dataset | |
test_config = config.copy() | |
test_config['synthetic_data']['base_price'] = 200.0 | |
test_config['synthetic_data']['volatility'] = 0.03 | |
result = run(test_config) | |
assert result['success'] == True | |
assert result['data_loaded'] == True | |
assert result['signal_generated'] == True | |
# Check execution time is reasonable | |
assert result['execution_time'] < 30 # Should complete within 30 seconds |