推荐系统和广告CTR预估主流模型的演化有两条主要路线。
第一条是显式建模特征交互,提升模型对交叉特征的捕获能力。(如Wide&Deep,PNN,FNN,DCN,DeepFM,AutoInt等)
第二条是加入注意力机制,提升模型的自适应能力和解释性。(如DIN,DIEN,DSIN,FiBiNET,AutoInt等)
在所有这些模型中,DeepFM属于性价比非常高的模型(结构简洁,计算高效,指标有竞争力)。
张俊林大佬 在2019年的时候甚至建议 沿着 LR->FM->DeepFM->干点别的 这样的路线去迭代推荐系统。
参考文档:
- 《推荐系统CTR预估学习路线》:https://zhuanlan.zhihu.com/p/351078721
- criteo数据集榜单:https://paperswithcode.com/dataset/criteo
- DeepFM论文: https://arxiv.org/abs/1703.04247
- 《清晰易懂,基于pytorch的DeepFM的完整实验代码》: https://zhuanlan.zhihu.com/p/332786045
- torch实现参考:https://github.com/rixwew/pytorch-fm/blob/master/torchfm/model/dfm.py
import torch
import torchkeras
print("torch.__version__ = ", torch.__version__)
print("torchkeras.__version__ = ", torchkeras.__version__) """
torch.__version__ = 2.3.1+cu121
torchkeras.__version__ = 3.9.6
"""
1.DeepFM原理解析
DeepFM继承了Wide&Deep的主体结构,将高低特征进行融合。
其主要创新点有2个。
一是将Wide部分替换成了 FM结构,以更有效的捕获特征交互interaction.
二是FM中的隐向量 和 Deep部分的 embedding 向量共享权重,减少模型复杂性。
2.DeepFM的pytorch实现
下面是一个DeepFM的Pytorch实现。
除了添加了一个并行的MLP模块用于捕获隐式高阶交叉和组合特征外,其余结构基本和FM的实现完全一致。
import torch
from torch import nn, Tensor
import torch.nn.functional as Fclass NumEmbedding(nn.Module):"""连续特征用linear层编码输入shape: [batch_size, feature_number(n), d_in], # d_in通常是1输出shape: [batch_size, feature_number(n), d_out]"""def __init__(self, n: int, d_in: int, d_out: int, bias: bool = False) -> None:super().__init__()self.weight = nn.Parameter(Tensor(n, d_in, d_out))self.bias = nn.Parameter(Tensor(n, d_out)) if bias else Nonewith torch.no_grad():for i in range(n):layer = nn.Linear(d_in, d_out)self.weight[i] = layer.weight.Tif self.bias is not None:self.bias[i] = layer.biasdef forward(self, x_num):# x_num: batch_size, features_num, d_inassert x_num.ndim == 3# x = x_num[..., None] * self.weight[None]# x = x.sum(-2)x = torch.einsum("bfi,fij->bfj", x_num, self.weight)if self.bias is not None:x = x + self.bias[None]return xclass CatEmbedding(nn.Module):"""离散特征用Embedding层编码。输入shape: [batch_size, feature_num],输出shape: [batch_size, feature_num, d_embed]"""def __init__(self, categories, d_embed):super().__init__()self.embedding = torch.nn.Embedding(sum(categories), d_embed)# 这段代码的作用是创建一个不可训练的参数(nn.Parameter)self.offsets,并初始化为一个张量,这个张量用于存储类别的偏移量(offsets)。self.offsets = nn.Parameter(torch.tensor([0] + categories[:-1]).cumsum(0), requires_grad=False)torch.nn.init.xavier_uniform_(self.embedding.weight.data)def forward(self, x_cat):""":param x_cat: Long tensor of (batch_size, feature_num)"""# 通过将类别索引张量 x_cat 与 self.offsets 相加,调整类别索引,使其正确对应到 self.embedding 中的实际类别x = x_cat + self.offsets[None]return self.embedding(x)class CatLinear(nn.Module):"""离散特征用Embedding实现线性层(等价于先F.onehot再nn.Linear())输入shape: [batch_size, features_num]输出shape: [batch_size, features_num, d_out]"""def __init__(self, categories, d_out=1):super().__init__()self.fc = nn.Embedding(sum(categories), d_out)self.bias = nn.Parameter(torch.zeros((d_out, )))self.offsets = nn.Parameter(torch.tensor([0]+categories[:-1]).cumsum(0), requires_grad=False)def forward(self, x_cat):""":param x: Long tensor of size (batch_size, features_num)"""x = x_cat + self.offsets[None]return torch.sum(self.fc(x), dim=1) + self.biasclass FMLayer(nn.Module):"""FM交互项"""def __init__(self, reduce_sum=True):super().__init__()self.reduce_sum = reduce_sumdef forward(self, x): # 注意 这里的x是公式中的<v_i> * xi""":param x: Float tensor of size (batch_size, num_features, k)"""square_of_sum = torch.sum(x, dim=1) ** 2sum_of_square = torch.sum(x ** 2, dim=1)ix = square_of_sum - sum_of_squareif self.reduce_sum:ix = torch.sum(ix, dim=1, keepdim=True)return 0.5 * ix# deep部分
class MultiLayerPerceptron(nn.Module):def __init__(self, d_in, d_layers, dropout, d_out=1):super().__init__()layers = []for d in d_layers:layers.append(nn.Linear(d_in, d))layers.append(nn.BatchNorm1d(d))layers.append(nn.ReLU())layers.append(nn.Dropout(p=dropout))d_in = dlayers.append(nn.Linear(d_layers[-1], d_out))self.mlp = nn.Sequential(*layers)def forward(self, x):""":param x: Float tensor of size (batch_size, d_in)"""return self.mlp(x)class DeepFM(nn.Module):"""DeepFM模型"""def __init__(self, d_numerical, categories, d_embed, deep_layers, deep_dropout, n_classes=1):super().__init__()if d_numerical is None:d_numerical = 0if categories is None:categories = []self.categories = categoriesself.n_classes = n_classesself.num_linear = nn.Linear(d_numerical, n_classes) if d_numerical else Noneself.cat_linear = CatLinear(categories, n_classes) if categories else Noneself.num_embedding = NumEmbedding(d_numerical, 1, d_embed) if d_numerical else Noneself.cat_embedding = CatEmbedding(categories, d_embed) if categories else None"""FM 的主要作用是捕捉特征之间的交互效应。在二分类问题中,输出一个单一的值(例如,用于二元交叉熵损失)足以描述模型的预测。当 reduce_sum=True 时,FM 层会将所有特征的交互结果相加,从而得到一个标量输出。这适合二分类,因为我们只需要一个输出值来进行分类。输出的标量可以直接用于后续的激活函数(如 Sigmoid),从而将其转换为概率值。在二分类的场景下,引入一个额外的线性层(self.fm_linear)可能会增加模型的复杂性,但对最终输出没有实质性帮助。因此,省略这个层能够简化模型结构。单输出:对于二分类,通常最后只需要一个输出(例如,预测为正类的概率),因此 self.fm 的输出直接作为最终预测的输入。多输出:在多分类的情况下(n_classes >= 2),需要多个输出(每个类一个),因此需要一个线性层(self.fm_linear)来生成每个类的预测。"""if n_classes == 1:self.fm = FMLayer(reduce_sum=True)self.fm_linear = Noneelse:assert n_classes >= 2self.fm = FMLayer(reduce_sum=False)self.fm_linear = nn.Linear(d_embed, n_classes)# 包含数值特征的嵌入和分类特征的嵌入的总和。# (数值特征的数量×每个数值特征的嵌入维度)+(类别特征的数量×每个类别特征的嵌入维度)self.deep_in = d_numerical * d_embed + len(categories) * d_embedself.deep = MultiLayerPerceptron(d_in=self.deep_in,d_layers=deep_layers,dropout=deep_dropout,d_out=n_classes)def forward(self, x):""":param x_num: numerical features:param x_cat: categorical features"""x_num, x_cat = x# linear部分x = 0.0if self.num_linear:x = x + self.num_linear(x_num)if self.cat_linear:x = x + self.cat_linear(x_cat)# fm部分x_embedding = []if self.num_embedding:# [..., None]: 这个操作用于增加一个维度,使 x_num 的形状变为 (batch_size, d_numerical, 1)。# 在 PyTorch 中,这种方式通常用于将一维张量转换为二维或三维张量,以便与嵌入层的输入形状匹配。x_embedding.append(self.num_embedding(x_num[..., None]))if self.cat_embedding:x_embedding.append(self.cat_embedding(x_cat))x_embedding = torch.cat(x_embedding, dim=1)if self.n_classes == 1:x = x + self.fm(x_embedding)else:x = x+ self.fm_linear(self.fm(x_embedding))# deep 部分x = x + self.deep(x_embedding.view(-1, self.deep_in))if self.n_classes == 1:x = x.squeeze(-1)return x
# 测试DeepFM
model = DeepFM(d_numerical=3, categories=[4, 3, 2], d_embed=4, deep_layers=[20, 20], deep_dropout=0.1, n_classes=1)
x_num = torch.randn(2, 3)
x_cat = torch.randint(0, 2, (2, 3))
model((x_num, x_cat))"""
tensor([-0.0014, -0.1834], grad_fn=<SqueezeBackward1>)
"""
3.criteo数据集完整范例
import numpy as np
import pandas as pd
import datetime
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torchkeras# 准备数据
from sklearn.preprocessing import LabelEncoder, QuantileTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer# dfdata = pd.read_csv('./dataset/cretio/cretio_small_small.csv', sep='\t', header=None)
dfdata = pd.read_csv('./dataset/cretio/dac_sample.txt', sep='\t', header=None)
dfdata.columns = ["label"] + ["I"+str(x) for x in range(1, 14)] + ["C"+str(x) for x in range(14, 40)]cat_cols = [x for x in dfdata.columns if x.startswith('C')]
num_cols = [x for x in dfdata.columns if x.startswith('I')]
num_pipe = Pipeline(steps=[("impute", SimpleImputer()), ("quantile", QuantileTransformer())]) # 用于将数据的分布转化为均匀分布或正态分布for col in cat_cols:dfdata[col] = LabelEncoder().fit_transform(dfdata[col])
dfdata[num_cols] = num_pipe.fit_transform(dfdata[num_cols])
categories = [dfdata[col].max()+1 for col in cat_cols]import torch
from torch.utils.data import Dataset, DataLoader# 将DataFrame转换成数据集Dataset特征分割成X_num, X_cat方式
class DfDataset(Dataset):def __init__(self, df, label_col, num_features, cat_features, categories, is_training=True):self.X_num = torch.tensor(df[num_features].values).float() if num_features else Noneself.X_cat = torch.tensor(df[cat_features].values).long() if cat_features else Noneself.Y = torch.tensor(df[label_col].values).float()self.categories = categoriesself.is_training = is_trainingdef __len__(self):return len(self.Y)def __getitem__(self, index):if self.is_training:return ((self.X_num[index], self.X_cat[index]), self.Y[index])else:return (self.X_num[index], self.X_cat[index])def get_categories(self):return self.categoriesdftrain_val, dftest = train_test_split(dfdata, test_size=0.2)
dftrain, dfval = train_test_split(dftrain_val, test_size=0.2)
ds_train = DfDataset(dftrain,label_col = "label",num_features = num_cols,cat_features = cat_cols,categories = categories, is_training=True)ds_val = DfDataset(dfval,label_col = "label",num_features = num_cols,cat_features = cat_cols,categories = categories, is_training=True)ds_test = DfDataset(dftest,label_col = "label",num_features = num_cols,cat_features = cat_cols,categories = categories, is_training=True)dl_train = DataLoader(ds_train, batch_size=2048, shuffle=True)
dl_val = DataLoader(ds_val,batch_size = 2048,shuffle=False)
dl_test = DataLoader(ds_test,batch_size = 2048,shuffle=False)for features,labels in dl_train:break # 定义模型
def create_net():net = DeepFM(d_numerical=ds_train.X_num.shape[1], categories=ds_train.get_categories(),d_embed=8, deep_layers=[128, 64, 32], deep_dropout=0.25, n_classes=1)return netfrom torchkeras import summarynet = create_net()
summary(net, input_data=features);"""
--------------------------------------------------------------------------
Layer (type) Output Shape Param #
==========================================================================
Linear-1 [-1, 1] 14
Embedding-2 [-1, 26, 1] 241,338
NumEmbedding-3 [-1, 13, 8] 104
Embedding-4 [-1, 26, 8] 1,930,704
FMLayer-5 [-1, 1] 0
Linear-6 [-1, 128] 40,064
BatchNorm1d-7 [-1, 128] 256
ReLU-8 [-1, 128] 0
Dropout-9 [-1, 128] 0
Linear-10 [-1, 64] 8,256
BatchNorm1d-11 [-1, 64] 128
ReLU-12 [-1, 64] 0
Dropout-13 [-1, 64] 0
Linear-14 [-1, 32] 2,080
BatchNorm1d-15 [-1, 32] 64
ReLU-16 [-1, 32] 0
Dropout-17 [-1, 32] 0
Linear-18 [-1, 1] 33
==========================================================================
Total params: 2,223,041
Trainable params: 2,223,041
Non-trainable params: 0
--------------------------------------------------------------------------
Input size (MB): 0.000084
Forward/backward pass size (MB): 0.009438
Params size (MB): 8.480228
Estimated Total Size (MB): 8.489750
--------------------------------------------------------------------------
"""
# 训练模型
# 我们使用梦中情炉torchkeras来实现最优雅的训练循环。
from torchkeras.metrics import AUC
from torchkeras import KerasModelloss_fn = nn.BCEWithLogitsLoss() # BCEWithLogitsLoss 接收模型的输出(logits),不需要手动应用 sigmoid 激活函数。
metrics_dict = {"auc": AUC()}optimizer = torch.optim.Adam(net.parameters(), lr=0.002, weight_decay=0.001)model = KerasModel(net, loss_fn=loss_fn, metrics_dict=metrics_dict, optimizer=optimizer)dfhistory = model.fit(train_data=dl_train, val_data=dl_val, epochs=100, ckpt_path='checkpoint', patience=5, monitor='val_auc', mode='max', plot=True)
# 评估模型
model.evaluate(dl_test)"""
{'val_loss': 0.7424652814865113, 'val_auc': 0.7181903719902039}
"""# 使用模型
from sklearn.metrics import roc_auc_scoremodel.eval()
# 测试阶段:在评估模型之前,使用 accelerator.prepare() 可以确保测试数据以最佳方式加载,尤其是在使用多 GPU 时。
dl_test = model.accelerator.prepare(dl_test)with torch.no_grad():result = torch.cat([model.forward(t[0]) for t in dl_test])preds = F.sigmoid(result).cpu()
labels = torch.cat([x[-1] for x in dl_test]).cpu()val_auc = roc_auc_score(labels.numpy(), preds.numpy())print(val_auc)"""
0.7181925663222225"""# 保存模型
# 模型最佳权重已经保存在 model.fit(ckpt_path) 传入的参数中了。
net_clone = create_net()
net_clone.load_state_dict(torch.load(model.ckpt_path))"""
<All keys matched successfully>
"""