File size: 9,326 Bytes
859af74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional
from .agent_base import Agent

class StrategyAgent(Agent):
    """Agent that turns market data into buy / sell / hold signals.

    The action is decided from the latest close, the 20-period simple moving
    average and the 14-period RSI. A 50-period SMA, Bollinger Bands and MACD
    are also computed; the SMA50 and the bands are reported in the signal
    but none of them drive the action.

    ``self.logger``, ``self.log_action`` and ``self.log_error`` are used here
    but not defined in this class; presumably they come from the ``Agent``
    base class (verify against ``agent_base``).
    """

    def __init__(self, config: Dict[str, Any]):
        """Read trading and risk settings from *config*.

        Args:
            config: Nested settings. ``config['trading']`` must provide
                ``'symbol'`` and ``'capital'``; ``config['risk']`` must
                provide ``'max_position'`` and ``'max_drawdown'``.
        """
        super().__init__(config)
        self.symbol = config['trading']['symbol']
        self.capital = config['trading']['capital']
        self.max_position = config['risk']['max_position']
        # Stored for completeness; nothing in this class reads it.
        self.max_drawdown = config['risk']['max_drawdown']
        self.logger.info(f"Strategy agent initialized for {self.symbol} with capital {self.capital}")

    def act(self, data: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze market data and generate trading signals.

        Any exception raised during the analysis is reported through
        ``self.log_error`` and converted into a no-action signal, so this
        method does not propagate errors to the caller.

        Args:
            data: DataFrame with OHLCV market data

        Returns:
            Dictionary containing trading signal
        """
        try:
            self.logger.info(f"Analyzing {len(data)} data points for {self.symbol}")

            # Validate data
            if data.empty:
                self.logger.warning("Empty data received")
                return self._generate_no_action_signal()

            # Calculate technical indicators
            indicators = self._calculate_indicators(data)

            # Generate trading signal
            signal = self._generate_signal(data, indicators)

            # Log the signal
            self.log_action(signal)

            return signal

        except Exception as e:
            self.log_error(e, "Error in strategy analysis")
            return self._generate_no_action_signal()

    def _calculate_indicators(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Calculate technical indicators from the ``'close'`` column.

        Returns:
            Dict with keys ``'sma_20'``, ``'sma_50'``, ``'rsi'``,
            ``'bb_upper'``, ``'bb_lower'``, ``'macd'`` and ``'macd_signal'``,
            each a NumPy array (empty when there is too little history for
            that indicator). An empty dict is returned if anything raises.
        """
        try:
            close_prices = data['close'].values

            # Simple Moving Averages
            sma_20 = self._calculate_sma(close_prices, 20)
            sma_50 = self._calculate_sma(close_prices, 50)

            # RSI
            rsi = self._calculate_rsi(close_prices, 14)

            # Bollinger Bands
            bb_upper, bb_lower = self._calculate_bollinger_bands(close_prices, 20, 2)

            # MACD
            macd, signal_line = self._calculate_macd(close_prices)

            indicators = {
                'sma_20': sma_20,
                'sma_50': sma_50,
                'rsi': rsi,
                'bb_upper': bb_upper,
                'bb_lower': bb_lower,
                'macd': macd,
                'macd_signal': signal_line
            }

            self.logger.debug(f"Calculated indicators: {list(indicators.keys())}")
            return indicators

        except Exception as e:
            self.log_error(e, "Error calculating indicators")
            return {}

    @staticmethod
    def _latest_or(series: Any, default: Any) -> Any:
        """Return the last element of *series*, or *default* if it is empty."""
        return series[-1] if len(series) > 0 else default

    def _generate_signal(self, data: pd.DataFrame, indicators: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a trading signal from the latest close and indicators.

        Rules, evaluated in order:
            * buy  - price above SMA20 and RSI below 70
            * sell - price below SMA20, or RSI above 80
            * hold - anything else

        When an indicator array is empty, a neutral fallback is used (the
        current price for the SMAs, 50 for RSI, price +/- 2% for the bands),
        so too little history for SMA20 always results in ``hold``.

        Only the ``'close'`` column and the index of *data* are read.

        Args:
            data: Non-empty market data with a ``'close'`` column.
            indicators: Mapping produced by ``_calculate_indicators``.

        Returns:
            Dict with keys ``'action'``, ``'symbol'``, ``'quantity'``,
            ``'price'``, ``'confidence'``, ``'timestamp'`` and
            ``'indicators'``. A no-action signal is returned when
            *indicators* is empty or an exception occurs.
        """
        try:
            if not indicators:
                return self._generate_no_action_signal()

            current_price = data['close'].iloc[-1]

            # Get latest indicator values, with neutral fallbacks
            sma_20 = self._latest_or(indicators['sma_20'], current_price)
            sma_50 = self._latest_or(indicators['sma_50'], current_price)
            rsi = self._latest_or(indicators['rsi'], 50)
            bb_upper = self._latest_or(indicators['bb_upper'], current_price * 1.02)
            bb_lower = self._latest_or(indicators['bb_lower'], current_price * 0.98)

            action = 'hold'
            quantity = 0
            confidence = 0.5

            if current_price > sma_20 and rsi < 70:
                action = 'buy'
                quantity = self._calculate_position_size(current_price)
                confidence = 0.7
                self.logger.info(f"BUY signal: Price {current_price} > SMA20 {sma_20}, RSI {rsi}")

            elif current_price < sma_20 or rsi > 80:
                action = 'sell'
                quantity = self._calculate_position_size(current_price)
                confidence = 0.6
                # Log the condition that actually fired: the branch can be
                # reached through RSI alone while price is above the SMA.
                if current_price < sma_20:
                    reason = f"Price {current_price} < SMA20 {sma_20}, RSI {rsi}"
                else:
                    reason = f"RSI {rsi} > 80, Price {current_price}, SMA20 {sma_20}"
                self.logger.info(f"SELL signal: {reason}")

            # Only index labels exposing a `timestamp` attribute are passed
            # through; any other label is reported as None.
            last_label = data.index[-1]

            return {
                'action': action,
                'symbol': self.symbol,
                'quantity': quantity,
                'price': current_price,
                'confidence': confidence,
                'timestamp': last_label if hasattr(last_label, 'timestamp') else None,
                'indicators': {
                    'sma_20': sma_20,
                    'sma_50': sma_50,
                    'rsi': rsi,
                    'bb_upper': bb_upper,
                    'bb_lower': bb_lower
                }
            }

        except Exception as e:
            self.log_error(e, "Error generating signal")
            return self._generate_no_action_signal()

    def _calculate_position_size(self, price: float) -> int:
        """Calculate position size based on risk management rules.

        Allocates 10% of ``self.capital`` at *price*, caps the unit count at
        ``self.max_position`` and never returns less than 1.

        Args:
            price: Price per unit.

        Returns:
            Whole number of units, at least 1. If the computation raises
            (for example on a zero or NaN price), the error is logged and 1
            is returned.
        """
        try:
            # Simple position sizing: use 10% of capital per trade
            position_value = self.capital * 0.1
            quantity = int(position_value / price)

            # Apply max position limit; int() keeps the result integral even
            # when max_position is configured as a float.
            quantity = int(min(quantity, self.max_position))

            # Ensure minimum quantity
            return max(quantity, 1)

        except Exception as e:
            self.log_error(e, "Error calculating position size")
            return 1

    def _generate_no_action_signal(self) -> Dict[str, Any]:
        """Generate a ``hold`` signal with zero quantity, price and confidence."""
        return {
            'action': 'hold',
            'symbol': self.symbol,
            'quantity': 0,
            'price': 0,
            'confidence': 0.0,
            'timestamp': None,
            'indicators': {}
        }

    # Technical indicator calculations
    def _calculate_sma(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate Simple Moving Average.

        Returns:
            Array of ``len(prices) - window + 1`` averages, one per full
            window, or an empty array when there are fewer than *window*
            prices.
        """
        if len(prices) < window:
            return np.array([])
        return np.convolve(prices, np.ones(window) / window, mode='valid')

    def _calculate_rsi(self, prices: np.ndarray, window: int = 14) -> np.ndarray:
        """Calculate Relative Strength Index.

        Gains and losses are averaged with a plain (unweighted) moving
        average over *window* price changes.

        Returns:
            Array of ``len(prices) - window`` RSI values, or an empty array
            when there are fewer than ``window + 1`` prices.
        """
        if len(prices) < window + 1:
            return np.array([])

        deltas = np.diff(prices)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)

        kernel = np.ones(window) / window
        avg_gains = np.convolve(gains, kernel, mode='valid')
        avg_losses = np.convolve(losses, kernel, mode='valid')

        rs = avg_gains / (avg_losses + 1e-10)  # Avoid division by zero
        rsi = 100 - (100 / (1 + rs))

        return rsi

    def _calculate_bollinger_bands(self, prices: np.ndarray, window: int = 20, std_dev: float = 2) -> tuple:
        """Calculate Bollinger Bands.

        Args:
            prices: Price series.
            window: Length of the moving window.
            std_dev: Number of standard deviations between the moving
                average and each band.

        Returns:
            ``(upper_band, lower_band)``, each aligned with the SMA of the
            same window; two empty arrays when there are fewer than *window*
            prices.
        """
        if len(prices) < window:
            return np.array([]), np.array([])

        sma = self._calculate_sma(prices, window)
        if len(sma) == 0:
            return np.array([]), np.array([])

        # Rolling standard deviation, one value per full window
        # (np.std default, i.e. population std).
        std = np.array([np.std(prices[i:i + window]) for i in range(len(prices) - window + 1)])

        upper_band = sma + (std_dev * std)
        lower_band = sma - (std_dev * std)

        return upper_band, lower_band

    def _calculate_macd(self, prices: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> tuple:
        """Calculate MACD (Moving Average Convergence Divergence).

        Returns:
            ``(macd_line, signal_line)``. ``macd_line`` is the fast EMA minus
            the slow EMA; ``signal_line`` is the EMA of ``macd_line`` over
            *signal* periods and is empty when ``macd_line`` is shorter than
            that. Two empty arrays are returned when there are fewer than
            *slow* prices.
        """
        if len(prices) < slow:
            return np.array([]), np.array([])

        ema_fast = self._calculate_ema(prices, fast)
        ema_slow = self._calculate_ema(prices, slow)

        if len(ema_fast) == 0 or len(ema_slow) == 0:
            return np.array([]), np.array([])

        # Both EMAs end on the latest price, so trimming to the common tail
        # lines them up period by period.
        min_len = min(len(ema_fast), len(ema_slow))
        ema_fast = ema_fast[-min_len:]
        ema_slow = ema_slow[-min_len:]

        macd_line = ema_fast - ema_slow
        signal_line = self._calculate_ema(macd_line, signal)

        return macd_line, signal_line

    def _calculate_ema(self, prices: np.ndarray, window: int) -> np.ndarray:
        """Calculate Exponential Moving Average.

        Uses ``alpha = 2 / (window + 1)`` and seeds the recursion with the
        first price.

        Returns:
            Array of ``len(prices) - window + 1`` values (the first
            ``window - 1`` warm-up values are dropped), or an empty array
            when there are fewer than *window* prices.
        """
        if len(prices) < window:
            return np.array([])

        alpha = 2 / (window + 1)
        ema = np.zeros(len(prices))
        ema[0] = prices[0]

        for i in range(1, len(prices)):
            ema[i] = alpha * prices[i] + (1 - alpha) * ema[i - 1]

        return ema[window - 1:]  # Return only the valid EMA values