ML for Trading: Lesson 5
End-to-End ML Trading Pipeline
Build a complete machine learning trading pipeline from data to live signals.
The Full ML Trading Pipeline
Here's how to build an end-to-end system that takes price data and produces live trading signals.
Architecture Overview
1. Data Ingestion: Fetch OHLCV for universe
2. Feature Engineering: Create 30-50 features
3. Label Creation: Define what we're predicting
4. Model Training: Fit on historical data with proper CV
5. Model Evaluation: Test on holdout period
6. Live Signal Generation: Score new data daily
7. Position Management: Convert signals to orders
8. Monitoring: Track performance vs. backtest
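Step 4 asks for "proper CV". For time-ordered data, one common reading is a walk-forward split, where every test fold lies strictly after its training fold. The sketch below uses scikit-learn's `TimeSeriesSplit` on synthetic stand-in data. The helper name `walk_forward_scores` and the `gap=5` setting (chosen to match a 5-day label horizon) are assumptions for illustration, not part of the lesson's pipeline.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import TimeSeriesSplit


def walk_forward_scores(X, y, n_splits=5, gap=5):
    """Return out-of-sample accuracy for each chronological fold.

    `gap` drops the rows just before each test fold, so labels that
    look `gap` days ahead cannot overlap the test period.
    """
    splitter = TimeSeriesSplit(n_splits=n_splits, gap=gap)
    scores = []
    for train_idx, test_idx in splitter.split(X):
        model = RandomForestClassifier(n_estimators=50, random_state=0)
        model.fit(X[train_idx], y[train_idx])
        scores.append(model.score(X[test_idx], y[test_idx]))
    return scores


# Synthetic stand-in for a feature matrix and up/down labels (not market data)
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(300, 4))
y_demo = (X_demo[:, 0] + rng.normal(scale=0.5, size=300) > 0).astype(int)

cv_scores = walk_forward_scores(X_demo, y_demo)
print([round(s, 3) for s in cv_scores])
```

A shuffled K-fold would let the model train on rows that come after the test rows, which tends to inflate backtest accuracy.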
python
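import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

# fetch_ohlcv() and calculate_rsi() are helpers you supply; this lesson does not define them.
# Columns fed to the model; extend this list as you add features.
FEATURE_COLS = ['Return', 'SMA_20', 'RSI', 'Volatility']
HORIZON = 5  # Label horizon in trading days


class MLTradingPipeline:
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.last_train_date = None

    def fetch_data(self, symbols, start_date, end_date):
        """Fetch OHLCV data for each symbol."""
        data = {}
        for sym in symbols:
            data[sym] = fetch_ohlcv(sym, start_date, end_date)
        return data

    def create_features(self, df):
        """Engineer features on a copy so the caller's frame is not modified."""
        df = df.copy()
        df['Return'] = df['Close'].pct_change()
        df['SMA_20'] = df['Close'].rolling(20).mean()
        df['RSI'] = calculate_rsi(df['Close'], 14)
        df['Volatility'] = df['Return'].rolling(20).std()
        # Add 30+ more features
        return df

    def create_labels(self, df):
        """Define target: 1 if the close is higher HORIZON days ahead."""
        df = df.copy()
        df['Future_Return'] = df['Close'].shift(-HORIZON) / df['Close'] - 1
        # The last HORIZON rows have no future close; NaN > 0 evaluates to False,
        # so those rows get a bogus 0 label and must be dropped before training.
        df['Label'] = (df['Future_Return'] > 0).astype(int)
        return df

    def train_model(self, train_data):
        """Train on historical data."""
        # Drop warm-up rows (NaN features) and unlabeled tail rows (NaN future return)
        clean = train_data.dropna(subset=FEATURE_COLS + ['Future_Return'])
        X = clean[FEATURE_COLS]
        y = clean['Label']
        X_scaled = self.scaler.fit_transform(X)
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.model.fit(X_scaled, y)
        self.last_train_date = train_data.index[-1]

    def generate_signals(self, recent_data):
        """Generate a trading signal from the most recent bar."""
        features = self.create_features(recent_data)
        X = features[FEATURE_COLS].iloc[[-1]]
        if X.isna().values.any():
            raise ValueError("recent_data is too short to fill the rolling windows")
        X_scaled = self.scaler.transform(X)
        prediction = self.model.predict(X_scaled)[0]
        prob_up = self.model.predict_proba(X_scaled)[0, 1]  # P(label == 1)
        return {
            'signal': 'BUY' if prediction == 1 else 'SELL',
            # Report the probability of the predicted class, not always P(up)
            'confidence': prob_up if prediction == 1 else 1 - prob_up
        }


# Usage
pipeline = MLTradingPipeline()

# Train (weekly)
historical_data = pipeline.fetch_data(['AAPL'], '2020-01-01', '2024-01-01')
for sym in historical_data:
    df = historical_data[sym]
    df = pipeline.create_features(df)
    df = pipeline.create_labels(df)
    # Each call refits from scratch, so with several symbols only the last fit is kept
    pipeline.train_model(df)

# Deploy (daily): fetch enough history to fill the 20-day rolling windows,
# since a single day of data would leave every rolling feature NaN
today = pd.Timestamp.today().normalize()
lookback_start = today - pd.Timedelta(days=90)
recent_data = pipeline.fetch_data(['AAPL'], str(lookback_start.date()), str(today.date()))
signal = pipeline.generate_signals(recent_data['AAPL'])
print(f"Signal: {signal['signal']} (confidence: {signal['confidence']:.2%})")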
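The pipeline calls `calculate_rsi(df['Close'], 14)` without defining it. A minimal sketch of one possible implementation follows, assuming Wilder-style exponential smoothing of gains and losses. The lesson does not say which RSI variant it intends, so treat the smoothing choice as an assumption.

```python
import pandas as pd


def calculate_rsi(close: pd.Series, period: int = 14) -> pd.Series:
    """Relative Strength Index with Wilder-style smoothing (alpha = 1 / period)."""
    delta = close.diff()
    gain = delta.clip(lower=0)        # Positive moves, zero otherwise
    loss = (-delta).clip(lower=0)     # Magnitude of negative moves, zero otherwise
    avg_gain = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    rs = avg_gain / avg_loss          # inf when there were no losses in the window
    return 100 - 100 / (1 + rs)


# Sanity check on toy series: a steady rise pins RSI at the top of its range,
# a steady fall pins it at the bottom
rising = pd.Series(range(1, 31), dtype=float)
falling = rising[::-1].reset_index(drop=True)
rsi_rising = calculate_rsi(rising)
rsi_falling = calculate_rsi(falling)
print(rsi_rising.iloc[-1], rsi_falling.iloc[-1])
```

The early rows come back as NaN until enough observations accumulate, which is one reason the training step has to drop warm-up rows.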
Retraining Strategy
- Daily retraining: Train on the latest 2 years of data every night; most responsive, most compute
- Weekly retraining: Computationally cheaper, but slower to adapt to new market conditions
- Monthly retraining: Only for slow, low-turnover strategies
- Out-of-sample test period: Always hold out the most recent 3-6 months as a true test
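The training window and the holdout period above can be sketched as a date-based split. This is an illustration only: the function name `split_train_holdout`, the default window lengths, and the synthetic price frame are all assumptions, and it expects a frame indexed by a sorted `DatetimeIndex`.

```python
import pandas as pd


def split_train_holdout(df, train_years=2, holdout_months=3):
    """Split a date-indexed frame into a trailing training window and a final holdout."""
    end = df.index.max()
    holdout_start = end - pd.DateOffset(months=holdout_months)
    train_start = holdout_start - pd.DateOffset(years=train_years)
    train = df.loc[(df.index >= train_start) & (df.index < holdout_start)]
    holdout = df.loc[df.index >= holdout_start]
    return train, holdout


# Synthetic business-day frame standing in for real OHLCV data
dates = pd.bdate_range('2020-01-01', '2023-12-29')
prices = pd.DataFrame({'Close': range(len(dates))}, index=dates, dtype=float)

train, holdout = split_train_holdout(prices)
print(train.index.min().date(), train.index.max().date())
print(holdout.index.min().date(), holdout.index.max().date())
```

With forward-looking labels, the last few training rows use closes that fall inside the holdout, so you may also want to drop a label-horizon's worth of rows from the end of the training window.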
Monitoring: Backtest vs. Live
python
# Track whether live performance matches the backtest.
# track_sharpe_ytd(), calculate_win_rate_ytd(), calculate_avg_loss_ytd() and alert()
# are placeholders for your own reporting helpers; `today` is the current date.
daily_metrics = {
    'date': today,
    'backtest_sharpe': 1.8,  # From historical backtest
    'live_sharpe': track_sharpe_ytd(),  # From live trading
    'win_rate_backtest': 0.55,
    'win_rate_live': calculate_win_rate_ytd(),
    'avg_loss_backtest': -0.50,  # Must use the same units as avg_loss_live
    'avg_loss_live': calculate_avg_loss_ytd()
}

# If live Sharpe falls below half the backtest Sharpe, something broke: retrain or stop trading
if daily_metrics['live_sharpe'] < daily_metrics['backtest_sharpe'] / 2:
    alert("Model degradation detected! Investigate or pause trading.")
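The monitoring snippet relies on helpers such as `track_sharpe_ytd()` that the lesson leaves undefined. The sketch below shows one plausible way to compute the underlying numbers from a series of returns. The function names, the 252-day annualization factor, and the zero default risk-free rate are assumptions, not the lesson's definitions.

```python
import numpy as np
import pandas as pd

TRADING_DAYS = 252  # Assumed annualization factor for daily returns


def annualized_sharpe(daily_returns: pd.Series, risk_free_daily: float = 0.0) -> float:
    """Annualized Sharpe ratio of daily returns; NaN if it cannot be computed."""
    excess = daily_returns.dropna() - risk_free_daily
    if len(excess) < 2:
        return float('nan')
    std = excess.std(ddof=1)
    if std == 0:
        return float('nan')
    return float(np.sqrt(TRADING_DAYS) * excess.mean() / std)


def win_rate(trade_returns: pd.Series) -> float:
    """Fraction of trades with a strictly positive return."""
    trades = trade_returns.dropna()
    return float((trades > 0).mean()) if not trades.empty else float('nan')


def average_loss(trade_returns: pd.Series) -> float:
    """Mean return of losing trades only (a negative number)."""
    losses = trade_returns[trade_returns < 0]
    return float(losses.mean()) if not losses.empty else float('nan')


# Toy returns for illustration, not real trading results
sample = pd.Series([0.01, -0.005, 0.02, -0.01, 0.015])
print(annualized_sharpe(sample), win_rate(sample), average_loss(sample))
```

A year-to-date Sharpe computed from only a few weeks of returns is very noisy, so it may be worth requiring a minimum number of observations before acting on the alert.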
The Full Loop
Backtest → Train on History → Paper Trade → Monitor → Compare to Backtest → Adjust → Repeat. This iteration is how you improve over time.
Key Takeaways
- Pipeline: Data → Features → Labels → Train → Evaluate → Signals → Orders
- Automation: Retrain regularly (daily/weekly) on new data
- Monitor: Track live performance vs. backtest performance
- Integrate with Lab: Deploy signals to strategy builder