Machine Learning Examples

XGBoost ensemble strategy with volatility regime detection. Production-grade ML pipeline with proper train/test split and feature engineering.

At a glance: 6 ensemble models (5 XGBoost variants plus GaussianNB), 4 volatility regimes, 8 engineered features, and per-regime decision thresholds optimized for F1.

Ensemble Regime ML Strategy

Advanced | ~400 lines | XGBoost | scikit-learn

This is a complete, production-ready ML strategy that combines ensemble learning with volatility regime detection. Five XGBoost variants and a Gaussian naive Bayes model vote on predictions via soft voting, and each regime gets its own decision threshold optimized for F1 score.

ensemble_regime_ml_strategy.py (full production example)
from typing import Any, Dict, Optional, Tuple
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.ensemble import VotingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import f1_score
from rlxbt import Strategy, Backtester, rlx


class EnsembleRegimeMLStrategy(Strategy):
    def __init__(
        self, regime_threshold_method: str = "quartiles", horizon_bars: int = 60
    ):
        super().__init__()
        self.name = "Ensemble Regime ML Strategy"
        self.description = "Multi-model ensemble with volatility regime detection"
        self.regime_threshold_method = regime_threshold_method
        self.regime_models: Dict[str, Any] = {}
        self.horizon_bars = int(horizon_bars)
        self.feature_columns = [
            "ret_5m", "ret_15m", "price_volatility",
            "bar_range_pct", "atr_h_pct", "ema5_slope_5",
            "rsi_5", "vwap_dev_15",
        ]
        self.regime_decision_thresholds: Dict[str, float] = {}
        self.global_model = None
        self.global_threshold = 0.5
        self.is_trained = False
        self.vol_thresholds = None

    def create_small_ensemble_models(self):
        """Return a compact yet diverse set of base models."""
        models = []

        # Compact XGB variants with different hyperparams
        xgb_configs = [
            dict(n_estimators=600, learning_rate=0.05, max_depth=4,
                 subsample=0.9, colsample_bytree=0.9, random_state=1),
            dict(n_estimators=800, learning_rate=0.03, max_depth=5,
                 subsample=0.85, colsample_bytree=0.85, random_state=2),
            dict(n_estimators=400, learning_rate=0.08, max_depth=3,
                 subsample=1.0, colsample_bytree=0.8, random_state=3),
            dict(n_estimators=700, learning_rate=0.04, max_depth=6,
                 subsample=0.9, colsample_bytree=1.0, random_state=4),
            dict(n_estimators=500, learning_rate=0.06, max_depth=4,
                 subsample=0.95, colsample_bytree=0.9, random_state=5),
        ]

        for i, cfg in enumerate(xgb_configs):
            models.append((
                f"xgb_{i}",
                xgb.XGBClassifier(
                    **cfg, eval_metric="logloss", n_jobs=-1,
                    tree_method="hist", objective="binary:logistic",
                ),
            ))

        # Add one very simple model for diversity
        models.append(("gnb", GaussianNB()))
        return models

    def calculate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate all features for ML model."""
        features = pd.DataFrame(index=df.index)
        close = df["close"]
        high = df["high"]
        low = df["low"]
        vol = df.get("volume", pd.Series(index=df.index, dtype=float))

        # Short-horizon returns
        features["ret_5m"] = (close.shift(1) - close.shift(5)) / close.shift(5) * 100
        features["ret_15m"] = (close.shift(1) - close.shift(15)) / close.shift(15) * 100

        # Micro volatility (15 bars)
        roll_std_15 = close.shift(1).rolling(window=15).std()
        roll_mean_15 = close.shift(1).rolling(window=15).mean()
        features["price_volatility"] = (roll_std_15 / roll_mean_15) * 100

        # One-bar range as % of price
        features["bar_range_pct"] = (high.shift(1) - low.shift(1)) / close.shift(1) * 100

        # ATR over horizon_bars as % of price
        tr1 = (high - low).abs()
        tr2 = (high - close.shift(1)).abs()
        tr3 = (low - close.shift(1)).abs()
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        atr_h = tr.rolling(window=self.horizon_bars).mean()
        features["atr_h_pct"] = (atr_h / close) * 100

        # EMA(5) slope over last 5 bars
        ema5 = close.ewm(span=5, adjust=False).mean()
        features["ema5_slope_5"] = (ema5.shift(1) - ema5.shift(6)) / ema5.shift(6) * 100

        # RSI(5)
        delta = close.diff()
        gain = delta.clip(lower=0)
        loss = -delta.clip(upper=0)
        avg_gain = gain.rolling(window=5).mean()
        avg_loss = loss.rolling(window=5).mean()
        rs = avg_gain / avg_loss.replace(0, np.nan)
        features["rsi_5"] = (100 - (100 / (1 + rs))).shift(1)

        # VWAP deviation
        if not vol.isna().all():
            tp = (high + low + close) / 3.0
            vol_15 = vol.rolling(window=15).sum()
            tpv_15 = (tp * vol).rolling(window=15).sum()
            vwap_15 = tpv_15 / vol_15
            features["vwap_dev_15"] = (close.shift(1) - vwap_15.shift(1)) / vwap_15.shift(1) * 100
        else:
            features["vwap_dev_15"] = 0.0

        # Target
        features["close_next"] = close.shift(-self.horizon_bars)
        features["return_next"] = (features["close_next"] - close) / close * 100

        return features
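
Every return, volatility, slope, RSI, and VWAP feature above is lagged by one bar (shift(1)), so each row uses only bars that have already closed; the close_next and return_next columns then look horizon_bars into the future and supply the target the ensemble learns to predict.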

Model Training with Regime Detection

train_model() method
def train_model(self, X_train: pd.DataFrame, y_train: pd.Series) -> Dict[str, Any]:
    # Determine regimes by volatility quartiles
    q1 = X_train["price_volatility"].quantile(0.25)
    q2 = X_train["price_volatility"].quantile(0.50)
    q3 = X_train["price_volatility"].quantile(0.75)
    self.vol_thresholds = (q1, q2, q3)

    def assign_regime(vol):
        if vol <= q1: return "regime_1"  # Low vol (bottom quartile)
        elif vol <= q2: return "regime_2"  # Medium-low vol
        elif vol <= q3: return "regime_3"  # Medium-high vol
        else: return "regime_4"  # High vol (top quartile)

    train_regime = X_train["price_volatility"].apply(assign_regime)

    # Train separate model for each regime
    for regime in ["regime_1", "regime_2", "regime_3", "regime_4"]:
        mask = train_regime == regime
        Xr, yr = X_train[mask], y_train[mask]
        if len(Xr) < 200: continue

        # Binary target: label the top ~29% of forward returns as positive
        ret_thr = yr.quantile(0.714)
        yr_bin = (yr > ret_thr).astype(int)

        # Train ensemble
        trained_models = []
        for name, model in self.create_small_ensemble_models():
            model.fit(Xr, yr_bin)
            trained_models.append((name, model))

        voting = VotingClassifier(trained_models, voting="soft")
        voting.fit(Xr, yr_bin)

        # Optimize the decision threshold via F1 (tuned in-sample on this regime's training data)
        train_probs = voting.predict_proba(Xr)[:, 1]
        best_thr, best_f1 = 0.5, 0.0
        for thr in np.linspace(0.4, 0.7, 16):
            preds = (train_probs >= thr).astype(int)
            f1 = f1_score(yr_bin, preds)
            if f1 > best_f1:
                best_f1, best_thr = f1, float(thr)

        self.regime_models[regime] = voting
        self.regime_decision_thresholds[regime] = best_thr

    self.is_trained = True
    return {"models": self.regime_models, "thresholds": self.regime_decision_thresholds}
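
The full strategy also needs an inference side that applies the stored per-regime models and thresholds bar by bar. Below is a minimal sketch of that step; the method name predict_signal and the fallback to a global model are illustrative assumptions, not part of the library API, and the full example may implement this differently.

predict_signal() sketch
def predict_signal(self, features_row: pd.Series) -> int:
    """Sketch only: map one row of features to a 0/1 long signal."""
    if not self.is_trained or self.vol_thresholds is None:
        return 0

    # Assign the bar to a volatility regime using the training quartiles
    q1, q2, q3 = self.vol_thresholds
    vol = features_row["price_volatility"]
    if vol <= q1:
        regime = "regime_1"
    elif vol <= q2:
        regime = "regime_2"
    elif vol <= q3:
        regime = "regime_3"
    else:
        regime = "regime_4"

    # Fall back to the global model if this regime had too little training data
    model = self.regime_models.get(regime, self.global_model)
    if model is None:
        return 0

    # Probability of the "top-return" class from the soft-voting ensemble
    X = features_row[self.feature_columns].to_frame().T
    prob = model.predict_proba(X)[0, 1]

    # Go long only when the probability clears the regime-specific threshold
    threshold = self.regime_decision_thresholds.get(regime, self.global_threshold)
    return int(prob >= threshold)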

Complete Usage Example

Run ML strategy with dashboard
def main():
    data_path = "data/BTCUSDT_1m_2025-04-30_2025-07-29.csv"

    strategy = EnsembleRegimeMLStrategy(
        regime_threshold_method="median",
        horizon_bars=15,  # Predict 15 bars ahead
    )

    strategy.run_complete_analysis(
        data_path,
        test_start_date="2025-06-01",
        initial_capital=100_000,
    )


if __name__ == "__main__":
    main()

# Output:
# Training models...
# 🚀 Running Ensemble Strategy Backtest...
#
# 📊 BACKTEST REPORT
# ===========================
# Total Return:  23.45%
# Sharpe Ratio:  1.87
# Max Drawdown:  -8.23%
# Trades:        142
# ===========================
#
# Generating Dashboard...

Feature Engineering

Feature            Description                                      Window (bars)
ret_5m             5-bar return percentage                          5
ret_15m            15-bar return percentage                         15
price_volatility   Rolling std / mean (coefficient of variation)    15
bar_range_pct      High-low range as % of close                     1
atr_h_pct          ATR over horizon as % of price                   60
ema5_slope_5       EMA(5) slope over 5 bars                         5
rsi_5              Fast RSI indicator                               5
vwap_dev_15        Price deviation from 15-bar VWAP                 15
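
To wire these features into train_model(), the output of calculate_features() has to be trimmed of warm-up NaNs and split into predictors and a target column. The sketch below shows one way to do this; in the full example this wiring happens inside run_complete_analysis(), so the exact logic there may differ, and a real run would also hold out a test period as in main() above.

strategy = EnsembleRegimeMLStrategy(horizon_bars=15)

raw = pd.read_csv("data/BTCUSDT_1m_2025-04-30_2025-07-29.csv")
features = strategy.calculate_features(raw)

# Drop rolling-window warm-up rows and tail rows whose future return is unknown
features = features.dropna(subset=strategy.feature_columns + ["return_next"])

X_train = features[strategy.feature_columns]
y_train = features["return_next"]  # continuous returns; binarized per regime in train_model()

strategy.train_model(X_train, y_train)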

Walk-Forward ML (Recommended)

This is the recommended way to implement ML in RLX: the WalkForwardML trainer automatically handles rolling training and testing windows, retraining the model as the window advances through the data.

walk_forward_ml_demo.py
from rlxbt.ml import BaseMLStrategy, WalkForwardML
from sklearn.ensemble import RandomForestRegressor

class MyStrategy(BaseMLStrategy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Instantiate the estimator used below (assumes BaseMLStrategy does not set one itself);
        # any sklearn-style estimator works, RandomForestRegressor is just an example
        self.model = RandomForestRegressor(n_estimators=200, random_state=42)

    def train(self, data):
        X, y = self.prepare_features(data)
        self.model.fit(X, y)

    def generate_signals(self, data):
        X, _ = self.prepare_features(data)
        return self.model.predict(X)

# Run rolling retraining every 500 bars.
# Assumes bt is an existing rlxbt Backtester and df the OHLCV DataFrame loaded elsewhere.
trainer = WalkForwardML(backtester=bt)
results = trainer.run(
    strategy_class=MyStrategy,
    data=df,
    train_size=2000,  # bars per training window
    test_size=500,    # bars evaluated out-of-sample per fold
    step_size=500,    # bars the window advances between folds
)
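
With these settings each fold fits on a 2,000-bar training window and is evaluated on the following 500 bars, after which the window slides forward by 500 bars and the model is retrained, so every test bar is scored by a model that never saw it during fitting.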