Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import logging
from typing import Any, Dict, Optional, Tuple

import numpy as np
import pandas as pd

from .agent_base import Agent


class StrategyAgent(Agent):
    """Rule-based agent that turns close-price history into buy/sell/hold signals.

    The decision rule uses only the latest close, the 20-period simple moving
    average and a 14-period RSI. The 50-period SMA and the Bollinger Bands are
    computed and reported in the signal for information; MACD is computed but
    is neither used in the decision nor included in the signal.

    ``self.logger``, ``self.log_action`` and ``self.log_error`` are not defined
    in this class; they are expected to come from the ``Agent`` base class
    (not visible here -- confirm against ``agent_base``).
    """

    # Strategy parameters. These were inline literals; the values are unchanged.
    SMA_FAST_WINDOW = 20
    SMA_SLOW_WINDOW = 50
    RSI_WINDOW = 14
    BOLLINGER_WINDOW = 20
    BOLLINGER_STD_DEV = 2
    RSI_BUY_LIMIT = 70        # buy only while RSI is strictly below this
    RSI_SELL_THRESHOLD = 80   # sell when RSI is strictly above this
    POSITION_FRACTION = 0.1   # share of capital committed per trade
    HOLD_CONFIDENCE = 0.5
    BUY_CONFIDENCE = 0.7
    SELL_CONFIDENCE = 0.6

    def __init__(self, config: Dict[str, Any]):
        """Read trading and risk settings from ``config``.

        Args:
            config: Nested mapping that must contain
                ``config['trading']['symbol']``,
                ``config['trading']['capital']``,
                ``config['risk']['max_position']`` and
                ``config['risk']['max_drawdown']``.

        Raises:
            KeyError: If any of the keys above is missing.
        """
        super().__init__(config)
        self.symbol = config['trading']['symbol']
        self.capital = config['trading']['capital']
        self.max_position = config['risk']['max_position']
        # Stored for completeness; nothing in this class reads it.
        self.max_drawdown = config['risk']['max_drawdown']
        self.logger.info(f"Strategy agent initialized for {self.symbol} with capital {self.capital}")

    def act(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Analyze market data and generate a trading signal.

        Args:
            data: DataFrame of market data. Only the ``close`` column is read.

        Returns:
            Dictionary with the keys ``action``, ``symbol``, ``quantity``,
            ``price``, ``confidence``, ``timestamp`` and ``indicators``.
            A neutral hold signal is returned when ``data`` is ``None`` or
            empty, or when any step raises.
        """
        try:
            # Guard None explicitly instead of relying on len(None) raising.
            if data is None:
                self.logger.warning("Empty data received")
                return self._generate_no_action_signal()

            self.logger.info(f"Analyzing {len(data)} data points for {self.symbol}")

            if data.empty:
                self.logger.warning("Empty data received")
                return self._generate_no_action_signal()

            indicators = self._calculate_indicators(data)
            signal = self._generate_signal(data, indicators)
            self.log_action(signal)
            return signal
        except Exception as e:
            # Best-effort boundary: never let an analysis failure propagate.
            self.log_error(e, "Error in strategy analysis")
            return self._generate_no_action_signal()

    def _calculate_indicators(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Compute every technical indicator from the ``close`` column.

        Returns:
            Mapping with the keys ``sma_20``, ``sma_50``, ``rsi``,
            ``bb_upper``, ``bb_lower``, ``macd`` and ``macd_signal``. Each
            value is a NumPy array that is empty when the history is too
            short for that indicator. An empty dict is returned if anything
            raises (for example a missing ``close`` column).
        """
        try:
            close_prices = data['close'].values

            sma_20 = self._calculate_sma(close_prices, self.SMA_FAST_WINDOW)
            sma_50 = self._calculate_sma(close_prices, self.SMA_SLOW_WINDOW)
            rsi = self._calculate_rsi(close_prices, self.RSI_WINDOW)
            bb_upper, bb_lower = self._calculate_bollinger_bands(
                close_prices, self.BOLLINGER_WINDOW, self.BOLLINGER_STD_DEV
            )
            macd, signal_line = self._calculate_macd(close_prices)

            indicators = {
                'sma_20': sma_20,
                'sma_50': sma_50,
                'rsi': rsi,
                'bb_upper': bb_upper,
                'bb_lower': bb_lower,
                'macd': macd,
                'macd_signal': signal_line,
            }
            self.logger.debug(f"Calculated indicators: {list(indicators.keys())}")
            return indicators
        except Exception as e:
            self.log_error(e, "Error calculating indicators")
            return {}

    @staticmethod
    def _latest_or(values: Any, default: Any) -> Any:
        """Return the last element of ``values``, or ``default`` if it is empty."""
        return values[-1] if len(values) > 0 else default

    def _generate_signal(self, data: pd.DataFrame, indicators: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a trading signal from the latest indicator values.

        Rule:
            * buy  -- price above SMA20 and RSI below ``RSI_BUY_LIMIT``;
            * sell -- otherwise, price below SMA20 or RSI above
              ``RSI_SELL_THRESHOLD``;
            * hold -- everything else.

        When an indicator array is empty (not enough history) a neutral
        fallback is used: the current price for the SMAs, 50 for RSI, and
        price +/- 2% for the Bollinger Bands.

        Args:
            data: Non-empty DataFrame with a ``close`` column.
            indicators: Output of ``_calculate_indicators``; an empty mapping
                yields a neutral hold signal.

        Returns:
            Signal dictionary (see ``act``). ``timestamp`` is the last index
            label when that label has a ``timestamp`` attribute, else ``None``.
        """
        try:
            if not indicators:
                return self._generate_no_action_signal()

            # Only the close is needed. The previous unused read of
            # data['volume'] made close-only frames fail with KeyError and
            # silently degrade to a hold signal.
            current_price = data['close'].iloc[-1]

            sma_20 = self._latest_or(indicators['sma_20'], current_price)
            sma_50 = self._latest_or(indicators['sma_50'], current_price)
            rsi = self._latest_or(indicators['rsi'], 50)
            bb_upper = self._latest_or(indicators['bb_upper'], current_price * 1.02)
            bb_lower = self._latest_or(indicators['bb_lower'], current_price * 0.98)

            action = 'hold'
            quantity = 0
            confidence = self.HOLD_CONFIDENCE

            if current_price > sma_20 and rsi < self.RSI_BUY_LIMIT:
                action = 'buy'
                quantity = self._calculate_position_size(current_price)
                confidence = self.BUY_CONFIDENCE
                self.logger.info(f"BUY signal: Price {current_price} > SMA20 {sma_20}, RSI {rsi}")
            elif current_price < sma_20 or rsi > self.RSI_SELL_THRESHOLD:
                action = 'sell'
                quantity = self._calculate_position_size(current_price)
                confidence = self.SELL_CONFIDENCE
                # Log the condition that actually fired; the sell can be
                # triggered by RSI alone while price is at or above SMA20.
                if current_price < sma_20:
                    self.logger.info(f"SELL signal: Price {current_price} < SMA20 {sma_20}, RSI {rsi}")
                else:
                    self.logger.info(
                        f"SELL signal: RSI {rsi} > {self.RSI_SELL_THRESHOLD}, "
                        f"Price {current_price}, SMA20 {sma_20}"
                    )

            last_label = data.index[-1]
            return {
                'action': action,
                'symbol': self.symbol,
                'quantity': quantity,
                'price': current_price,
                'confidence': confidence,
                'timestamp': last_label if hasattr(last_label, 'timestamp') else None,
                'indicators': {
                    'sma_20': sma_20,
                    'sma_50': sma_50,
                    'rsi': rsi,
                    'bb_upper': bb_upper,
                    'bb_lower': bb_lower,
                },
            }
        except Exception as e:
            self.log_error(e, "Error generating signal")
            return self._generate_no_action_signal()

    def _calculate_position_size(self, price: float) -> int:
        """Return the number of units to trade at ``price``.

        Commits ``POSITION_FRACTION`` of ``self.capital``, caps the result at
        ``self.max_position`` and then floors it at 1. Because of that floor
        the result is never below 1, even when one unit costs more than the
        budget or ``max_position`` is below 1. Any failure (for example a
        zero or NaN price) is logged and also yields 1.
        """
        try:
            position_value = self.capital * self.POSITION_FRACTION
            quantity = int(position_value / price)
            quantity = min(quantity, self.max_position)
            if quantity < 1:
                quantity = 1
            return quantity
        except Exception as e:
            self.log_error(e, "Error calculating position size")
            return 1

    def _generate_no_action_signal(self) -> Dict[str, Any]:
        """Return a neutral hold signal: zero quantity, price and confidence."""
        return {
            'action': 'hold',
            'symbol': self.symbol,
            'quantity': 0,
            'price': 0,
            'confidence': 0.0,
            'timestamp': None,
            'indicators': {},
        }

    # Technical indicator calculations

    def _calculate_sma(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate the Simple Moving Average.

        Returns:
            Array of length ``len(prices) - window + 1`` whose last element is
            the mean of the most recent ``window`` prices; empty when fewer
            than ``window`` prices are given.
        """
        if len(prices) < window:
            return np.array([])
        # A uniform kernel in 'valid' mode yields one mean per full window.
        return np.convolve(prices, np.ones(window) / window, mode='valid')

    def _calculate_rsi(self, prices: np.ndarray, window: int = 14) -> np.ndarray:
        """Calculate the Relative Strength Index.

        Average gains and losses are plain (unweighted) means over each
        ``window`` of price changes.

        Returns:
            Array of length ``len(prices) - window``; empty when fewer than
            ``window + 1`` prices are given. A window with no gains and no
            losses evaluates to 0.
        """
        if len(prices) < window + 1:
            return np.array([])

        deltas = np.diff(prices)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)

        kernel = np.ones(window) / window
        avg_gains = np.convolve(gains, kernel, mode='valid')
        avg_losses = np.convolve(losses, kernel, mode='valid')

        # The small epsilon keeps the ratio finite when a window has no losses.
        rs = avg_gains / (avg_losses + 1e-10)
        return 100 - (100 / (1 + rs))

    def _calculate_bollinger_bands(
        self, prices: np.ndarray, window: int = 20, std_dev: float = 2
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate Bollinger Bands.

        Returns:
            ``(upper, lower)``: the SMA plus/minus ``std_dev`` times the
            rolling population standard deviation (``np.std`` default). Both
            arrays are empty when fewer than ``window`` prices are given.
        """
        if len(prices) < window:
            return np.array([]), np.array([])

        sma = self._calculate_sma(prices, window)
        # One standard deviation per full window, aligned with ``sma``.
        std = np.array([np.std(prices[i:i + window]) for i in range(len(prices) - window + 1)])

        offset = std_dev * std
        return sma + offset, sma - offset

    def _calculate_macd(
        self, prices: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate MACD (Moving Average Convergence Divergence).

        Returns:
            ``(macd_line, signal_line)``. ``macd_line`` is the fast EMA minus
            the slow EMA, aligned on the most recent values. ``signal_line``
            is the EMA of ``macd_line`` and is empty when ``macd_line`` has
            fewer than ``signal`` points. Both are empty when fewer than
            ``slow`` prices are given.
        """
        if len(prices) < slow:
            return np.array([]), np.array([])

        ema_fast = self._calculate_ema(prices, fast)
        ema_slow = self._calculate_ema(prices, slow)
        if len(ema_fast) == 0 or len(ema_slow) == 0:
            return np.array([]), np.array([])

        # The two EMAs have different lengths; keep the common trailing part
        # so both arrays end on the same (latest) price.
        min_len = min(len(ema_fast), len(ema_slow))
        macd_line = ema_fast[-min_len:] - ema_slow[-min_len:]
        signal_line = self._calculate_ema(macd_line, signal)
        return macd_line, signal_line

    def _calculate_ema(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate the Exponential Moving Average.

        Uses ``alpha = 2 / (window + 1)``, seeds the recursion with the first
        price and drops the first ``window - 1`` warm-up values.

        Returns:
            Array of length ``len(prices) - window + 1``; empty when fewer
            than ``window`` prices are given.
        """
        if len(prices) < window:
            return np.array([])

        alpha = 2 / (window + 1)
        ema = np.zeros(len(prices))
        ema[0] = prices[0]
        for i in range(1, len(prices)):
            ema[i] = alpha * prices[i] + (1 - alpha) * ema[i - 1]
        return ema[window - 1:]