import pandas as pd
import numpy as np
# === ATR ===
def compute_atr(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """Return the Average True Range as a simple rolling mean of true range.

    The true range of a bar is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``.

    Args:
        df: Frame with ``'high'``, ``'low'`` and ``'close'`` columns.
        window: Rolling window, forwarded unchanged to ``Series.rolling``.

    Returns:
        Series aligned to ``df.index``. With an integer window the leading
        ``window - 1`` entries are NaN.
    """
    prev_close = df['close'].shift(1)
    true_range = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)  # row-wise max skips NaN, so the first bar falls back to high - low
    return true_range.rolling(window).mean()
# === Rolling Volatility ===
def compute_rolling_vol(df: pd.DataFrame, window: int = 21) -> pd.Series:
    """Return the rolling standard deviation of close-to-close returns.

    Args:
        df: Frame with a ``'close'`` column.
        window: Rolling window, forwarded unchanged to ``Series.rolling``.

    Returns:
        Series aligned to ``df.index``; NaN wherever the window does not yet
        hold enough valid returns.
    """
    # fill_method=None: a missing close must stay missing. The legacy implicit
    # forward-fill turns a gap into a fabricated 0% return, which biases the
    # volatility estimate downward (and is deprecated in recent pandas).
    simple_returns = df['close'].pct_change(fill_method=None)
    return simple_returns.rolling(window).std()
# === VWAP ===
def compute_vwap(df: pd.DataFrame) -> pd.Series:
    """Return the cumulative volume-weighted average of the close price.

    Both running sums start at the first row of ``df`` and are never reset,
    so each value is the VWAP of everything from the start of the frame up to
    and including that row.

    Args:
        df: Frame with ``'close'`` and ``'volume'`` columns.

    Returns:
        Series aligned to ``df.index``. Rows where the cumulative volume is
        still zero evaluate to NaN (0 / 0).
    """
    traded_value = df['close'] * df['volume']
    return traded_value.cumsum() / df['volume'].cumsum()
# Demo: attach the three default indicators to the working frame, in order.
for column, series in (
    ('ATR14', compute_atr(df, 14)),
    ('VOL21', compute_rolling_vol(df, 21)),
    ('VWAP', compute_vwap(df)),
):
    df[column] = series
df.tail()
import matplotlib.pyplot as plt
# Window lengths to compare; one ATR and one volatility column is added per entry.
windows = [14, 21, 50]
for w in windows:
    df[f'ATR{w}'] = compute_atr(df, w)
    df[f'VOL{w}'] = compute_rolling_vol(df, w)
# Plotting example: one figure per indicator family, one line per window.
# Each spec is (column prefix in df, legend label prefix, figure title).
plot_specs = [
    ('ATR', 'ATR', "ATR comparison"),
    ('VOL', 'Vol', "Volatility comparison"),
]
for column_prefix, label_prefix, title in plot_specs:
    plt.figure(figsize=(14, 5))
    for w in windows:
        plt.plot(df.index, df[f'{column_prefix}{w}'], label=f'{label_prefix}{w}')
    plt.legend()
    plt.title(title)
    plt.show()
