【Time Series】LSTM代码实战

一、简介

        还是那句话,"时间序列+金融"是一个很有"钱"景的话题,还是想尝试采用Stock+时间序列预测任务+DeepLearning。本文提供了LSTM预测股票的源代码。

二、代码

        运行代码时的注意事项:按照配置项创建好对应的文件夹,准备好数据,数据来源我的上一篇blog《【Time Series】获取股票数据代码实战》可以找到。

import os
import random
from tqdm import tqdm
import joblib
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error,mean_absolute_error#配置项
# ---------- Configuration ----------
class configs():
    """Global configuration for the LSTM stock-prediction pipeline.

    All paths, model hyper-parameters and run switches live here; a single
    shared instance ``cfg`` is created at import time.
    """

    def __init__(self):
        # Data locations (file names are runtime strings — do not translate)
        self.data_input_path = r'../data/input'
        self.data_output_path = r'../data/output'
        self.save_model_dir = '../data/output'
        self.data_inputfile_name = r'五粮液.xlsx'
        self.data_BaseTrue_infer_output_name = r'基于真实数据推理结果.xlsx'
        self.data_BaseSelf_infer_output_name = r'基于自回归推理结果.xlsx'
        # train/val/test split ratios, '#'-separated string parsed downstream
        self.data_split_ratio = "0.8#0.1#0.1"
        self.model_name = 'LSTM'
        self.seed = 2024
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.epoch = 50
        self.train_batch_size = 16
        self.in_seq_embeddings = 1    # input feature dimension
        self.out_seq_embeddings = 1   # output feature dimension
        self.in_seq_length = 5        # input time-window length
        self.out_seq_length = 1       # output time-window length
        self.hidden_features = 16     # LSTM hidden size
        self.learning_rate = 0.001
        self.dropout = 0.5
        # run switches
        self.istrain = True
        self.istest = True
        self.BaseTrue_infer = True
        self.BaseSelf_infer = True
        self.num_predictions = 800    # autoregressive steps for BaseSelf_infer


cfg = configs()


