Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional
from .agent_base import Agent
class StrategyAgent(Agent):
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.symbol = config['trading']['symbol']
self.capital = config['trading']['capital']
self.max_position = config['risk']['max_position']
self.max_drawdown = config['risk']['max_drawdown']
self.logger.info(f"Strategy agent initialized for {self.symbol} with capital {self.capital}")
def act(self, data: pd.DataFrame) -> Dict[str, Any]:
"""
Analyze market data and generate trading signals.
Args:
data: DataFrame with OHLCV market data
Returns:
Dictionary containing trading signal
"""
try:
self.logger.info(f"Analyzing {len(data)} data points for {self.symbol}")
# Validate data
if data.empty:
self.logger.warning("Empty data received")
return self._generate_no_action_signal()
# Calculate technical indicators
indicators = self._calculate_indicators(data)
# Generate trading signal
signal = self._generate_signal(data, indicators)
# Log the signal
self.log_action(signal)
return signal
except Exception as e:
self.log_error(e, "Error in strategy analysis")
return self._generate_no_action_signal()
def _calculate_indicators(self, data: pd.DataFrame) -> Dict[str, Any]:
"""Calculate technical indicators from market data"""
try:
close_prices = data['close'].values
# Simple Moving Averages
sma_20 = self._calculate_sma(close_prices, 20)
sma_50 = self._calculate_sma(close_prices, 50)
# RSI
rsi = self._calculate_rsi(close_prices, 14)
# Bollinger Bands
bb_upper, bb_lower = self._calculate_bollinger_bands(close_prices, 20, 2)
# MACD
macd, signal_line = self._calculate_macd(close_prices)
indicators = {
'sma_20': sma_20,
'sma_50': sma_50,
'rsi': rsi,
'bb_upper': bb_upper,
'bb_lower': bb_lower,
'macd': macd,
'macd_signal': signal_line
}
self.logger.debug(f"Calculated indicators: {list(indicators.keys())}")
return indicators
except Exception as e:
self.log_error(e, "Error calculating indicators")
return {}
def _generate_signal(self, data: pd.DataFrame, indicators: Dict[str, Any]) -> Dict[str, Any]:
"""Generate trading signal based on indicators"""
try:
if not indicators:
return self._generate_no_action_signal()
current_price = data['close'].iloc[-1]
current_volume = data['volume'].iloc[-1]
# Get latest indicator values
sma_20 = indicators['sma_20'][-1] if len(indicators['sma_20']) > 0 else current_price
sma_50 = indicators['sma_50'][-1] if len(indicators['sma_50']) > 0 else current_price
rsi = indicators['rsi'][-1] if len(indicators['rsi']) > 0 else 50
bb_upper = indicators['bb_upper'][-1] if len(indicators['bb_upper']) > 0 else current_price * 1.02
bb_lower = indicators['bb_lower'][-1] if len(indicators['bb_lower']) > 0 else current_price * 0.98
# Simple strategy: Buy when price is above SMA20 and RSI < 70
# Sell when price is below SMA20 or RSI > 80
action = 'hold'
quantity = 0
confidence = 0.5
if current_price > sma_20 and rsi < 70:
action = 'buy'
quantity = self._calculate_position_size(current_price)
confidence = 0.7
self.logger.info(f"BUY signal: Price {current_price} > SMA20 {sma_20}, RSI {rsi}")
elif current_price < sma_20 or rsi > 80:
action = 'sell'
quantity = self._calculate_position_size(current_price)
confidence = 0.6
self.logger.info(f"SELL signal: Price {current_price} < SMA20 {sma_20}, RSI {rsi}")
return {
'action': action,
'symbol': self.symbol,
'quantity': quantity,
'price': current_price,
'confidence': confidence,
'timestamp': data.index[-1] if hasattr(data.index[-1], 'timestamp') else None,
'indicators': {
'sma_20': sma_20,
'sma_50': sma_50,
'rsi': rsi,
'bb_upper': bb_upper,
'bb_lower': bb_lower
}
}
except Exception as e:
self.log_error(e, "Error generating signal")
return self._generate_no_action_signal()
def _calculate_position_size(self, price: float) -> int:
"""Calculate position size based on risk management rules"""
try:
# Simple position sizing: use 10% of capital per trade
position_value = self.capital * 0.1
quantity = int(position_value / price)
# Apply max position limit
quantity = min(quantity, self.max_position)
# Ensure minimum quantity
if quantity < 1:
quantity = 1
return quantity
except Exception as e:
self.log_error(e, "Error calculating position size")
return 1
def _generate_no_action_signal(self) -> Dict[str, Any]:
"""Generate a no-action signal"""
return {
'action': 'hold',
'symbol': self.symbol,
'quantity': 0,
'price': 0,
'confidence': 0.0,
'timestamp': None,
'indicators': {}
}
# Technical indicator calculations
def _calculate_sma(self, prices: np.ndarray, window: int) -> np.ndarray:
"""Calculate Simple Moving Average"""
if len(prices) < window:
return np.array([])
return np.convolve(prices, np.ones(window)/window, mode='valid')
def _calculate_rsi(self, prices: np.ndarray, window: int = 14) -> np.ndarray:
"""Calculate Relative Strength Index"""
if len(prices) < window + 1:
return np.array([])
deltas = np.diff(prices)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gains = np.convolve(gains, np.ones(window)/window, mode='valid')
avg_losses = np.convolve(losses, np.ones(window)/window, mode='valid')
rs = avg_gains / (avg_losses + 1e-10) # Avoid division by zero
rsi = 100 - (100 / (1 + rs))
return rsi
def _calculate_bollinger_bands(self, prices: np.ndarray, window: int = 20, std_dev: float = 2) -> tuple:
"""Calculate Bollinger Bands"""
if len(prices) < window:
return np.array([]), np.array([])
sma = self._calculate_sma(prices, window)
if len(sma) == 0:
return np.array([]), np.array([])
# Calculate rolling standard deviation
std = np.array([np.std(prices[i:i+window]) for i in range(len(prices) - window + 1)])
upper_band = sma + (std_dev * std)
lower_band = sma - (std_dev * std)
return upper_band, lower_band
def _calculate_macd(self, prices: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> tuple:
"""Calculate MACD (Moving Average Convergence Divergence)"""
if len(prices) < slow:
return np.array([]), np.array([])
ema_fast = self._calculate_ema(prices, fast)
ema_slow = self._calculate_ema(prices, slow)
if len(ema_fast) == 0 or len(ema_slow) == 0:
return np.array([]), np.array([])
# Align lengths
min_len = min(len(ema_fast), len(ema_slow))
ema_fast = ema_fast[-min_len:]
ema_slow = ema_slow[-min_len:]
macd_line = ema_fast - ema_slow
signal_line = self._calculate_ema(macd_line, signal)
return macd_line, signal_line
def _calculate_ema(self, prices: np.ndarray, window: int) -> np.ndarray:
"""Calculate Exponential Moving Average"""
if len(prices) < window:
return np.array([])
alpha = 2 / (window + 1)
ema = np.zeros(len(prices))
ema[0] = prices[0]
for i in range(1, len(prices)):
ema[i] = alpha * prices[i] + (1 - alpha) * ema[i-1]
return ema[window-1:] # Return only the valid EMA values