Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import pandas as pd | |
import numpy as np | |
import logging | |
from typing import Dict, Any, Optional | |
from .agent_base import Agent | |
class StrategyAgent(Agent): | |
def __init__(self, config: Dict[str, Any]): | |
super().__init__(config) | |
self.symbol = config['trading']['symbol'] | |
self.capital = config['trading']['capital'] | |
self.max_position = config['risk']['max_position'] | |
self.max_drawdown = config['risk']['max_drawdown'] | |
self.logger.info(f"Strategy agent initialized for {self.symbol} with capital {self.capital}") | |
def act(self, data: pd.DataFrame) -> Dict[str, Any]: | |
""" | |
Analyze market data and generate trading signals. | |
Args: | |
data: DataFrame with OHLCV market data | |
Returns: | |
Dictionary containing trading signal | |
""" | |
try: | |
self.logger.info(f"Analyzing {len(data)} data points for {self.symbol}") | |
# Validate data | |
if data.empty: | |
self.logger.warning("Empty data received") | |
return self._generate_no_action_signal() | |
# Calculate technical indicators | |
indicators = self._calculate_indicators(data) | |
# Generate trading signal | |
signal = self._generate_signal(data, indicators) | |
# Log the signal | |
self.log_action(signal) | |
return signal | |
except Exception as e: | |
self.log_error(e, "Error in strategy analysis") | |
return self._generate_no_action_signal() | |
def _calculate_indicators(self, data: pd.DataFrame) -> Dict[str, Any]: | |
"""Calculate technical indicators from market data""" | |
try: | |
close_prices = data['close'].values | |
# Simple Moving Averages | |
sma_20 = self._calculate_sma(close_prices, 20) | |
sma_50 = self._calculate_sma(close_prices, 50) | |
# RSI | |
rsi = self._calculate_rsi(close_prices, 14) | |
# Bollinger Bands | |
bb_upper, bb_lower = self._calculate_bollinger_bands(close_prices, 20, 2) | |
# MACD | |
macd, signal_line = self._calculate_macd(close_prices) | |
indicators = { | |
'sma_20': sma_20, | |
'sma_50': sma_50, | |
'rsi': rsi, | |
'bb_upper': bb_upper, | |
'bb_lower': bb_lower, | |
'macd': macd, | |
'macd_signal': signal_line | |
} | |
self.logger.debug(f"Calculated indicators: {list(indicators.keys())}") | |
return indicators | |
except Exception as e: | |
self.log_error(e, "Error calculating indicators") | |
return {} | |
def _generate_signal(self, data: pd.DataFrame, indicators: Dict[str, Any]) -> Dict[str, Any]: | |
"""Generate trading signal based on indicators""" | |
try: | |
if not indicators: | |
return self._generate_no_action_signal() | |
current_price = data['close'].iloc[-1] | |
current_volume = data['volume'].iloc[-1] | |
# Get latest indicator values | |
sma_20 = indicators['sma_20'][-1] if len(indicators['sma_20']) > 0 else current_price | |
sma_50 = indicators['sma_50'][-1] if len(indicators['sma_50']) > 0 else current_price | |
rsi = indicators['rsi'][-1] if len(indicators['rsi']) > 0 else 50 | |
bb_upper = indicators['bb_upper'][-1] if len(indicators['bb_upper']) > 0 else current_price * 1.02 | |
bb_lower = indicators['bb_lower'][-1] if len(indicators['bb_lower']) > 0 else current_price * 0.98 | |
# Simple strategy: Buy when price is above SMA20 and RSI < 70 | |
# Sell when price is below SMA20 or RSI > 80 | |
action = 'hold' | |
quantity = 0 | |
confidence = 0.5 | |
if current_price > sma_20 and rsi < 70: | |
action = 'buy' | |
quantity = self._calculate_position_size(current_price) | |
confidence = 0.7 | |
self.logger.info(f"BUY signal: Price {current_price} > SMA20 {sma_20}, RSI {rsi}") | |
elif current_price < sma_20 or rsi > 80: | |
action = 'sell' | |
quantity = self._calculate_position_size(current_price) | |
confidence = 0.6 | |
self.logger.info(f"SELL signal: Price {current_price} < SMA20 {sma_20}, RSI {rsi}") | |
return { | |
'action': action, | |
'symbol': self.symbol, | |
'quantity': quantity, | |
'price': current_price, | |
'confidence': confidence, | |
'timestamp': data.index[-1] if hasattr(data.index[-1], 'timestamp') else None, | |
'indicators': { | |
'sma_20': sma_20, | |
'sma_50': sma_50, | |
'rsi': rsi, | |
'bb_upper': bb_upper, | |
'bb_lower': bb_lower | |
} | |
} | |
except Exception as e: | |
self.log_error(e, "Error generating signal") | |
return self._generate_no_action_signal() | |
def _calculate_position_size(self, price: float) -> int: | |
"""Calculate position size based on risk management rules""" | |
try: | |
# Simple position sizing: use 10% of capital per trade | |
position_value = self.capital * 0.1 | |
quantity = int(position_value / price) | |
# Apply max position limit | |
quantity = min(quantity, self.max_position) | |
# Ensure minimum quantity | |
if quantity < 1: | |
quantity = 1 | |
return quantity | |
except Exception as e: | |
self.log_error(e, "Error calculating position size") | |
return 1 | |
def _generate_no_action_signal(self) -> Dict[str, Any]: | |
"""Generate a no-action signal""" | |
return { | |
'action': 'hold', | |
'symbol': self.symbol, | |
'quantity': 0, | |
'price': 0, | |
'confidence': 0.0, | |
'timestamp': None, | |
'indicators': {} | |
} | |
# Technical indicator calculations | |
def _calculate_sma(self, prices: np.ndarray, window: int) -> np.ndarray: | |
"""Calculate Simple Moving Average""" | |
if len(prices) < window: | |
return np.array([]) | |
return np.convolve(prices, np.ones(window)/window, mode='valid') | |
def _calculate_rsi(self, prices: np.ndarray, window: int = 14) -> np.ndarray: | |
"""Calculate Relative Strength Index""" | |
if len(prices) < window + 1: | |
return np.array([]) | |
deltas = np.diff(prices) | |
gains = np.where(deltas > 0, deltas, 0) | |
losses = np.where(deltas < 0, -deltas, 0) | |
avg_gains = np.convolve(gains, np.ones(window)/window, mode='valid') | |
avg_losses = np.convolve(losses, np.ones(window)/window, mode='valid') | |
rs = avg_gains / (avg_losses + 1e-10) # Avoid division by zero | |
rsi = 100 - (100 / (1 + rs)) | |
return rsi | |
def _calculate_bollinger_bands(self, prices: np.ndarray, window: int = 20, std_dev: float = 2) -> tuple: | |
"""Calculate Bollinger Bands""" | |
if len(prices) < window: | |
return np.array([]), np.array([]) | |
sma = self._calculate_sma(prices, window) | |
if len(sma) == 0: | |
return np.array([]), np.array([]) | |
# Calculate rolling standard deviation | |
std = np.array([np.std(prices[i:i+window]) for i in range(len(prices) - window + 1)]) | |
upper_band = sma + (std_dev * std) | |
lower_band = sma - (std_dev * std) | |
return upper_band, lower_band | |
def _calculate_macd(self, prices: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> tuple: | |
"""Calculate MACD (Moving Average Convergence Divergence)""" | |
if len(prices) < slow: | |
return np.array([]), np.array([]) | |
ema_fast = self._calculate_ema(prices, fast) | |
ema_slow = self._calculate_ema(prices, slow) | |
if len(ema_fast) == 0 or len(ema_slow) == 0: | |
return np.array([]), np.array([]) | |
# Align lengths | |
min_len = min(len(ema_fast), len(ema_slow)) | |
ema_fast = ema_fast[-min_len:] | |
ema_slow = ema_slow[-min_len:] | |
macd_line = ema_fast - ema_slow | |
signal_line = self._calculate_ema(macd_line, signal) | |
return macd_line, signal_line | |
def _calculate_ema(self, prices: np.ndarray, window: int) -> np.ndarray: | |
"""Calculate Exponential Moving Average""" | |
if len(prices) < window: | |
return np.array([]) | |
alpha = 2 / (window + 1) | |
ema = np.zeros(len(prices)) | |
ema[0] = prices[0] | |
for i in range(1, len(prices)): | |
ema[i] = alpha * prices[i] + (1 - alpha) * ema[i-1] | |
return ema[window-1:] # Return only the valid EMA values | |