def seed_everything(seed=2024):
    """Seed every RNG (python, hash, numpy, torch) for reproducibility."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


seed_everything(seed=cfg.seed)
# ---------- Data ----------
class Define_Data():
    """Loads stock prices from Excel and builds windowed tensors / DataLoaders.

    task_type: 'train' -> get_tensor_data() returns (train, val, test) loaders;
               'test' / 'infer' -> returns a single sequential loader.
    """

    def __init__(self, task_type='train'):
        self.scaler = MinMaxScaler()
        self.df = pd.DataFrame()
        self.task_type = task_type

    def refresh_df_data(self, tmp_df_path, tmp_df_sheet_name, use_lines):
        """Reload the DataFrame; use_lines="[m,n]" keeps rows m..n, "-1" keeps all."""
        self.df = pd.read_excel(tmp_df_path, sheet_name=tmp_df_sheet_name)
        if use_lines != "-1":
            # NOTE(review): eval on a config string — acceptable for trusted,
            # hard-coded input, but never expose this to external data.
            use_lines = eval(use_lines)
            assert use_lines[0] <= use_lines[1]
            self.df = self.df.iloc[use_lines[0]:use_lines[1], :]

    def create_inout_sequences(self, input_data, in_seq_length, out_seq_length):
        """Slide a window over 1-D input_data and emit (seq, label) pairs.

        Each seq has shape (in_seq_length, 1) and each label
        (out_seq_length, 1), matching the model's (seq_len, input_size) layout.
        """
        inout_seq = []
        L = len(input_data)
        for i in range(L - in_seq_length):
            train_seq = input_data[i:i + in_seq_length][..., np.newaxis]
            train_label = input_data[i + in_seq_length:i + in_seq_length + out_seq_length, np.newaxis]
            inout_seq.append((train_seq, train_label))
        return inout_seq

    def _collate_fn(self, batch):
        """Stack a batch into (seq_len, batch_size, input_size) tensors."""
        seqs, labels = zip(*batch)
        seq_tensor = torch.stack(seqs).transpose(0, 1)
        label_tensor = torch.stack(labels).transpose(0, 1)
        if len(label_tensor.shape) == 2:
            # single-dimension outputs: add the input_size dimension
            label_tensor = label_tensor.unsqueeze(-1)
        return seq_tensor, label_tensor

    def get_tensor_data(self):
        """Scale the 'close' column and build DataLoaders for the task type."""
        self.df['new_close'] = self.scaler.fit_transform(self.df[['close']])
        inout_seq = self.create_inout_sequences(self.df['new_close'].values,
                                                in_seq_length=cfg.in_seq_length,
                                                out_seq_length=cfg.out_seq_length)
        if self.task_type == 'train':
            X = torch.FloatTensor(np.array([s[0] for s in inout_seq]))
            y = torch.FloatTensor(np.array([s[1] for s in inout_seq]))
            # Chronological train/val/test split.
            # BUGFIX: the original computed val_size as a *count*
            # (int(N*(r0+r1)) - train_size) but then used it as a slice *index*
            # (X[train_size:val_size]), yielding an empty val set and a test
            # set overlapping the training data. Use end indices instead.
            ratios = [float(d) for d in cfg.data_split_ratio.split('#')]
            train_end = int(len(inout_seq) * ratios[0])
            val_end = int(len(inout_seq) * (ratios[0] + ratios[1]))
            train_X, train_y = X[:train_end], y[:train_end]
            val_X, val_y = X[train_end:val_end], y[train_end:val_end]
            test_X, test_y = X[val_end:], y[val_end:]
            # note: the model expects batch_first=False, hence _collate_fn
            batch_size = cfg.train_batch_size
            train_loader = DataLoader(TensorDataset(train_X, train_y), shuffle=True,
                                      batch_size=batch_size, drop_last=True,
                                      collate_fn=self._collate_fn)
            val_loader = DataLoader(TensorDataset(val_X, val_y), shuffle=False,
                                    batch_size=1, collate_fn=self._collate_fn)
            test_loader = DataLoader(TensorDataset(test_X, test_y), shuffle=False,
                                     batch_size=1, collate_fn=self._collate_fn)
            return train_loader, val_loader, test_loader, self.scaler
        elif self.task_type in ('test', 'infer'):
            # BUGFIX: was `== 'test' or 'infer'`, which is always truthy
            X = torch.FloatTensor(np.array([s[0] for s in inout_seq]))
            y = torch.FloatTensor(np.array([s[1] for s in inout_seq]))
            test_loader = DataLoader(TensorDataset(X, y), shuffle=False,
                                     batch_size=1, collate_fn=self._collate_fn)
            return test_loader, self.scaler
#################网络结构#################
class LSTM(nn.Module):
    """Single-layer LSTM regressor.

    Input:  (seq_len, batch_size, input_size) — batch_first=False.
    Output: predictions for the LAST timestep, shape (batch_size, output_size).
    """

    def __init__(self, input_size=10, hidden_layer_size=20, output_size=1):
        super(LSTM, self).__init__()
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.batch_size = cfg.train_batch_size
        self.hidden_cell = (torch.zeros(1, self.batch_size, self.hidden_layer_size),
                            torch.zeros(1, self.batch_size, self.hidden_layer_size))

    def forward(self, input_seq):
        lstm_out, self.hidden_cell = self.lstm(input_seq, self.hidden_cell)
        predictions = self.linear(lstm_out.view(len(input_seq) * self.batch_size, -1))
        # only the last timestep's prediction is returned
        return predictions.view(len(input_seq), self.batch_size, -1)[-1]

    def reset_hidden_state(self, tmp_batch_size):
        """Re-create (h0, c0) for a new batch size, on the model's device.

        BUGFIX: hidden state is now allocated on the same device as the
        weights; the original always built CPU tensors and crashed on CUDA.
        """
        device = next(self.parameters()).device
        self.batch_size = tmp_batch_size
        self.hidden_cell = (torch.zeros(1, tmp_batch_size, self.hidden_layer_size, device=device),
                            torch.zeros(1, tmp_batch_size, self.hidden_layer_size, device=device))


class my_run():
    """Train / test / infer driver around the LSTM model."""

    def train(self):
        """Train on rows [0, 3000); validate each epoch; save the latest checkpoint."""
        Dataset = Define_Data(task_type='train')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[0,3000]')
        train_loader, val_loader, test_loader, scaler = Dataset.get_tensor_data()
        model = LSTM(cfg.in_seq_embeddings, cfg.hidden_features, cfg.out_seq_length).to(cfg.device)
        loss_function = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=cfg.learning_rate, weight_decay=5e-4)
        for epoch in tqdm(range(cfg.epoch)):
            # ----- training pass -----
            model.train()
            # BUGFIX: reset per epoch; the original accumulated losses across
            # ALL epochs, so the printed mean masked training progress.
            loss_train_all = []
            predictions = []
            test_labels = []
            for seq, labels in train_loader:
                # BUGFIX: move batches to the model's device (was CPU-only)
                seq, labels = seq.to(cfg.device), labels.to(cfg.device)
                optimizer.zero_grad()
                model.reset_hidden_state(tmp_batch_size=cfg.train_batch_size)
                y_pred = model(seq)
                loss_train = loss_function(torch.squeeze(y_pred), torch.squeeze(labels))
                loss_train_all.append(loss_train.item())
                loss_train.backward()
                optimizer.step()
                predictions.append(y_pred.squeeze().detach().cpu().numpy())
                test_labels.append(labels.squeeze().detach().cpu().numpy())
            train_mse, train_mae = self.timeseries_metrics(predictions=predictions,
                                                           test_labels=test_labels,
                                                           scaler=Dataset.scaler)
            # ----- validation pass -----
            # BUGFIX: validate on val_loader; the original iterated test_loader
            # here, leaking the test split into per-epoch monitoring.
            model.eval()
            predictions = []
            test_labels = []
            with torch.no_grad():
                for seq, labels in val_loader:
                    seq, labels = seq.to(cfg.device), labels.to(cfg.device)
                    model.reset_hidden_state(tmp_batch_size=1)
                    y_val_pred = model(seq)
                    predictions.append(y_val_pred.squeeze().detach().cpu().numpy())
                    test_labels.append(labels.squeeze().detach().cpu().numpy())
            val_mse, val_mae = self.timeseries_metrics(predictions=predictions,
                                                       test_labels=test_labels,
                                                       scaler=Dataset.scaler)
            print('Epoch: {:04d}'.format(epoch + 1),
                  'loss_train: {:.4f}'.format(np.mean(loss_train_all)),
                  'mae_train: {:.8f}'.format(train_mae),
                  'mae_val: {:.8f}'.format(val_mae))
            # save the "latest" checkpoint and the fitted scaler every epoch
            torch.save(model, os.path.join(cfg.save_model_dir, 'latest.pth'))
            joblib.dump(Dataset.scaler, os.path.join(cfg.save_model_dir, 'latest_scaler.save'))

    def test(self):
        """Evaluate the saved checkpoint on rows [2995, 4000)."""
        Dataset = Define_Data(task_type='test')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[2995,4000]')
        # reuse the scaler fitted at training time, not a freshly fitted one
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        # NOTE(review): torch.load of a full pickled module — only safe for
        # checkpoints you produced yourself.
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        params = sum(p.numel() for p in model.parameters())
        predictions = []
        test_labels = []
        with torch.no_grad():
            for seq, labels in test_loader:
                seq, labels = seq.to(cfg.device), labels.to(cfg.device)
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(seq)
                predictions.append(y_test_pred.squeeze().detach().cpu().numpy())
                test_labels.append(labels.squeeze().detach().cpu().numpy())
        _, val_mae = self.timeseries_metrics(predictions=predictions,
                                             test_labels=test_labels,
                                             scaler=Dataset.scaler)
        print('Test set results:',
              'mae_val: {:.8f}'.format(val_mae),
              'params={:.4f}k'.format(params / 1024))

    def BaseTrue_infer(self):
        """One-step-ahead inference where every input window is REAL data."""
        Dataset = Define_Data(task_type='infer')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[4000,4870]')
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        predictions = []   # model outputs
        test_labels = []   # ground truth (optional)
        with torch.no_grad():
            for seq, labels in test_loader:
                seq = seq.to(cfg.device)
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(seq)
                predictions.append(y_test_pred.squeeze().detach().cpu().numpy())
                test_labels.append(labels.squeeze().detach().cpu().numpy())
        predictions = np.array(predictions)
        test_labels = np.array(test_labels)
        # map back to the original price scale before exporting
        predictions_rescaled = Dataset.scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
        test_labels_rescaled = Dataset.scaler.inverse_transform(test_labels.reshape(-1, 1)).flatten()
        pd.DataFrame({'test_labels': test_labels_rescaled,
                      '模型推理值': predictions_rescaled}).to_excel(
            os.path.join(cfg.save_model_dir, cfg.data_BaseTrue_infer_output_name), index=False)
        print('Infer Ok')

    def BaseSelf_infer(self):
        """Autoregressive inference: each prediction is fed back as input."""
        Dataset = Define_Data(task_type='infer')
        Dataset.refresh_df_data(tmp_df_path=os.path.join(cfg.data_input_path, cfg.data_inputfile_name),
                                tmp_df_sheet_name='数据处理',
                                use_lines='[4000,4870]')
        Dataset.scaler = joblib.load(os.path.join(cfg.save_model_dir, 'latest_scaler.save'))
        test_loader, _ = Dataset.get_tensor_data()
        # seed the loop with the first real window
        initial_input, labels = next(iter(test_loader))
        initial_input = initial_input.to(cfg.device)
        model_path = os.path.join(cfg.save_model_dir, 'latest.pth')
        model = torch.load(model_path, map_location=torch.device(cfg.device))
        model.eval()
        predictions = []
        with torch.no_grad():
            for _ in range(cfg.num_predictions):
                model.reset_hidden_state(tmp_batch_size=1)
                y_test_pred = model(initial_input)
                # slide the window: drop the oldest step, append the prediction
                next_input = torch.cat((initial_input[1:, ...], y_test_pred.unsqueeze(-1)), dim=0)
                initial_input = next_input
                predictions.append(y_test_pred.squeeze().item())
        predictions_rescaled = Dataset.scaler.inverse_transform(
            np.array(predictions).reshape(-1, 1)).flatten()
        pd.DataFrame({'模型推理值': predictions_rescaled}).to_excel(
            os.path.join(cfg.save_model_dir, cfg.data_BaseSelf_infer_output_name), index=False)
        print('Infer Ok')

    def timeseries_metrics(self, predictions, test_labels, scaler):
        """Return (mse, mae) on the ORIGINAL price scale.

        Assumes predictions/test_labels flatten to 1-D; reshape(-1, 1) is what
        the fitted single-feature scaler expects.
        """
        predictions = np.array(predictions)
        test_labels = np.array(test_labels)
        predictions_rescaled = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
        test_labels_rescaled = scaler.inverse_transform(test_labels.reshape(-1, 1)).flatten()
        mse = mean_squared_error(test_labels_rescaled, predictions_rescaled)
        mae = mean_absolute_error(test_labels_rescaled, predictions_rescaled)
        return mse, mae


if __name__ == '__main__':
    myrun = my_run()
    if cfg.istrain:
        myrun.train()
    if cfg.istest:
        myrun.test()
    if cfg.BaseTrue_infer:
        myrun.BaseTrue_infer()
    if cfg.BaseSelf_infer:
        myrun.BaseSelf_infer()

三、结果与分析

        本文代码配置了两种预测模式。第一种,BaseTrue_infer:根据真实数据预测下一个点,每一步循环使用真实数据作为输入;第二种,BaseSelf_infer:自回归预测,每一步循环使用上一步的预测值作为输入。实际应用中一般只有第二种才有实用价值,当然本文时序预测的训练模式没有采用长距离自动纠偏的trick,所以第二种预测就直接坍塌了。后续可以研究探讨长时间预测如何进行。下面贴上在"五粮液"股价收盘价上的实验结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/439103.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android SystemUI 介绍

目录 一、什么是SystemUI 二、SystemUI应用源码 三、学习 SystemUI 的核心组件 四、修改状态与导航栏测试 本篇文章，主要科普的是Android SystemUI，下一篇文章我们将介绍如何把Android SystemUI 应用转成Android Studio 工程项目。 一、什么是Syst…

Hadoop3.x基础(1)

来源：B站尚硅谷 这里写目录标题 大数据概论大数据概念大数据特点(4V)大数据应用场景 Hadoop概述Hadoop是什么Hadoop发展历史（了解）Hadoop三大发行版本（了解）Hadoop优势（4高）Hadoop组成…

349. 两个数组的交集(力扣LeetCode)

文章目录 349. 两个数组的交集题目描述数组解题set容器解题该思路数组版解题 349. 两个数组的交集 题目描述 给定两个数组 nums1 和 nums2，返回它们的交集。输出结果中的每个元素一定是唯一的。我们可以不考虑输出结果的顺序。 示例 1： 输入…

C#,贝尔数(Bell Number)的计算方法与源程序

1 埃里克·坦普尔·贝尔 贝尔数是组合数学中的一组整数数列，以埃里克·坦普尔·贝尔（Eric Temple Bell）命名。埃里克·坦普尔·贝尔（生于1883年2月7日，苏格兰阿伯丁郡阿伯丁；于1960年12月21日在美国加利福尼…

RK3568平台开发系列讲解(Linux系统篇)device 资源的获取

🚀返回专栏总目录 文章目录 一、platform_device 结构体二、platform_get_resource() 获取沉淀、分享、成长,让自己和他人都能有所收获!😄 一、platform_device 结构体 struct platform_driver 结构体继承了 struct device_driver 结构体, 因此可以直接访问 struct devi…

SeaTunnel集群安装

环境准备 服务器节点 节点名称 IP bigdata1 192.168.1.250 bigdata4 192.168.1.251 bigdata5 192.168.1.252 Java环境（三个节点都需要） java1.8 注意：在安装SeaTunnel集群时，最好是先在一个节点上将所有配置都修改完…

jenkins pipeline配置maven可选参数

1、在Manage Jenkins下的Global Tool Configuration下对应的maven项添加我们要用得到的不同版本的maven安装项 2、pipeline文件内容具体如下 我们maven是单一的，所以我们都是配置单选参数 pipeline {agent anyparameters {gitParameter(name: BRANCH_TAG, type: …

(五)MySQL的备份及恢复

1、MySQL日志管理 在数据库保存数据时，有时候不可避免会出现数据丢失或者被破坏。这种情况下，我们必须保证数据的安全性和完整性，就需要使用日志来查看或者恢复数据了。 数据库中数据丢失或被破坏的可能原因：误删除数据…

jQuery 下载 使用

1. jQuery 网址：jQuery CDN 2. 下载，右击在新的tab打开 3. Ctrl+S 另存为 jquery-1.12.4.min.js，并放到自己项目的js目录下 4. 请参考目录 5. 编写第一个使用jQuery的代码 <script src="js/jquery-1.12.4.min.js"></script> <!DOCT…

数学知识第五期 扩展欧几里得算法

前言 扩展欧几里得算法也是重要的数论内容，数论固然就是数学知识的理论，希望大家能够熟练掌握！！！ 一、扩展欧几里得算法的基本内容 扩展欧几里得算法（英语：Extended Euclidean algorithm…

非阿里云注册域名如何在云解析DNS设置解析?

概述 非阿里云注册域名使用云解析DNS，按照如下步骤： 添加域名。 添加解析记录。 修改DNS服务器。 DNS服务器变更全球同步，等待48小时。 添加解析记录 登录云解析DNS产品控制台。 在 域名解析 页面中，单击 添加域名 。 在 …

为什么要用云手机养tiktok账号

在拓展海外电商市场的过程中，许多用户选择采用tiktok短视频平台引流的策略，以提升在电商平台上的流量，吸引更多消费者。而要进行tiktok引流，养号是必不可少的一个环节。tiktok云手机成为实现国内跨境养号的一种有效方式…