1. Introduction
As I have said before, "time series + finance" is a promising (and potentially profitable) topic, so I wanted to try the combination of stocks + time-series forecasting + deep learning. This post provides full source code for predicting stock prices with an LSTM.
2. Code
Notes before running the code: create the folders referenced in the configuration section and prepare the data. The data source is described in my previous blog post, 《【Time Series】获取股票数据代码实战》.
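The script below expects an Excel workbook at ../data/input/五粮液.xlsx with a sheet named 数据处理 that contains a close column (these names come from the configuration in the code). A minimal sanity check before training, assuming that layout:

import os
import pandas as pd

data_path = os.path.join('../data/input', '五粮液.xlsx')
df = pd.read_excel(data_path, sheet_name='数据处理')
# Only the 'close' column is used downstream; fail early if it is missing.
assert 'close' in df.columns, f"expected a 'close' column, got {list(df.columns)}"
print(df[['close']].describe())  # quick look at the price range before scaling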
import os
import random
from tqdm import tqdm
import joblib
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
# Configuration
class configs():
    def __init__(self):
        # Data
        self.data_input_path = r'../data/input'
        self.data_output_path = r'../data/output'
        self.save_model_dir = '../data/output'
        self.data_inputfile_name = r'五粮液.xlsx'
        self.data_BaseTrue_infer_output_name = r'基于真实数据推理结果.xlsx'
        self.data_BaseSelf_infer_output_name = r'基于自回归推理结果.xlsx'
        self.data_split_ratio = "0.8#0.1#0.1"
        self.model_name = 'LSTM'
        self.seed = 2024
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.epoch = 50
        self.train_batch_size = 16
        self.in_seq_embeddings = 1   # input feature dimension
        self.out_seq_embeddings = 1  # output feature dimension
        self.in_seq_length = 5       # input time window
        self.out_seq_length = 1      # output time window
        self.hidden_features = 16    # number of hidden units
        self.learning_rate = 0.001
        self.dropout = 0.5
        self.istrain = True
        self.istest = True
        self.BaseTrue_infer = True
        self.BaseSelf_infer = True
        self.num_predictions = 800   # steps to roll out in autoregressive inference

cfg = configs()

def seed_everything(seed=2024):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

seed_everything(seed=cfg.seed)

# Data
class Define_Data():
    def __init__(self, task_type='train'):
        self.scaler = MinMaxScaler()
        self.df = pd.DataFrame()
        self.task_type = task_type

    # Reload the input data; use_lines = "[m,n]" keeps rows m..n for training/testing,
    # "-1" keeps every row
    def refresh_df_data(self, tmp_df_path, tmp_df_sheet_name, use_lines):
        self.df = pd.read_excel(tmp_df_path, sheet_name=tmp_df_sheet_name)
        if use_lines != "-1":
            use_lines = eval(use_lines)
            assert use_lines[0] <= use_lines[1]
            self.df = self.df.iloc[use_lines[0]:use_lines[1], :]

    # Build sliding-window samples; in_seq_length is the input window,
    # out_seq_length the output window
    def create_inout_sequences(self, input_data, in_seq_length, out_seq_length):
        inout_seq = []
        L = len(input_data)
        for i in range(L - in_seq_length - out_seq_length + 1):  # keep every label inside the array
            # Each sequence is in_seq_length x input_size, matching (seq_len, input_size)
            train_seq = input_data[i:i + in_seq_length][..., np.newaxis]  # np.newaxis adds the feature dim
            train_label = input_data[i + in_seq_length:i + in_seq_length + out_seq_length, np.newaxis]
            inout_seq.append((train_seq, train_label))
        return inout_seq

    # Collate time-series samples into the model's input layout
    def _collate_fn(self, batch):
        # Each element in 'batch' is a tuple (sequence, label);
        # stack the sequences and labels separately to produce two tensors
        seqs, labels = zip(*batch)
        # Reshape to (seq_len, batch_size, input_size)
        seq_tensor = torch.stack(seqs).transpose(0, 1)
        # Labels may be single-dimension outputs, so stack and add an extra dim if necessary
        label_tensor = torch.stack(labels).transpose(0, 1)
        if len(label_tensor.shape) == 2:
            label_tensor = label_tensor.unsqueeze(-1)  # add the input_size dimension
        return seq_tensor, label_tensor

    # Turn the table data into tensors
    def get_tensor_data(self):
        # Scale the close price; fit the scaler only during training so that
        # test/inference reuse the scaler saved from the training run
        if self.task_type == 'train':
            self.df['new_close'] = self.scaler.fit_transform(self.df[['close']])
        else:
            self.df['new_close'] = self.scaler.transform(self.df[['close']])
        inout_seq = self.create_inout_sequences(self.df['new_close'].values,
                                                in_seq_length=cfg.in_seq_length,
                                                out_seq_length=cfg.out_seq_length)
        if self.task_type == 'train':
            # Prepare training data
            X = torch.FloatTensor(np.array([s[0] for s in inout_seq]))
            y = torch.FloatTensor(np.array([s[1] for s in inout_seq]))
            # Split into train / val / test by position
            data_split_ratio = [float(d) for d in cfg.data_split_ratio.split('#')]
            train_end = int(len(inout_seq) * data_split_ratio[0])
            val_end = int(len(inout_seq) * (data_split_ratio[0] + data_split_ratio[1]))
            train_X, train_y = X[:train_end], y[:train_end]
            val_X, val_y = X[train_end:val_end], y[train_end:val_end]
            test_X, test_y = X[val_end:], y[val_end:]
            # Note batch_first=False below
            batch_size = cfg.train_batch_size
            train_data = TensorDataset(train_X, train_y)
            train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size,
                                      drop_last=True, collate_fn=self._collate_fn)
            val_data = TensorDataset(val_X, val_y)
            val_loader = DataLoader(val_data, shuffle=False, batch_size=1, collate_fn=self._collate_fn)
            test_data = TensorDataset(test_X, test_y)
            test_loader = DataLoader(test_data, shuffle=False, batch_size=1, collate_fn=self._collate_fn)
            return train_loader, val_loader, test_loader, self.scaler
        elif self.task_type in ('test', 'infer'):
            # Prepare test/inference data
            X = torch.FloatTensor(np.array([s[0] for s in inout_seq]))
            y = torch.FloatTensor(np.array([s[1] for s in inout_seq]))
            test_data = TensorDataset(X, y)
            test_loader = DataLoader(test_data, shuffle=False, batch_size=1, collate_fn=self._collate_fn)
            return test_loader, self.scaler
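Before moving on to the model, it may help to see the (seq_len, batch_size, input_size) layout that _collate_fn produces, since nn.LSTM defaults to batch_first=False. A quick side check with toy tensors, not part of the pipeline itself (the sizes mirror cfg.in_seq_length=5 and cfg.train_batch_size=16, the values are random):

# Toy shape check for _collate_fn; illustrative only.
batch = [(torch.rand(5, 1), torch.rand(1, 1)) for _ in range(16)]
seqs, labels = Define_Data()._collate_fn(batch)
print(seqs.shape)    # torch.Size([5, 16, 1])  -> (seq_len, batch_size, input_size)
print(labels.shape)  # torch.Size([1, 16, 1])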
# Model definition
################# Network architecture #################
class LSTM(nn.Module):
    def __init__(self, input_size=10, hidden_layer_size=20, output_size=1):
        super(LSTM, self).__init__()
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size)  # batch_first=False: (seq_len, batch, input)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.batch_size = cfg.train_batch_size
        self.hidden_cell = (torch.zeros(1, self.batch_size, self.hidden_layer_size, device=cfg.device),
                            torch.zeros(1, self.batch_size, self.hidden_layer_size, device=cfg.device))

    def forward(self, input_seq):
        lstm_out, self.hidden_cell = self.lstm(input_seq, self.hidden_cell)
        predictions = self.linear(lstm_out.view(len(input_seq) * self.batch_size, -1))
        # Only return the predictions from the last timestep
        return predictions.view(len(input_seq), self.batch_size, -1)[-1]

    # Re-initialize the hidden state; called before every batch so that the
    # batch size can differ between training and evaluation
    def reset_hidden_state(self, tmp_batch_size):
        self.batch_size = tmp_batch_size
        self.hidden_cell = (torch.zeros(1, tmp_batch_size, self.hidden_layer_size, device=cfg.device),
                            torch.zeros(1, tmp_batch_size, self.hidden_layer_size, device=cfg.device))


class my_run():
    def train(self):
        Dataset = Define_Data(task_type='train')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[0,3000]')
        train_loader, val_loader, test_loader, scaler = Dataset.get_tensor_data()
        model = LSTM(cfg.in_seq_embeddings, cfg.hidden_features, cfg.out_seq_length).to(cfg.device)
        # Loss function and optimizer
        loss_function = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=cfg.learning_rate, weight_decay=5e-4)
        model.train()
        loss_train_all = []
        for epoch in tqdm(range(cfg.epoch)):
            # Training set
            predictions = []
            test_labels = []
            for seq, labels in train_loader:
                seq, labels = seq.to(cfg.device), labels.to(cfg.device)
                optimizer.zero_grad()
                model.reset_hidden_state(tmp_batch_size=cfg.train_batch_size)  # reset the LSTM hidden state
                y_pred = model(seq)
                loss_train = loss_function(torch.squeeze(y_pred), torch.squeeze(labels))
                loss_train_all.append(loss_train.item())
                loss_train.backward()
                optimizer.step()
                predictions.append(y_pred.squeeze().detach().cpu().numpy())  # squeeze removes extra dimensions
                test_labels.append(labels.squeeze().detach().cpu().numpy())
            train_mse, train_mae = self.timeseries_metrics(predictions=predictions,
                                                           test_labels=test_labels,
                                                           scaler=Dataset.scaler)
            # Validation set
            predictions = []
            test_labels = []
            with torch.no_grad():
                for seq, labels in val_loader:
                    seq = seq.to(cfg.device)
                    model.reset_hidden_state(tmp_batch_size=1)
                    y_test_pred = model(seq)
                    # Collect predictions and ground-truth labels
                    predictions.append(y_test_pred.squeeze().detach().cpu().numpy())
                    test_labels.append(labels.squeeze().detach().cpu().numpy())
            val_mse, val_mae = self.timeseries_metrics(predictions=predictions,
                                                       test_labels=test_labels,
                                                       scaler=Dataset.scaler)
            print('Epoch: {:04d}'.format(epoch + 1),
                  'loss_train: {:.4f}'.format(np.mean(loss_train_all)),
                  'mae_train: {:.8f}'.format(train_mae),
                  'mae_val: {:.8f}'.format(val_mae))
        torch.save(model, os.path.join(cfg.save_model_dir, 'latest.pth'))                    # save the model
        joblib.dump(Dataset.scaler, os.path.join(cfg.save_model_dir, 'latest_scaler.save'))  # save the fitted scaler

    def test(self):
        # Test processing
        Dataset = Define_Data(task_type='test')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[2995,4000]')
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        params = sum(p.numel() for p in model.parameters())
        predictions = []
        test_labels = []
        with torch.no_grad():
            for seq, labels in test_loader:
                seq = seq.to(cfg.device)
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(seq)
                # Collect predictions and ground-truth labels
                predictions.append(y_test_pred.squeeze().detach().cpu().numpy())
                test_labels.append(labels.squeeze().detach().cpu().numpy())
        _, val_mae = self.timeseries_metrics(predictions=predictions,
                                             test_labels=test_labels,
                                             scaler=Dataset.scaler)
        print('Test set results:',
              'mae_val: {:.8f}'.format(val_mae),
              'params={:.4f}k'.format(params / 1024))

    def BaseTrue_infer(self):
        # Inference driven by real data: every input window is built from ground truth
        Dataset = Define_Data(task_type='infer')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[4000,4870]')
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        predictions = []  # model predictions
        test_labels = []  # ground-truth labels (optional)
        with torch.no_grad():
            for seq, labels in test_loader:
                seq = seq.to(cfg.device)
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(seq)
                predictions.append(y_test_pred.squeeze().detach().cpu().numpy())
                test_labels.append(labels.squeeze().detach().cpu().numpy())
        predictions = np.array(predictions)
        test_labels = np.array(test_labels)
        predictions_rescaled = Dataset.scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
        test_labels_rescaled = Dataset.scaler.inverse_transform(test_labels.reshape(-1, 1)).flatten()
        pd.DataFrame({'test_labels': test_labels_rescaled,
                      '模型推理值': predictions_rescaled}).to_excel(
            os.path.join(cfg.save_model_dir, cfg.data_BaseTrue_infer_output_name), index=False)
        print('Infer Ok')

    def BaseSelf_infer(self):
        # Autoregressive inference: each prediction is fed back as the next input
        Dataset = Define_Data(task_type='infer')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[4000,4870]')
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        initial_input, labels = next(iter(test_loader))  # seed window: the first real sequence
        initial_input = initial_input.to(cfg.device)
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        predictions = []  # model predictions
        with torch.no_grad():
            for _ in range(cfg.num_predictions):
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(initial_input)
                # Slide the window: drop the oldest step, append the new prediction
                next_input = torch.cat((initial_input[1:, ...], y_test_pred.unsqueeze(-1)), dim=0)
                initial_input = next_input
                predictions.append(y_test_pred.squeeze().item())
        predictions_rescaled = Dataset.scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
        pd.DataFrame({'模型推理值': predictions_rescaled}).to_excel(
            os.path.join(cfg.save_model_dir, cfg.data_BaseSelf_infer_output_name), index=False)
        print('Infer Ok')

    def timeseries_metrics(self, predictions, test_labels, scaler):
        # Inverse-scale predictions and labels back to price units
        predictions = np.array(predictions)
        test_labels = np.array(test_labels)
        # Assumes predictions and test_labels flatten to 1-D; adjust the reshape otherwise
        predictions_rescaled = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
        test_labels_rescaled = scaler.inverse_transform(test_labels.reshape(-1, 1)).flatten()
        # MSE and MAE on the original scale
        mse = mean_squared_error(test_labels_rescaled, predictions_rescaled)
        mae = mean_absolute_error(test_labels_rescaled, predictions_rescaled)
        return mse, mae


if __name__ == '__main__':
    myrun = my_run()
    if cfg.istrain:
        myrun.train()
    if cfg.istest:
        myrun.test()
    if cfg.BaseTrue_infer:
        myrun.BaseTrue_infer()
    if cfg.BaseSelf_infer:
        myrun.BaseSelf_infer()
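Because every stage is gated by a flag in configs, a trained checkpoint can be reused without retraining. A minimal sketch, assuming latest.pth and latest_scaler.save already exist under ../data/output:

# Re-run only the two inference modes against an existing checkpoint.
cfg.istrain = False
cfg.istest = False

myrun = my_run()
myrun.BaseTrue_infer()   # windows built from real data
myrun.BaseSelf_infer()   # autoregressive rollout of cfg.num_predictions steps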
3. Results and Analysis
The code provides two inference modes. The first, BaseTrue_infer, predicts the next point from real data: every input window in the loop is built from ground truth. The second, BaseSelf_infer, is autoregressive: each predicted point is fed back into the loop as input for the next step. In practice only the second mode has real value, since future ground truth is not available at prediction time. However, the training here uses no trick for long-horizon self-correction, so the autoregressive forecast collapses almost immediately. Below are the experimental results on the 五粮液 closing price. How to make long-horizon forecasting work is worth exploring in a follow-up; one common remedy, scheduled sampling, is sketched below.
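For reference, scheduled sampling trains the model to recover from its own errors by occasionally replacing part of the input window with the model's own one-step prediction. A minimal sketch against the LSTM above; the mix_window helper and the decay schedule are illustrative assumptions, not part of the original code:

import random
import torch

def mix_window(real_seq, model, teacher_forcing_ratio):
    # real_seq: (seq_len, batch, 1) ground-truth window.
    # With probability (1 - teacher_forcing_ratio), overwrite the last real
    # step with the model's prediction from the preceding steps.
    seq = real_seq.clone()
    if random.random() > teacher_forcing_ratio:
        with torch.no_grad():
            model.reset_hidden_state(tmp_batch_size=seq.size(1))
            seq[-1] = model(seq[:-1])  # one-step prediction replaces ground truth
    return seq

# Inside the training loop, decay teacher forcing across epochs, e.g.:
#   ratio = max(0.2, 1.0 - epoch / cfg.epoch)
#   seq = mix_window(seq, model, ratio)
#   y_pred = model(seq)  # remember to reset_hidden_state first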