图片来源:Unsplash 上的 Agence Olloweb
引言
机器学习模型的选择一直是一个挑战。无论是预测股票价格、诊断疾病,还是优化营销活动,问题始终是:哪个模型最适合我的数据? 传统上,我们依赖交叉验证来测试多个模型——XGBoost、LGBM、随机森林等——然后根据验证性能选择最佳模型。但如果数据集的不同部分需要不同的模型呢?或者,如果动态融合多个模型可以提高准确率呢?
这个想法是在我阅读Deepseeker R1(一种先进的大型语言模型,能够动态调整以提高性能)时产生的。受其基于强化学习(RL)的优化方法启发,我开始思考:我们是否可以在监督学习中应用类似的 RL 策略?与其手动选择模型,为什么不让强化学习自动学习出最佳策略?
想象一个强化学习代理就像一个数据科学家——分析数据集的特征,测试不同的模型,并学习哪些模型表现最佳。更进一步,它不仅仅选择一个模型,还可以根据数据模式动态融合多个模型。例如,在金融数据集中,XGBoost 可能擅长处理结构化趋势,而 LGBM 可能更适合捕捉变量间的交互关系。我们的 RL 系统可以智能地在它们之间切换,甚至自适应地组合它们。
本论文提出了一种全新的强化学习驱动的模型选择与融合框架。我们将这个问题建模为一个马尔可夫决策过程(MDP),其中:
• 状态(state) 表示数据集的特征统计信息。
• 动作(action) 代表选择或融合不同的机器学习模型。
• 奖励(reward) 取决于模型的表现。
• 策略(policy) 通过强化学习训练,以找到最佳的模型选择策略。
与传统方法不同,本方法不会对整个数据集应用单一最优模型,而是学习针对不同数据片段选择最佳模型,甚至动态融合模型。这一方法可以自动化、优化并个性化机器学习流程——减少人工干预,同时提升预测性能。
在本文的最后,我们将看到,强化学习如何彻底改变模型选择,让其更自适应、更智能、更高效——就像一个不断学习和优化决策的专业数据科学家。
方法论:用于监督学习的自适应强化学习模型选择
我们将机器学习模型的自适应选择与融合定义为马尔可夫决策过程(MDP),其由五元组 (S, A, P, R, γ) 组成:
• S(状态):表示当前数据集的统计摘要(例如特征的均值和方差)。
• A(动作):对应于选择单个模型 {XGB, LGBM, RF, DNN, Blend}。
• P(s′∣s, a)(状态转移概率):从当前状态 s 转移到下一状态 s′ 的概率。
• R(s, a)(奖励):在状态 s 下执行动作 a 后获得的即时奖励。
• γ ∈ [0,1](折扣因子):权衡即时奖励与未来奖励的影响。
强化学习代理的目标是学习一个最优策略 π*(s),使得累积奖励最大化:
其中,奖励 R(s, a) 计算如下:
AUC 和 KS 评估模型的预测性能,而复杂度惩罚用于控制模型复杂度(例如,对 DNN 或模型融合策略施加更高的惩罚)。
状态表示与动作空间
在许多情况下,状态 s 由数据集的特征级摘要定义,例如:
其中,μ(X) 是特征均值,σ(X) 是特征方差。
动作 a_t 可以是选择单个模型(如 XGB),也可以是多个模型的加权融合:
其中,w_i 是融合权重,y^i 是模型 i 的预测概率。
Q 学习与模型评估
强化学习的核心是估计状态-动作值函数 Q(s, a),它表示从状态 s 采取动作 a 并遵循策略 π 后的期望累计奖励。该函数可以表示为:
其中,R(s, a) 是模型的即时奖励(基于 AUC + KS - 复杂度惩罚),γ 是折扣因子,决定了未来奖励的权重。强化学习的目标是通过智能选择模型来最大化这个累积奖励,并随着数据的变化不断调整。
多选项探索方法:无需复杂计算的快速模型选择
多选项探索(Multi-Armed Bandit,MAB)方法将问题视为无状态、无记忆的情况,其中每次行动的奖励是相互独立的。在本文的背景下,每个动作 a 都是一个候选模型(如 XGBoost 或 LightGBM),或者是多个模型的融合。当采取某个动作 a 时,我们可以立即观察到奖励 R(a),该奖励基于所选模型在 AUC 和 KS 等指标上的表现。
Q 值的更新规则如下:
其中:
是学习率,N(a) 代表模型 a 被选择的次数的倒数。
ε-贪心(epsilon-greedy)策略 确保了探索较少使用的模型,同时利用当前表现最好的模型:
例如,在训练的早期,MAB 可能会探索 DNN 或融合模型。但如果 XGBoost 一直能获得较高的 AUC 且惩罚最小,则 MAB 代理会逐渐倾向于更多地选择 XGBoost。这种方法在模型表现不依赖于数据状态的情况下效果较好。
深度 Q 网络(DQN)方法:当状态转换重要时
不同于 MAB 方法,DQN 方法可以考虑状态的变化和长期的奖励。在本文中,状态 s 由数据集的统计特征(均值、方差等)定义,并且会随时间变化,例如数据分布的漂移。
强化学习代理的任务是在当前状态 s 下选择最优模型,同时预测未来状态 s′ 及其可能的奖励变化。Q 值的估计由深度神经网络 Q(s, a; θ) 近似计算,其中 θ 是网络参数。
训练 DQN 需要最小化时间差分(TD)误差:
训练 DQN 时采用经验回放(experience replay):
• 存储 过去的状态-动作-奖励-新状态 (s, a, r, s') 经验到缓冲区中。
• 随机抽取 过去的经验进行训练,防止神经网络过拟合到最近的样本。
这种方法的一个典型案例是:
• 假设当前状态 s 代表一个包含大量噪声的金融数据集,并且特征之间有强相关性(多重共线性)。 在这种情况下,简单模型(如随机森林)可能无法有效处理数据,因此 DQN 代理可能学会避免选择随机森林,而更倾向于选择XGBoost + DNN 融合模型,因为它能更好地处理复杂模式。
• 随着数据状态的变化,例如噪声减少,DQN 可以自动调整策略,回归到更简单的模型,如 XGBoost 或 LGBM。
探索与利用:模型选择中的权衡
MAB 和 DQN 都依赖于ε-贪心策略来在探索(exploration)和利用(exploitation)之间取得平衡:
• MAB 方法 直接选择最优模型,适用于静态环境,但可能会忽略数据状态的变化。
• DQN 方法 通过神经网络识别数据的动态变化,适用于时序数据或流数据,但训练计算量更大。
示例说明:在 XGBoost 和融合模型之间进行选择
考虑一个不断变化的金融数据集,任务是预测贷款违约。
• 在数据的初始状态下,特征具有较强的线性可分性,因此 XGBoost 可能是最佳选择。
• 当数据集变得更加嘈杂、特征变得重叠时,融合 XGBoost 和 DNN 可能能获得更高的奖励,因为它可以更好地泛化。
• 强化学习代理不断监控过去的 AUC 和 KS 评估指标,并更新 Q 值,以适应不断变化的环境。
对比:
• 在 MAB 框架下:代理可能由于 XGBoost 在早期表现良好,而倾向于一直选择它,即便在数据变得嘈杂时也不会改变策略。
• 在 DQN 框架下:代理能够随着数据状态的变化调整策略,在数据嘈杂时选择融合模型,在数据变干净时回归到 XGBoost。
结合 MAB 和 DQN 方法
MAB 方法适用于计算效率高、奖励独立且即时的任务,如在线模型选择。
DQN 方法适用于奖励依赖于数据状态、需要长期优化的任务,如时间序列预测或金融建模。
结合两者的方法可以在各种监督学习任务中提供更强的适应能力。
数据与代码实验
我们首先使用提供的数据和代码测试多选项探索方法,通过动态探索和基于奖励的更新来评估模型选择的性能。
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
import torch
import torch.nn as nn
import torch.nn.functional as F
Read the dataset from CSV
data = pd.read_csv('rein_data_binary.csv')
X = data.drop('label', axis=1)
y = data['label']
---------------------------------------------
2) Metrics (AUC, KS)
---------------------------------------------
def calc_ks_score(y_true, y_prob):
data = pd.DataFrame({'y_true': y_true, 'y_prob': y_prob}).sort_values('y_prob', ascending=False)
data['cum_pos'] = (data['y_true'] == 1).cumsum()
data['cum_neg'] = (data['y_true'] == 0).cumsum()
total_pos = data['y_true'].sum()
total_neg = (data['y_true'] == 0).sum()
data['cum_pos_rate'] = data['cum_pos'] / total_pos
data['cum_neg_rate'] = data['cum_neg'] / total_neg
data['ks'] = data['cum_pos_rate'] - data['cum_neg_rate']
return data['ks'].max()
---------------------------------------------
3) PyTorch DNN Model
---------------------------------------------
class DNNModel(nn.Module):
def init(self, input_dim=5):
super(DNNModel, self).init()
self.fc1 = nn.Linear(input_dim, 16)
self.fc2 = nn.Linear(16, 8)
self.out = nn.Linear(8, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = torch.sigmoid(self.out(x))
return x
def train_eval_pytorch_dnn(X_train, y_train, X_val, y_val,
epochs=5, batch_size=64, lr=1e-3, device='cpu'):
model = DNNModel(input_dim=X_train.shape[1]).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss()
X_train_t = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_t = torch.tensor(y_train, dtype=torch.float32).view(-1, 1).to(device)
X_val_t = torch.tensor(X_val, dtype=torch.float32).to(device)
dataset_size = len(X_train_t)
n_batches = (dataset_size // batch_size) + 1
for epoch in range(epochs):
indices = torch.randperm(dataset_size)
X_train_t = X_train_t[indices]
y_train_t = y_train_t[indices]
for i in range(n_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
if start_idx >= dataset_size:
break
x_batch = X_train_t[start_idx:end_idx]
y_batch = y_train_t[start_idx:end_idx]
preds = model(x_batch)
loss = criterion(preds, y_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
with torch.no_grad():
val_preds = model(X_val_t).cpu().numpy().ravel()
auc = roc_auc_score(y_val, val_preds)
ks = calc_ks_score(y_val, val_preds)
return model, auc, ks, val_preds
---------------------------------------------
4) Helper: Train & Evaluate Various Models
---------------------------------------------
def train_eval_model(model_name, X_train, y_train, X_val, y_val, device='cpu'):
if model_name == 'xgb':
model = XGBClassifier(use_label_encoder=False, eval_metric='logloss')
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'lgbm':
model = LGBMClassifier()
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'rf':
model = RandomForestClassifier()
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'dnn':
model, auc, ks, y_prob = train_eval_pytorch_dnn(
X_train.values, y_train.values, X_val.values, y_val.values, device=device
)
return model, auc, ks, y_prob
else:
raise ValueError(f"Unknown model name: {model_name}")
Continue with the rest of the code unchanged...
---------------------------------------------
5) Weighted Blending
---------------------------------------------
def blend_predictions(probs_list, weights=None):
if weights is None:
weights = [1.0 / len(probs_list)] * len(probs_list)
final_prob = np.zeros_like(probs_list[0])
for w, p in zip(weights, probs_list):
final_prob += w * p
return final_prob
def evaluate_action(action, X_train, X_val, y_train, y_val, device='cpu'):
"""
action: int from 0..4 => (xgb=0, lgbm=1, rf=2, dnn=3, blend=4)
Returns:
reward = (auc + ks) - penalty
auc, ks
"""
model_names = ['xgb', 'lgbm', 'rf', 'dnn']
if action < 4:
chosen_model = model_names[action]
_, auc_val, ks_val, _ = train_eval_model(chosen_model, X_train, y_train, X_val, y_val, device=device)
penalty = 0.05 if chosen_model == 'dnn' else 0.0
reward = (auc_val + ks_val) - penalty
return reward, auc_val, ks_val
else:
Blend
probs_list = []
for m in model_names:
_, auc_m, ks_m, p = train_eval_model(m, X_train, y_train, X_val, y_val, device=device)
probs_list.append(p)
final_prob = blend_predictions(probs_list)
auc_blend = roc_auc_score(y_val, final_prob)
ks_blend = calc_ks_score(y_val, final_prob)
reward = (auc_blend + ks_blend) - 0.1
return reward, auc_blend, ks_blend
---------------------------------------------
6) A Simple Multi-Armed Bandit Approach
---------------------------------------------
def multi_armed_bandit_model_selection(
n_episodes=50,
n_actions=5,
epsilon=0.06,
device='cpu'
):
"""
We have 5 actions (xgb=0, lgbm=1, rf=2, dnn=3, blend=4).
For each 'episode':
-
Generate a dataset (X,y) with the chosen seed
-
Split into train/val
-
Epsilon-greedy select an action
-
Evaluate the chosen action => get reward
-
Update average reward (Q) for that action
"""
Q = np.zeros(n_actions, dtype=np.float32)
counts = np.zeros(n_actions, dtype=int)
For storing raw AUC, KS, Reward each time an action is chosen
action_auc_records = [[] for _ in range(n_actions)]
action_ks_records = [[] for _ in range(n_actions)]
action_reward_records = [[] for _ in range(n_actions)]
action_history = []
reward_history = []
for episode in range(n_episodes):
Generate the data here
seed = 1000 + episode
X = data.drop('label', axis=1) # Features
y = data['label'] # Labels
Split the data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.3, random_state=123)
Epsilon-greedy action selection
if np.random.rand() < epsilon:
action = np.random.randint(n_actions)
else:
action = np.argmax(Q)
Evaluate chosen action => get (reward, auc, ks)
reward, auc_val, ks_val = evaluate_action(
action, X_train, X_val, y_train, y_val, device=device
)
Update Q (incremental mean)
counts[action] += 1
Q[action] += (reward - Q[action]) / counts[action]
Store details
action_history.append(action)
reward_history.append(reward)
action_auc_records[action].append(auc_val)
action_ks_records[action].append(ks_val)
action_reward_records[action].append(reward)
print(f"Episode {episode+1}/{n_episodes}, "
f"Action={action}, Reward={reward:.4f}, Updated Q={Q}")
return Q, action_history, reward_history, action_auc_records, action_ks_records, action_reward_records
---------------------------------------------
7) Run the Bandit, then Interpret Results
---------------------------------------------
def run_bandit():
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device={device}")
n_episodes = 50
n_actions = 5
epsilon = 0.05
(
Q,
actions,
rewards,
auc_records,
ks_records,
reward_records
) = multi_armed_bandit_model_selection(
n_episodes=n_episodes,
n_actions=n_actions,
epsilon=epsilon,
device=device
)
best_action = np.argmax(Q)
model_names = ["XGB", "LightGBM", "RandomForest", "DNN", "Blend"]
print("\n========================================")
print("Interpreting Your Current Results")
print("========================================\n")
print("Final Q-values:", Q)
print(f"Best action index: {best_action}")
print(f"Best action is: {model_names[best_action]} with estimated Q = {Q[best_action]:.4f}\n")
print("Detailed AUC/KS/Reward by action:")
print("--------------------------------------------------")
for a in range(n_actions):
if len(auc_records[a]) > 0:
avg_auc = np.mean(auc_records[a])
avg_ks = np.mean(ks_records[a])
avg_reward = np.mean(reward_records[a])
print(f"Action {a} ({model_names[a]}): chosen {len(auc_records[a])} times")
print(f" Mean AUC = {avg_auc:.4f}, Mean KS = {avg_ks:.4f}, Mean Reward = {avg_reward:.4f}\n")
else:
print(f"Action {a} ({model_names[a]}): chosen 0 times\n")
if name == "main":
run_bandit()
结果解析
最终的 Q 值表示对各模型在累积奖励方面的估计性能。实验结果揭示了以下关键点:
• 最佳动作:XGBoost,平均奖励为 1.267。
• 对比分析:XGBoost 显著优于其他模型,主要因为它在预测准确性和稳健性之间达到了良好的平衡。
详细结果提供了不同模型之间的权衡信息:
实验结果表明:
• XGBoost 主导了模型选择,因为它在所有测试中都保持了较高的 AUC 和 KS 分数。
• DNN 展现出潜力,尽管它只在一次实验中被选中,但其 AUC 和 KS 评分较高。
• 融合模型(Blend)及其他模型被选择的频率较低,这表明可能需要进一步调整权重或特征表示。
XGBoost 的主导地位反映了它在该特定数据结构和奖励机制下的适用性。未来的研究方向可能包括:
1、 探索复杂模型的动态惩罚调整机制。
2、 扩展多选项探索框架,以适应时间变化的奖励。
3、 使用强化学习优化模型融合策略。
实验结果表明,多选项探索方法能够高效地找到基于指定评估标准的最佳模型。
接下来,我们将深入测试深度 Q 网络(DQN)方法,以观察它在应对数据状态变化和长期奖励优化方面的表现,并与之前测试的无状态多选项探索方法进行对比。
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
import torch
import torch.nn as nn
import torch.nn.functional as F
Gymnasium
import gymnasium as gym
from gymnasium import spaces
Stable Baselines3
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
For callback
from stable_baselines3.common.callbacks import BaseCallback
---------------------------------------------
1) Read data from CSV file
---------------------------------------------
data = pd.read_csv('rein_data_binary.csv')
X = data.drop('label', axis=1) # Features
y = data['label'] # Labels
---------------------------------------------
2) Metrics (AUC, KS)
---------------------------------------------
def calc_ks_score(y_true, y_prob):
data = pd.DataFrame({'y_true': y_true, 'y_prob': y_prob}).sort_values('y_prob', ascending=False)
data['cum_pos'] = (data['y_true'] == 1).cumsum()
data['cum_neg'] = (data['y_true'] == 0).cumsum()
total_pos = data['y_true'].sum()
total_neg = (data['y_true'] == 0).sum()
data['cum_pos_rate'] = data['cum_pos'] / total_pos
data['cum_neg_rate'] = data['cum_neg'] / total_neg
data['ks'] = data['cum_pos_rate'] - data['cum_neg_rate']
return data['ks'].max()
---------------------------------------------
3) PyTorch DNN
---------------------------------------------
class DNNModel(nn.Module):
def init(self, input_dim=5):
super(DNNModel, self).init()
self.fc1 = nn.Linear(input_dim, 16)
self.fc2 = nn.Linear(16, 8)
self.out = nn.Linear(8, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = torch.sigmoid(self.out(x))
return x
def train_eval_pytorch_dnn(X_train, y_train, X_val, y_val,
epochs=5, batch_size=64, lr=1e-3, device='cpu'):
model = DNNModel(input_dim=X_train.shape[1]).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss()
X_train_t = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_t = torch.tensor(y_train, dtype=torch.float32).view(-1, 1).to(device)
X_val_t = torch.tensor(X_val, dtype=torch.float32).to(device)
dataset_size = len(X_train_t)
n_batches = (dataset_size // batch_size) + 1
for epoch in range(epochs):
indices = torch.randperm(dataset_size)
X_train_t = X_train_t[indices]
y_train_t = y_train_t[indices]
for i in range(n_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
if start_idx >= dataset_size:
break
x_batch = X_train_t[start_idx:end_idx]
y_batch = y_train_t[start_idx:end_idx]
preds = model(x_batch)
loss = criterion(preds, y_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
with torch.no_grad():
val_preds = model(X_val_t).cpu().numpy().ravel()
auc = roc_auc_score(y_val, val_preds)
ks = calc_ks_score(y_val, val_preds)
return model, auc, ks, val_preds
---------------------------------------------
4) Train & Evaluate Helper
---------------------------------------------
def train_eval_model(model_name, X_train, y_train, X_val, y_val, device='cpu'):
if model_name == 'xgb':
model = XGBClassifier(use_label_encoder=False, eval_metric='logloss')
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'lgbm':
model = LGBMClassifier()
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'rf':
model = RandomForestClassifier()
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_prob)
ks = calc_ks_score(y_val, y_prob)
return model, auc, ks, y_prob
elif model_name == 'dnn':
model, auc, ks, y_prob = train_eval_pytorch_dnn(
X_train.values, y_train.values, X_val.values, y_val.values, device=device
)
return model, auc, ks, y_prob
else:
raise ValueError(f"Unknown model name: {model_name}")
def blend_predictions(probs_list, weights=None):
if weights is None:
weights = [1.0 / len(probs_list)] * len(probs_list)
final_prob = np.zeros_like(probs_list[0])
for w, p in zip(weights, probs_list):
final_prob += w * p
return final_prob
---------------------------------------------
5) Single-step Environment
---------------------------------------------
class ModelSelectionEnv(gym.Env):
metadata = {"render_modes": ["human"]}
def init(self, X, y, device='cpu'):
super().init()
self.device = device
Train/val split
self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(
X, y, test_size=0.3, random_state=123
)
means = X.mean().values
vars_ = X.var().values
self.state = np.concatenate([means, vars_]) # observation
5 discrete actions
self.action_space = spaces.Discrete(5)
self.observation_space = spaces.Box(
low=-np.inf,
high=np.inf,
shape=(len(self.state),),
dtype=np.float32
)
self.terminated = False
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.terminated = False
return self.state.astype(np.float32), {}
def step(self, action):
if self.terminated:
return self.state.astype(np.float32), 0.0, True, False, {}
model_names = ['xgb', 'lgbm', 'rf', 'dnn']
if action < 4:
chosen_model = model_names[action]
_, auc_v, ks_v, _ = train_eval_model(
chosen_model,
self.X_train, self.y_train,
self.X_val, self.y_val,
device=self.device
)
penalty = 0.05 if chosen_model == 'dnn' else 0.0
reward = (auc_v + ks_v) - penalty
info = {
"action_name": chosen_model,
"AUC": auc_v,
"KS": ks_v,
"Penalty": penalty
}
else:
Blend
probs_list = []
for m in model_names:
_, auc_m, ks_m, prob_m = train_eval_model(
m,
self.X_train, self.y_train,
self.X_val, self.y_val,
device=self.device
)
probs_list.append(prob_m)
final_prob = blend_predictions(probs_list)
auc_v = roc_auc_score(self.y_val, final_prob)
ks_v = calc_ks_score(self.y_val, final_prob)
penalty = 0.1
reward = (auc_v + ks_v) - penalty
info = {
"action_name": "blend",
"AUC": auc_v,
"KS": ks_v,
"Penalty": penalty
}
self.terminated = True
return self.state.astype(np.float32), reward, True, False, info
---------------------------------------------
7) RL Training & Execution
---------------------------------------------
def run_rl_model_selection_pytorch():
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using {device} device")
Create single-step Gymnasium environment
env = ModelSelectionEnv(X, y, device=device)
Wrap with DummyVecEnv
def make_env():
return env
vec_env = DummyVecEnv([make_env])
Create callback
callback = BanditSummaryCallback()
Create DQN
model = DQN(
"MlpPolicy",
vec_env,
verbose=1,
learning_rate=1e-3,
buffer_size=10000,
exploration_fraction=0.3,
exploration_final_eps=0.02,
tensorboard_log="./rl_tensorboard/"
)
Train with callback
model.learn(total_timesteps=2000, callback=callback)
Evaluate final policy (one step)
obs = vec_env.reset()
action, _ = model.predict(obs, deterministic=True)
obs, rewards, dones, infos = vec_env.step(action)
final_reward = rewards[0]
action_map = ["XGB", "LightGBM", "RandomForest", "DNN", "Blend"]
print("\n======================================")
print(f"Final chosen action => {action[0]} ({action_map[action[0]]})")
print(f"Final step reward => (AUC + KS - penalty) = {final_reward:.4f}")
print("======================================\n")
if name == "main":
run_rl_model_selection_pytorch()
结果与解析
训练摘要:
• 运行了 2000 轮训练,使用 ε-贪心策略 在探索(exploration)与利用(exploitation)之间进行权衡。
• Q 值 反映了每个模型选择的估计累积奖励。
最终 Q 值:
[1.1442,1.1800,1.0847,1.0510,1.0684][1.1442, 1.1800, 1.0847, 1.0510, 1.0684][1.1442,1.1800,1.0847,1.0510,1.0684]
最佳动作:LightGBM,Q 值最高,为 1.1800。
详细表现:
最终选定的模型:
LightGBM(动作索引 1)在 AUC、KS 和奖励 方面始终优于其他模型。
鉴于数据结构和评估指标,LightGBM 的选择表明其具有稳健性和可靠性。
Deepseeker 技术在强化学习自适应模型选择中的应用
本论文借鉴了 Deepseeker 技术,以提升自适应模型选择能力,主要涵盖以下方面:
- 无需监督微调的强化学习
Deepseeker 通过强化学习(RL)跳过了对大规模标注数据集的需求,而是通过试错(trial-and-error)进行推理。
在本研究中,我的 RL 代理能够动态学习最佳模型选择策略,通过探索和反馈机制进行优化,而不依赖传统的微调(fine-tuning)。
• 长期来看,它可以泛化到新数据集或领域,无需重新训练模型,类似于 Deepseeker 的元学习(meta-learning)。
• 示例:RL 代理可能学习到 在结构化数据上使用 XGBoost,而在非线性数据上使用 DNN,从而根据数据变化自动调整策略。
- 数据驱动的模型选择与分类
Deepseeker 仅激活与当前数据上下文相关的模型组件,从而提高效率并增强针对性。
同样地,RL 代理利用数据集的统计信息(均值、方差等) 来分类数据区域,并将其匹配到最合适的模型。
• 示例:
o 对于结构化数据区域,代理可能会选择 XGBoost。
o 对于嘈杂的、非线性数据区域,可能会触发 DNN 与 LGBM 的融合模型。
这一动态映射状态到动作的方法 避免了全局应用单一模型的弊端,并提升了预测精度。
- 部分参数激活(稀疏模型执行)
Deepseeker 不会激活所有模型参数,而是仅激活当前任务所需的部分。
同样地,本研究中的动态模型融合策略仅激活最有效的模型或配置,以适应当前状态。
• 示例:
o 在医疗数据集中,XGBoost 可用于结构化的患者病史,而 DNN 适用于复杂的基因数据。
o 这样可以减少不必要的计算,提高效率。
- 思维链(Chain-of-Thought, CoT)推理
DeepSeek-R1-Zero 采用 思维链(Chain-of-Thought, CoT) 来解决复杂任务,通过生成自验证的推理步骤进行优化。
在本研究中,RL 代理有潜力在自适应模型选择中应用 CoT 方法(目前尚未应用,但值得进一步研究):
• 代理可以从基本模型(XGBoost 或 LightGBM)开始,然后在需要时切换或融合更复杂的模型(如 DNN)。
• 每次动作后,代理会验证模型表现(AUC、KS),如果奖励较低,它会反思、调整融合比例,或选择新的模型。
• 通过滚动反馈(rolling feedback),代理可以记住过去的决策,避免重复错误,并动态探索融合比率以持续改进。
最终思考
本研究探讨了如何通过强化学习(RL)实现自适应模型选择与融合,并将其建模为马尔可夫决策过程(MDP)。
• RL 代理能够智能地选择 XGBoost、LightGBM、DNN 或模型融合策略,并能动态适应不断变化的数据分布。
• 利用累积奖励(基于 AUC 和 KS)并惩罚低效选择,我们可以优化建模过程,而无需依赖固定假设。
我从 Deepseeker NN(LLM) 的强化学习机制中获得了灵感,这一机制结合了领域特定知识,有助于该方法泛化到更广泛的任务,如:
• 自动化交易(Automated Trading)
• 医疗诊断(Healthcare Diagnostics)
• 预测性维护(Predictive Maintenance)
尽管仍然存在计算开销和探索-利用权衡等挑战,但未来可以通过元学习(Meta-Learning)和半监督标注(Semi-Supervised Labeling)提高系统适应性,同时降低成本。
此外,将 RL 扩展到超参数调优(Hyperparameter Tuning),可以使系统完全自动化,从而选择最优模型及其参数配置。
本研究为下一代 AutoML 系统奠定了基础,在这些系统中,强化学习与大型语言模型(LLMs)协同工作,以提供更智能、更可扩展的解决方案,并应用于真实世界场景。
数据和代码可在以下地址访问:
https://github.com/datalev001/RL_supLR/