File size: 9,326 Bytes
859af74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional
from .agent_base import Agent
class StrategyAgent(Agent):
    """Agent that derives buy/sell/hold signals from market data.

    The agent computes a handful of technical indicators (SMA, RSI,
    Bollinger Bands, MACD) from the ``close`` column of the supplied
    DataFrame and applies a simple price-vs-SMA20 / RSI rule to pick an
    action. All failures are logged and degrade to a ``hold`` signal.
    """

    # Fraction of capital committed to a single trade.
    POSITION_FRACTION = 0.1

    # Indicator lookback windows.
    SMA_SHORT_WINDOW = 20
    SMA_LONG_WINDOW = 50
    RSI_WINDOW = 14
    BB_WINDOW = 20
    BB_STD_DEV = 2

    # RSI thresholds used by the signal rule: buy only below the first,
    # sell unconditionally above the second.
    RSI_BUY_CEILING = 70
    RSI_SELL_THRESHOLD = 80

    def __init__(self, config: Dict[str, Any]):
        """Initialize the agent from a nested configuration dictionary.

        Args:
            config: Configuration with ``config['trading']['symbol']``,
                ``config['trading']['capital']``,
                ``config['risk']['max_position']`` and
                ``config['risk']['max_drawdown']``.

        Raises:
            KeyError: If any of the keys above is missing.
        """
        super().__init__(config)
        self.symbol = config['trading']['symbol']
        self.capital = config['trading']['capital']
        self.max_position = config['risk']['max_position']
        self.max_drawdown = config['risk']['max_drawdown']
        # NOTE(review): self.logger is presumably provided by Agent.__init__.
        self.logger.info(f"Strategy agent initialized for {self.symbol} with capital {self.capital}")

    def act(self, data: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze market data and generate trading signals.

        Args:
            data: DataFrame with OHLCV market data

        Returns:
            Dictionary containing trading signal
        """
        try:
            self.logger.info(f"Analyzing {len(data)} data points for {self.symbol}")

            # Validate data
            if data.empty:
                self.logger.warning("Empty data received")
                return self._generate_no_action_signal()

            # Calculate technical indicators
            indicators = self._calculate_indicators(data)

            # Generate trading signal
            signal = self._generate_signal(data, indicators)

            # Log the signal
            self.log_action(signal)
            return signal
        except Exception as e:
            # Best-effort boundary: any failure degrades to a hold signal.
            self.log_error(e, "Error in strategy analysis")
            return self._generate_no_action_signal()

    def _calculate_indicators(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Calculate technical indicators from market data.

        Args:
            data: DataFrame with a ``close`` column.

        Returns:
            Mapping of indicator name to a NumPy array of values (an array
            is empty when there is not enough history for that indicator).
            An empty dict is returned if any calculation fails.
        """
        try:
            close_prices = data['close'].values

            # Simple Moving Averages
            sma_20 = self._calculate_sma(close_prices, self.SMA_SHORT_WINDOW)
            sma_50 = self._calculate_sma(close_prices, self.SMA_LONG_WINDOW)

            # RSI
            rsi = self._calculate_rsi(close_prices, self.RSI_WINDOW)

            # Bollinger Bands
            bb_upper, bb_lower = self._calculate_bollinger_bands(
                close_prices, self.BB_WINDOW, self.BB_STD_DEV
            )

            # MACD
            macd, signal_line = self._calculate_macd(close_prices)

            indicators = {
                'sma_20': sma_20,
                'sma_50': sma_50,
                'rsi': rsi,
                'bb_upper': bb_upper,
                'bb_lower': bb_lower,
                'macd': macd,
                'macd_signal': signal_line,
            }
            self.logger.debug(f"Calculated indicators: {list(indicators.keys())}")
            return indicators
        except Exception as e:
            self.log_error(e, "Error calculating indicators")
            return {}

    @staticmethod
    def _latest(values, default):
        """Return the last element of ``values``, or ``default`` if it is empty."""
        return values[-1] if len(values) > 0 else default

    def _generate_signal(self, data: pd.DataFrame, indicators: Dict[str, Any]) -> Dict[str, Any]:
        """Generate trading signal based on indicators.

        Rule: buy when the latest close is above SMA20 and RSI is below
        ``RSI_BUY_CEILING``; otherwise sell when the close is below SMA20 or
        RSI is above ``RSI_SELL_THRESHOLD``; otherwise hold.

        Only the ``close`` column and the index of ``data`` are read.

        Args:
            data: Non-empty DataFrame with a ``close`` column.
            indicators: Output of ``_calculate_indicators``; an empty dict
                yields a no-action signal.

        Returns:
            Signal dictionary with ``action``, ``symbol``, ``quantity``,
            ``price``, ``confidence``, ``timestamp`` and ``indicators`` keys.
            A no-action signal is returned if anything fails.
        """
        try:
            if not indicators:
                return self._generate_no_action_signal()

            current_price = data['close'].iloc[-1]

            # Get latest indicator values; fall back to neutral defaults when
            # there is not enough history for an indicator.
            sma_20 = self._latest(indicators['sma_20'], current_price)
            sma_50 = self._latest(indicators['sma_50'], current_price)
            rsi = self._latest(indicators['rsi'], 50)
            # Fallback bands sit 2% above/below the current price.
            bb_upper = self._latest(indicators['bb_upper'], current_price * 1.02)
            bb_lower = self._latest(indicators['bb_lower'], current_price * 0.98)

            action = 'hold'
            quantity = 0
            confidence = 0.5

            if current_price > sma_20 and rsi < self.RSI_BUY_CEILING:
                action = 'buy'
                quantity = self._calculate_position_size(current_price)
                confidence = 0.7
                self.logger.info(f"BUY signal: Price {current_price} > SMA20 {sma_20}, RSI {rsi}")
            elif current_price < sma_20 or rsi > self.RSI_SELL_THRESHOLD:
                action = 'sell'
                quantity = self._calculate_position_size(current_price)
                confidence = 0.6
                # Report the condition that actually fired: the sell may be
                # triggered by RSI alone while the price is above SMA20.
                if current_price < sma_20:
                    self.logger.info(f"SELL signal: Price {current_price} < SMA20 {sma_20}, RSI {rsi}")
                else:
                    self.logger.info(
                        f"SELL signal: RSI {rsi} > {self.RSI_SELL_THRESHOLD}, "
                        f"Price {current_price}, SMA20 {sma_20}"
                    )

            last_index = data.index[-1]
            return {
                'action': action,
                'symbol': self.symbol,
                'quantity': quantity,
                'price': current_price,
                'confidence': confidence,
                # Only index labels exposing a `timestamp` attribute are
                # forwarded; any other label is reported as None.
                'timestamp': last_index if hasattr(last_index, 'timestamp') else None,
                'indicators': {
                    'sma_20': sma_20,
                    'sma_50': sma_50,
                    'rsi': rsi,
                    'bb_upper': bb_upper,
                    'bb_lower': bb_lower,
                },
            }
        except Exception as e:
            self.log_error(e, "Error generating signal")
            return self._generate_no_action_signal()

    def _calculate_position_size(self, price: float) -> int:
        """Calculate position size based on risk management rules.

        Allocates ``POSITION_FRACTION`` of capital, caps the share count at
        ``self.max_position`` and never returns fewer than one unit.

        Args:
            price: Price of one unit.

        Returns:
            Integer quantity, at least 1. Falls back to 1 when the
            computation fails (for example a zero or NaN price).
        """
        try:
            position_value = self.capital * self.POSITION_FRACTION
            quantity = int(position_value / price)

            # Apply max position limit; cast so a float limit cannot leak a
            # non-integer quantity to the caller.
            quantity = int(min(quantity, self.max_position))

            # Ensure minimum quantity
            return max(quantity, 1)
        except Exception as e:
            self.log_error(e, "Error calculating position size")
            return 1

    def _generate_no_action_signal(self) -> Dict[str, Any]:
        """Generate a no-action (hold, zero quantity) signal."""
        return {
            'action': 'hold',
            'symbol': self.symbol,
            'quantity': 0,
            'price': 0,
            'confidence': 0.0,
            'timestamp': None,
            'indicators': {},
        }

    # Technical indicator calculations

    @staticmethod
    def _check_window(window: int) -> None:
        """Raise ValueError unless ``window`` is at least 1."""
        if window < 1:
            raise ValueError(f"window must be a positive integer, got {window}")

    def _calculate_sma(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate Simple Moving Average.

        Args:
            prices: Price series.
            window: Number of samples per average.

        Returns:
            Array of ``len(prices) - window + 1`` averages, or an empty array
            when there are fewer than ``window`` prices.

        Raises:
            ValueError: If ``window`` is less than 1.
        """
        self._check_window(window)
        if len(prices) < window:
            return np.array([])
        return np.convolve(prices, np.ones(window) / window, mode='valid')

    def _calculate_rsi(self, prices: np.ndarray, window: int = 14) -> np.ndarray:
        """Calculate Relative Strength Index.

        Gains and losses are averaged with a simple (unweighted) rolling
        mean over ``window`` price changes.

        Args:
            prices: Price series.
            window: Number of price changes per average.

        Returns:
            Array of ``len(prices) - window`` RSI values, or an empty array
            when there are fewer than ``window + 1`` prices.

        Raises:
            ValueError: If ``window`` is less than 1.
        """
        self._check_window(window)
        if len(prices) < window + 1:
            return np.array([])

        deltas = np.diff(prices)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)

        kernel = np.ones(window) / window
        avg_gains = np.convolve(gains, kernel, mode='valid')
        avg_losses = np.convolve(losses, kernel, mode='valid')

        rs = avg_gains / (avg_losses + 1e-10)  # Avoid division by zero
        return 100 - (100 / (1 + rs))

    def _calculate_bollinger_bands(self, prices: np.ndarray, window: int = 20, std_dev: float = 2) -> tuple:
        """Calculate Bollinger Bands.

        Args:
            prices: Price series.
            window: Number of samples per rolling window.
            std_dev: Band half-width in rolling standard deviations.

        Returns:
            ``(upper_band, lower_band)`` arrays, each of length
            ``len(prices) - window + 1``; two empty arrays when there are
            fewer than ``window`` prices.

        Raises:
            ValueError: If ``window`` is less than 1.
        """
        if len(prices) < window:
            return np.array([]), np.array([])

        sma = self._calculate_sma(prices, window)

        # Rolling standard deviation (np.std default, i.e. ddof=0), one value
        # per window so it lines up element-for-element with the SMA.
        std = np.array([np.std(prices[i:i + window]) for i in range(len(prices) - window + 1)])

        upper_band = sma + (std_dev * std)
        lower_band = sma - (std_dev * std)
        return upper_band, lower_band

    def _calculate_macd(self, prices: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> tuple:
        """Calculate MACD (Moving Average Convergence Divergence).

        Args:
            prices: Price series.
            fast: Window of the fast EMA.
            slow: Window of the slow EMA.
            signal: Window of the EMA applied to the MACD line.

        Returns:
            ``(macd_line, signal_line)``. Both are empty when there are fewer
            than ``slow`` prices; ``signal_line`` alone is empty when the
            MACD line has fewer than ``signal`` values. The two arrays
            generally have different lengths.

        Raises:
            ValueError: If a window passed on to the EMA is less than 1.
        """
        if len(prices) < slow:
            return np.array([]), np.array([])

        ema_fast = self._calculate_ema(prices, fast)
        ema_slow = self._calculate_ema(prices, slow)
        if len(ema_fast) == 0 or len(ema_slow) == 0:
            return np.array([]), np.array([])

        # Align lengths: keep the most recent values common to both EMAs.
        min_len = min(len(ema_fast), len(ema_slow))
        ema_fast = ema_fast[-min_len:]
        ema_slow = ema_slow[-min_len:]

        macd_line = ema_fast - ema_slow
        signal_line = self._calculate_ema(macd_line, signal)
        return macd_line, signal_line

    def _calculate_ema(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate Exponential Moving Average.

        The recursion is seeded with the first price and uses a smoothing
        factor of ``2 / (window + 1)``.

        Args:
            prices: Price series.
            window: EMA window.

        Returns:
            Array of ``len(prices) - window + 1`` values (the first
            ``window - 1`` warm-up values are dropped), or an empty array
            when there are fewer than ``window`` prices.

        Raises:
            ValueError: If ``window`` is less than 1.
        """
        self._check_window(window)
        if len(prices) < window:
            return np.array([])

        alpha = 2 / (window + 1)
        ema = np.zeros(len(prices))
        ema[0] = prices[0]
        # Each value depends on the previous one, so this stays a loop.
        for i in range(1, len(prices)):
            ema[i] = alpha * prices[i] + (1 - alpha) * ema[i - 1]
        return ema[window - 1:]  # Return only the valid EMA values
|