""" | |
FinRL Agent for Algorithmic Trading | |
This module provides a FinRL-based reinforcement learning agent that can be integrated | |
with the existing algorithmic trading system. It supports various RL algorithms | |
including PPO, A2C, DDPG, and TD3, and can work with Alpaca broker for real trading. | |
""" | |
import numpy as np | |
import pandas as pd | |
import gymnasium as gym | |
from gymnasium import spaces | |
from stable_baselines3 import PPO, A2C, DDPG, TD3 | |
from stable_baselines3.common.vec_env import DummyVecEnv | |
from stable_baselines3.common.callbacks import EvalCallback | |
import torch | |
import logging | |
from typing import Dict, List, Tuple, Optional, Any | |
from dataclasses import dataclass | |
import yaml | |
import inspect | |
logger = logging.getLogger(__name__) | |


@dataclass
class FinRLConfig:
    """Configuration for the FinRL agent."""
    algorithm: str = "PPO"  # PPO, A2C, DDPG, or TD3
    learning_rate: float = 0.0003
    batch_size: int = 64
    buffer_size: int = 1000000
    learning_starts: int = 100
    gamma: float = 0.99
    tau: float = 0.005
    train_freq: int = 1
    gradient_steps: int = 1
    target_update_interval: int = 1
    exploration_fraction: float = 0.1
    exploration_initial_eps: float = 1.0
    exploration_final_eps: float = 0.05
    max_grad_norm: float = 10.0
    verbose: int = 1
    tensorboard_log: str = "logs/finrl_tensorboard"
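
# A minimal sketch of overriding the defaults above. The field names mirror the
# Stable-Baselines3 constructor arguments, and _get_valid_kwargs() below filters
# out any field the chosen algorithm does not accept:
#
#     config = FinRLConfig(algorithm="TD3", learning_rate=1e-4, batch_size=256)
#     agent = FinRLAgent(config)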


class TradingEnvironment(gym.Env):
    """
    Custom trading environment for FinRL.

    This environment simulates a trading scenario in which the agent can:
    - Buy, sell, or hold positions
    - Use technical indicators for decision making
    - Manage portfolio value and risk
    - Integrate with the Alpaca broker for real trading
    """

    def __init__(self, data: pd.DataFrame, config: Dict[str, Any],
                 initial_balance: float = 100000, transaction_fee: float = 0.001,
                 max_position: int = 100, use_real_broker: bool = False):
        super().__init__()
        self.data = data
        self.config = config
        self.initial_balance = initial_balance
        self.transaction_fee = transaction_fee
        self.max_position = max_position
        self.use_real_broker = use_real_broker

        # Initialize the Alpaca broker if real trading is requested
        self.alpaca_broker = None
        if use_real_broker:
            try:
                from .alpaca_broker import AlpacaBroker
                self.alpaca_broker = AlpacaBroker(config)
                logger.info("Alpaca broker initialized for FinRL environment")
            except Exception as e:
                logger.error(f"Failed to initialize Alpaca broker: {e}")
                self.use_real_broker = False

        # Reset state (sets balance, position, and portfolio value)
        self.reset()

        # Action space: 0 = sell, 1 = hold, 2 = buy
        self.action_space = spaces.Discrete(3)

        # Observation space: OHLCV + technical indicators + portfolio state
        n_features = len(self._get_features(self.data.iloc[0]))
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

    def _get_features(self, row: pd.Series) -> np.ndarray:
        """Extract features from a row of market data."""
        features = []

        # Price features
        features.extend([
            row['open'], row['high'], row['low'], row['close'], row['volume']
        ])

        # Technical indicators (if available)
        for indicator in ['sma_20', 'sma_50', 'rsi', 'bb_upper', 'bb_lower', 'macd']:
            if indicator in row.index:
                features.append(row[indicator])
            else:
                features.append(0.0)

        # Portfolio state
        features.extend([
            self.balance,
            self.position,
            self.portfolio_value,
            self.total_return
        ])
        return np.array(features, dtype=np.float32)

    def _calculate_portfolio_value(self) -> float:
        """Calculate the current portfolio value."""
        current_price = self.data.iloc[self.current_step]['close']
        return self.balance + (self.position * current_price)

    def _calculate_reward(self) -> float:
        """Calculate the reward as the one-step portfolio return."""
        current_value = self._calculate_portfolio_value()
        previous_value = self.previous_portfolio_value
        if previous_value > 0:
            return (current_value - previous_value) / previous_value
        return 0.0
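
    # For example, if the portfolio was worth 100,000 on the previous step and is
    # worth 101,000 now, the reward is (101000 - 100000) / 100000 = 0.01.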

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Execute one step in the environment."""
        # Current market data
        current_data = self.data.iloc[self.current_step]
        current_price = current_data['close']

        # Execute the action (1 = hold: no trade)
        if action == 0:  # Sell
            if self.position > 0:
                shares_to_sell = min(self.position, self.max_position)
                if self.use_real_broker and self.alpaca_broker:
                    # Execute a real order with Alpaca
                    result = self.alpaca_broker.place_market_order(
                        symbol=self.config['trading']['symbol'],
                        quantity=shares_to_sell,
                        side='sell'
                    )
                    if result['success']:
                        sell_value = result['filled_avg_price'] * shares_to_sell * (1 - self.transaction_fee)
                        self.balance += sell_value
                        self.position -= shares_to_sell
                        logger.info(f"Real sell order executed: {result['order_id']}")
                    else:
                        logger.warning(f"Real sell order failed: {result['error']}")
                else:
                    # Simulate order execution
                    sell_value = shares_to_sell * current_price * (1 - self.transaction_fee)
                    self.balance += sell_value
                    self.position -= shares_to_sell
        elif action == 2:  # Buy
            if self.balance > 0:
                max_shares = min(
                    int(self.balance / current_price),
                    self.max_position - self.position
                )
                if max_shares > 0:
                    if self.use_real_broker and self.alpaca_broker:
                        # Execute a real order with Alpaca
                        result = self.alpaca_broker.place_market_order(
                            symbol=self.config['trading']['symbol'],
                            quantity=max_shares,
                            side='buy'
                        )
                        if result['success']:
                            buy_value = result['filled_avg_price'] * max_shares * (1 + self.transaction_fee)
                            self.balance -= buy_value
                            self.position += max_shares
                            logger.info(f"Real buy order executed: {result['order_id']}")
                        else:
                            logger.warning(f"Real buy order failed: {result['error']}")
                    else:
                        # Simulate order execution
                        buy_value = max_shares * current_price * (1 + self.transaction_fee)
                        self.balance -= buy_value
                        self.position += max_shares

        # Update portfolio value
        self.previous_portfolio_value = self.portfolio_value
        self.portfolio_value = self._calculate_portfolio_value()
        self.total_return = (self.portfolio_value - self.initial_balance) / self.initial_balance

        # Advance to the next step
        self.current_step += 1

        # Check whether the episode is done
        done = self.current_step >= len(self.data) - 1

        # Observation for the next step
        if not done:
            observation = self._get_features(self.data.iloc[self.current_step])
        else:
            observation = self._get_features(self.data.iloc[-1])

        # Reward
        reward = self._calculate_reward()

        # Additional info
        info = {
            'portfolio_value': self.portfolio_value,
            'total_return': self.total_return,
            'position': self.position,
            'balance': self.balance,
            'step': self.current_step
        }
        return observation, reward, done, False, info

    def reset(self, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) -> Tuple[np.ndarray, Dict]:
        """Reset the environment to its initial state."""
        super().reset(seed=seed)
        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        self.portfolio_value = self.initial_balance
        self.previous_portfolio_value = self.initial_balance
        self.total_return = 0.0

        # Initial observation
        observation = self._get_features(self.data.iloc[0])
        return observation, {}
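
# A minimal sketch of driving the environment by hand with random actions, assuming
# a DataFrame with lowercase 'open'/'high'/'low'/'close'/'volume' columns. The config
# dict is only consulted for config['trading']['symbol'] when use_real_broker=True:
#
#     env = TradingEnvironment(data=ohlcv_df, config={'trading': {'symbol': 'SPY'}})
#     obs, _ = env.reset()
#     done = False
#     while not done:
#         obs, reward, done, _, info = env.step(env.action_space.sample())
#     print(info['portfolio_value'], info['total_return'])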


class FinRLAgent:
    """
    FinRL-based reinforcement learning agent for algorithmic trading.
    """

    def __init__(self, config: FinRLConfig):
        self.config = config
        self.model = None
        self.env = None
        self.eval_env = None
        self.callback = None
        logger.info(f"Initializing FinRL agent with algorithm: {self.config.algorithm}")

    def _get_valid_kwargs(self, algo_class):
        """Return the config fields accepted by the given algorithm class, excluding tensorboard_log."""
        sig = inspect.signature(algo_class.__init__)
        valid_keys = set(sig.parameters.keys())
        # Exclude 'self', 'policy', and 'tensorboard_log', which are always passed explicitly
        valid_keys.discard('self')
        valid_keys.discard('policy')
        valid_keys.discard('tensorboard_log')
        # Build kwargs from the config dataclass
        return {k: getattr(self.config, k) for k in self.config.__dataclass_fields__ if k in valid_keys}
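
    # With current Stable-Baselines3 signatures, for example, PPO picks up
    # learning_rate, batch_size, gamma, max_grad_norm, and verbose from FinRLConfig,
    # while off-policy fields such as buffer_size, tau, and train_freq are only
    # accepted by DDPG/TD3; the DQN-style exploration_* fields are filtered out
    # for all four algorithms.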

    def create_environment(self, data: pd.DataFrame, config: Dict[str, Any],
                           initial_balance: float = 100000, use_real_broker: bool = False) -> TradingEnvironment:
        """Create a trading environment from market data."""
        return TradingEnvironment(
            data=data,
            config=config,
            initial_balance=initial_balance,
            transaction_fee=0.001,
            max_position=100,
            use_real_broker=use_real_broker
        )

    def prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Prepare data with technical indicators for FinRL."""
        df = data.copy()

        # Add technical indicators if not present
        if 'sma_20' not in df.columns:
            df['sma_20'] = df['close'].rolling(window=20).mean()
        if 'sma_50' not in df.columns:
            df['sma_50'] = df['close'].rolling(window=50).mean()
        if 'rsi' not in df.columns:
            df['rsi'] = self._calculate_rsi(df['close'])
        if 'bb_upper' not in df.columns or 'bb_lower' not in df.columns:
            bb_upper, bb_lower = self._calculate_bollinger_bands(df['close'])
            df['bb_upper'] = bb_upper
            df['bb_lower'] = bb_lower
        if 'macd' not in df.columns:
            df['macd'] = self._calculate_macd(df['close'])

        # Fill NaN values
        df = df.bfill().fillna(0)
        return df
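
    # For example, a bare OHLCV frame passed to prepare_data() comes back with the
    # extra columns sma_20, sma_50, rsi, bb_upper, bb_lower, and macd, with leading
    # NaNs back-filled and any remaining gaps set to 0.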

    def train(self, data: pd.DataFrame, config: Dict[str, Any],
              total_timesteps: int = 100000, use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Train the FinRL agent.

        Args:
            data: Market data for training
            config: Configuration dictionary
            total_timesteps: Number of timesteps for training
            use_real_broker: Whether to use the real Alpaca broker during training

        Returns:
            Training results dictionary
        """
        try:
            # Prepare data
            prepared_data = self.prepare_data(data)

            # Create the training environment
            self.env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)

            # Create the evaluation environment (never uses the real broker)
            eval_data = prepared_data.copy()
            self.eval_env = self.create_environment(eval_data, config, use_real_broker=False)

            # Create the evaluation callback
            finrl_config = config.get('finrl', {})
            training_config = finrl_config.get('training', {})
            model_save_path = training_config.get('model_save_path', 'models/finrl')
            tensorboard_log = finrl_config.get('tensorboard_log', self.config.tensorboard_log)
            eval_freq = training_config.get('eval_freq', 1000)
            self.callback = EvalCallback(
                self.eval_env,
                best_model_save_path=model_save_path,
                log_path=tensorboard_log,
                eval_freq=eval_freq,
                deterministic=True,
                render=False
            )

            # Initialize the model for the configured algorithm
            algorithms = {"PPO": PPO, "A2C": A2C, "DDPG": DDPG, "TD3": TD3}
            if self.config.algorithm not in algorithms:
                raise ValueError(f"Unsupported algorithm: {self.config.algorithm}")
            algo_class = algorithms[self.config.algorithm]
            algo_kwargs = self._get_valid_kwargs(algo_class)
            self.model = algo_class(
                "MlpPolicy",
                self.env,
                **algo_kwargs,
                tensorboard_log=self.config.tensorboard_log
            )

            # Train the model
            logger.info(f"Starting training with {total_timesteps} timesteps")
            self.model.learn(
                total_timesteps=total_timesteps,
                callback=self.callback,
                progress_bar=True
            )

            # Save the final model
            model_path = f"{model_save_path}/final_model"
            self.model.save(model_path)
            logger.info(f"Training completed. Model saved to {model_path}")
            return {
                'success': True,
                'algorithm': self.config.algorithm,
                'total_timesteps': total_timesteps,
                'model_path': model_path
            }
        except Exception as e:
            logger.error(f"Error during training: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def predict(self, data: pd.DataFrame, config: Dict[str, Any],
                use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Make predictions using the trained model.

        Args:
            data: Market data for prediction
            config: Configuration dictionary
            use_real_broker: Whether to use the real Alpaca broker for execution

        Returns:
            Prediction results dictionary
        """
        try:
            if self.model is None:
                # Try to load a saved model
                finrl_config = config.get('finrl', {})
                inference_config = finrl_config.get('inference', {})
                model_path = inference_config.get('model_path', 'models/finrl/final_model')
                use_trained_model = inference_config.get('use_trained_model', True)
                if use_trained_model:
                    self.model = self._load_model(model_path, config)
                    if self.model is None:
                        return {'success': False, 'error': 'No trained model available'}
                else:
                    return {'success': False, 'error': 'No model available for prediction'}

            # Prepare data
            prepared_data = self.prepare_data(data)

            # Create the environment
            env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)

            # Run the policy over the data
            obs, _ = env.reset()
            done = False
            actions = []
            rewards = []
            portfolio_values = []
            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                obs, reward, done, _, info = env.step(action)
                actions.append(action)
                rewards.append(reward)
                portfolio_values.append(info['portfolio_value'])

            # Calculate final metrics
            initial_value = config.get('trading', {}).get('capital', 100000)
            final_value = portfolio_values[-1] if portfolio_values else initial_value
            total_return = (final_value - initial_value) / initial_value
            return {
                'success': True,
                'actions': actions,
                'rewards': rewards,
                'portfolio_values': portfolio_values,
                'initial_value': initial_value,
                'final_value': final_value,
                'total_return': total_return,
                'total_trades': len([a for a in actions if a != 1])  # Count non-hold actions
            }
        except Exception as e:
            logger.error(f"Error during prediction: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def evaluate(self, data: pd.DataFrame, config: Dict[str, Any],
                 use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Evaluate the trained model on test data.

        Args:
            data: Market data for evaluation
            config: Configuration dictionary
            use_real_broker: Whether to use the real Alpaca broker for execution

        Returns:
            Evaluation results dictionary
        """
        try:
            if self.model is None:
                raise ValueError("Model not trained")

            # Prepare data
            prepared_data = self.prepare_data(data)

            # Create the environment
            env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)

            # Run the evaluation rollout
            obs, _ = env.reset()
            done = False
            actions = []
            rewards = []
            portfolio_values = []
            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                obs, reward, done, _, info = env.step(action)
                actions.append(action)
                rewards.append(reward)
                portfolio_values.append(info['portfolio_value'])

            # Calculate evaluation metrics
            initial_value = config.get('trading', {}).get('capital', 100000)
            final_value = portfolio_values[-1] if portfolio_values else initial_value
            total_return = (final_value - initial_value) / initial_value

            # Additional metrics
            total_trades = len([a for a in actions if a != 1])  # Count non-hold actions
            avg_reward = np.mean(rewards) if rewards else 0
            max_drawdown = self._calculate_max_drawdown(portfolio_values)
            return {
                'success': True,
                'total_return': total_return,
                'total_trades': total_trades,
                'avg_reward': avg_reward,
                'max_drawdown': max_drawdown,
                'final_portfolio_value': final_value,
                'initial_portfolio_value': initial_value,
                'actions': actions,
                'rewards': rewards,
                'portfolio_values': portfolio_values
            }
        except Exception as e:
            logger.error(f"Error during evaluation: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def save_model(self, model_path: str) -> bool:
        """
        Save the trained model.

        Args:
            model_path: Path to save the model

        Returns:
            True if successful, False otherwise
        """
        try:
            if self.model is None:
                raise ValueError("Model not trained")
            self.model.save(model_path)
            logger.info(f"Model saved to {model_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving model: {e}")
            return False

    def load_model(self, model_path: str, config: Dict[str, Any]) -> bool:
        """
        Load a trained model.

        Args:
            model_path: Path to the model
            config: Configuration dictionary

        Returns:
            True if successful, False otherwise
        """
        try:
            self.model = self._load_model(model_path, config)
            if self.model is None:
                return False
            logger.info(f"Model loaded from {model_path}")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return False

    def _calculate_max_drawdown(self, portfolio_values: List[float]) -> float:
        """Calculate the maximum drawdown from a series of portfolio values."""
        if not portfolio_values:
            return 0.0
        peak = portfolio_values[0]
        max_drawdown = 0.0
        for value in portfolio_values:
            if value > peak:
                peak = value
            drawdown = (peak - value) / peak
            max_drawdown = max(max_drawdown, drawdown)
        return max_drawdown
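
    # For example, for portfolio values [100000, 120000, 90000, 130000] the peak
    # before the trough is 120000, so the maximum drawdown is
    # (120000 - 90000) / 120000 = 0.25.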

    def _load_model(self, model_path: str, config: Dict[str, Any]):
        """Load a trained model from disk."""
        try:
            # Get the algorithm from config or fall back to the agent default
            finrl_config = config.get('finrl', {})
            algorithm = finrl_config.get('algorithm', self.config.algorithm)
            algorithms = {"PPO": PPO, "A2C": A2C, "DDPG": DDPG, "TD3": TD3}
            if algorithm not in algorithms:
                logger.error(f"Unsupported algorithm for model loading: {algorithm}")
                return None
            return algorithms[algorithm].load(model_path)
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return None

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calculate the RSI indicator."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def _calculate_bollinger_bands(self, prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series]:
        """Calculate Bollinger Bands."""
        sma = prices.rolling(window=period).mean()
        std = prices.rolling(window=period).std()
        upper_band = sma + (std * std_dev)
        lower_band = sma - (std * std_dev)
        return upper_band, lower_band

    def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.Series:
        """Calculate the MACD line (the signal parameter is accepted but not currently used)."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        return macd


def create_finrl_agent_from_config(config: FinRLConfig) -> FinRLAgent:
    """Create a FinRL agent from a configuration object."""
    return FinRLAgent(config)
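

if __name__ == "__main__":
    # A minimal, self-contained sketch of the training/evaluation flow on synthetic
    # OHLCV data. The 'trading' and 'finrl' config keys below mirror the lookups made
    # in train()/predict() above; any other keys your wider system expects are not
    # shown here, and the tiny timestep budget is only a smoke test, not a real run.
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    n = 500
    close = 100 + np.cumsum(rng.normal(0, 1, n))
    data = pd.DataFrame({
        'open': close + rng.normal(0, 0.5, n),
        'high': close + np.abs(rng.normal(0, 1, n)),
        'low': close - np.abs(rng.normal(0, 1, n)),
        'close': close,
        'volume': rng.integers(1_000, 10_000, n).astype(float),
    })

    system_config = {
        'trading': {'symbol': 'SPY', 'capital': 100000},
        'finrl': {'training': {'model_save_path': 'models/finrl', 'eval_freq': 500}},
    }

    agent = create_finrl_agent_from_config(FinRLConfig(algorithm="PPO"))
    train_result = agent.train(data, system_config, total_timesteps=2000)
    print("train:", train_result)

    if train_result.get('success'):
        eval_result = agent.evaluate(data, system_config)
        print("evaluate: total_return=%.4f max_drawdown=%.4f" % (
            eval_result['total_return'], eval_result['max_drawdown']))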