# Feature matrix: the three demo indicators, keeping only rows where all are present.
features = df[["ATR14", "VOL21", "VWAP"]].dropna(how="any")
# === Turning features into signals (pseudo-code) ===
"""
signal = 1 if:
    close > vwap
    and vol < rolling_vol_threshold
    and atr in high_volatility_regime
signal = -1 if:
    close < vwap
    and vol > rolling_vol_threshold
"""
# Python signal demo: +1 when close is above VWAP, -1 when below, 0 otherwise
# (including rows where the comparison involves NaN).
above_vwap = df['close'] > df['VWAP']
below_vwap = df['close'] < df['VWAP']
df['signal'] = above_vwap.astype('int64') - below_vwap.astype('int64')
df[['close', 'VWAP', 'signal']].tail()
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
# Target: does the next bar close above the current one?
# future_return[t] = close[t+1] / close[t] - 1. The last row has no following
# bar, so its value is NaN (never padded to a fake 0% return).
df['future_return'] = df['close'].pct_change(fill_method=None).shift(-1)
df['label'] = (df['future_return'] > 0).astype(int)
# Training data: keep only rows where every feature AND the outcome are known.
# (NaN > 0) is False, so a row with an unknown future return carries label 0
# and would otherwise be trained on as a "down" bar.
feature_cols = ["ATR14", "VOL21", "VWAP"]
trainable = df[feature_cols + ['future_return']].notna().all(axis=1)
X = df.loc[trainable, feature_cols]
y = df.loc[trainable, 'label']
# Fixed seed so the fitted forest is reproducible between runs.
model = RandomForestClassifier(n_estimators=200, random_state=42)
model.fit(X, y)
# Permutation importance: mean importance per feature over 10 repeats,
# listed from most to least important.
# NOTE(review): this is evaluated on the same X, y the model was fitted on —
# confirm whether a held-out set is intended.
result = permutation_importance(model, X, y, n_repeats=10)
importance = pd.DataFrame(
    list(zip(feature_cols, result.importances_mean)),
    columns=["feature", "importance"],
).sort_values(by="importance", ascending=False)
print(importance)
# Volatility regime: split at the median of the 21-bar rolling volatility.
vol_threshold = df['VOL21'].median()
regime_labels = np.where(df['VOL21'] > vol_threshold, 'high_vol', 'low_vol')
# Warm-up rows have NaN volatility; (NaN > threshold) is False, so they would
# be tagged 'low_vol' and could receive long signals. Leave their regime
# missing instead.
df['regime'] = pd.Series(regime_labels, index=df.index).where(df['VOL21'].notna())
# Strategy example: go long only in the low-volatility regime.
df['signal'] = 0
df.loc[(df['close'] > df['VWAP']) & (df['regime'] == 'low_vol'), 'signal'] = 1
# Show the result side by side.
df[['close', 'VOL21', 'regime', 'signal']].tail()
# You can feed `signal` back into Backtrader for backtesting.
# feature_pipeline.py
import pandas as pd
import numpy as np
def compute_atr(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """Return the Average True Range as a simple rolling mean of true range.

    True range per bar is the maximum of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``.

    Args:
        df: Frame with ``'high'``, ``'low'`` and ``'close'`` columns.
        window: Rolling window, forwarded unchanged to ``Series.rolling``.
            Defaults to 14, the same default as the earlier ``compute_atr``
            definition in this file, which this one shadows.

    Returns:
        Series aligned to ``df.index``. With an integer window the leading
        ``window - 1`` entries are NaN.
    """
    high, low = df['high'], df['low']
    prev_close = df['close'].shift(1)
    candidates = [
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ]
    # Row-wise max skips NaN, so the first bar (no previous close) uses high - low.
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    return true_range.rolling(window).mean()
def compute_rolling_vol(df: pd.DataFrame, window: int = 21) -> pd.Series:
    """Return the rolling standard deviation of close-to-close returns.

    Args:
        df: Frame with a ``'close'`` column.
        window: Rolling window, forwarded unchanged to ``Series.rolling``.
            Defaults to 21, the same default as the earlier
            ``compute_rolling_vol`` definition in this file, which this one
            shadows.

    Returns:
        Series aligned to ``df.index``; NaN wherever the window does not yet
        hold enough valid returns.
    """
    # fill_method=None keeps missing closes missing instead of forward-filling
    # them into artificial 0% returns (the deprecated implicit default).
    returns = df['close'].pct_change(fill_method=None)
    return returns.rolling(window).std()
def compute_vwap(df: pd.DataFrame) -> pd.Series:
    """Return the cumulative volume-weighted average of the close price.

    The running sums start at the first row of ``df`` and are never reset.

    Args:
        df: Frame with ``'close'`` and ``'volume'`` columns.

    Returns:
        Series aligned to ``df.index``. Rows where the cumulative volume is
        still zero evaluate to NaN (0 / 0).
    """
    cumulative_value = (df['close'] * df['volume']).cumsum()
    cumulative_volume = df['volume'].cumsum()
    return cumulative_value / cumulative_volume
def feature_pipeline(
    df: pd.DataFrame,
    *,
    atr_window: int = 14,
    vol_window: int = 21,
) -> pd.DataFrame:
    """Return a copy of ``df`` with ATR, rolling-volatility and VWAP columns.

    The input frame is not modified. The new columns are named
    ``ATR<atr_window>``, ``VOL<vol_window>`` and ``VWAP``; with the default
    windows that is ``ATR14``, ``VOL21`` and ``VWAP``.

    Args:
        df: Frame with the columns needed by ``compute_atr``,
            ``compute_rolling_vol`` and ``compute_vwap``.
        atr_window: Window passed to ``compute_atr``.
        vol_window: Window passed to ``compute_rolling_vol``.

    Returns:
        The enriched copy with every row that contains a NaN in any column
        removed — this includes, but is not limited to, the indicator
        warm-up rows.
    """
    enriched = df.copy()
    enriched[f'ATR{atr_window}'] = compute_atr(enriched, atr_window)
    enriched[f'VOL{vol_window}'] = compute_rolling_vol(enriched, vol_window)
    enriched['VWAP'] = compute_vwap(enriched)
    return enriched.dropna()