After getting familiar with the LSTM in the previous post, we can now use it for NLP experiments.
The two pieces of code below use an LSTM for text classification:
I. The first version is built mostly from hand-written components and has a few notable features:
1. The dataset is parsed from a CSV file using a custom Dataset class.
2. Tokenization is done with jieba.
3. Batches are aligned with collate_fn and pad_sequence so every sequence in a batch has the same length (a small sketch of this follows the list).
4. Text is embedded with the result.model.bin model trained earlier with word2vec.
5. Because word2vec is used on its own, the model no longer contains an Embedding module.
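To make point 3 concrete, here is a minimal sketch, separate from the script below, of how pad_sequence brings variable-length sequences in one batch to a common length (toy tensors, with a 128-dimensional embedding assumed purely for illustration):

import torch
from torch.nn.utils.rnn import pad_sequence

a = torch.randn(5, 128)   # a sentence of 5 words, each a 128-dim word vector
b = torch.randn(3, 128)   # a shorter sentence of 3 words
batch = pad_sequence([a, b], batch_first=True, padding_value=0)
print(batch.shape)        # torch.Size([2, 5, 128]): both sequences padded to length 5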
import torch
import jieba
import numpy as np
import pandas as pd
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from gensim.models.keyedvectors import KeyedVectors

device = torch.device("cuda")

# Pre-trained word2vec vectors; words outside this vocabulary are dropped later.
word_vectors = KeyedVectors.load_word2vec_format('result.model.bin', binary=True)
word2idx = {word: idx + 1 for idx, word in enumerate(word_vectors.index_to_key)}

batch_size = 8
embed_size = word_vectors.vector_size
hidden_size = 64
vocab_size = len(word2idx)
num_classes = 2
num_layers = 4
print(vocab_size, embed_size)

# Custom dataset class: each row of the CSV is "text,label".
class TxTDataset(Dataset):
    def __init__(self, txt_file):
        self.df = pd.read_csv(txt_file)
        self.rows = self.df.shape[0]

    def __len__(self):
        return self.rows

    def __getitem__(self, index):
        txt = self.df.iloc[index, 0]
        scentence = []
        pos = jieba.cut(txt, cut_all=False)        # jieba word segmentation
        for term in pos:
            if term in word_vectors.index_to_key:  # keep only words that have a word2vec vector
                scentence.append(word_vectors[term])
        if self.df.iloc[index, 1] == '正面':        # '正面' = positive label
            label = np.array([1, 0])
        else:
            label = np.array([0, 1])
        scentence = torch.from_numpy(np.array(scentence))
        label = torch.from_numpy(label).float()
        return scentence, label

# Pads the variable-length sequences of a batch to the same length.
# Labels are collected in the same loop so they stay aligned with the
# kept sentences even when an empty sentence is skipped.
def collate_fn(x):
    data = []
    labels = []
    for i in range(len(x)):
        if len(x[i][0]) == 0:                      # sentence had no in-vocabulary words
            continue
        data.append(x[i][0])
        labels.append(x[i][1])
    data = pad_sequence(data, batch_first=True, padding_value=0).float()
    scents = []
    for i in range(data.size(0)):
        scents.append(data[i, :, :])
    return [scents, labels]

class RnnClassifer(nn.Module):
    def __init__(self, embed_size, hidden_size, num_classes, num_layers):
        super(RnnClassifer, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # No Embedding layer: the inputs are already word2vec vectors.
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.norm = nn.LayerNorm(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.sigmod = nn.Sigmoid()

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        x, _ = self.lstm(x, (h0, c0))
        x = self.norm(x)
        x = self.fc(x[:, -1, :])                   # classify on the last time step
        return self.sigmod(x)

model = RnnClassifer(embed_size, hidden_size, num_classes, num_layers)
model.to(device)
model.train()

train_dataset = TxTDataset('data_single.csv')
train_loader = DataLoader(train_dataset, batch_size, shuffle=True, collate_fn=collate_fn)

# CrossEntropyLoss with one-hot float targets requires PyTorch >= 1.10 (soft labels).
cross = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(10):
    correctSum = 0.0
    lossSum = 0.0
    dataLen = 0
    for inputs, labels in train_loader:
        inputs = torch.stack(inputs, dim=0).to(device)
        labels = torch.stack(labels, dim=0).to(device)

        outputs = model(inputs)
        loss = cross(outputs, labels)

        _, preds = torch.max(outputs, dim=1)
        _, lab = torch.max(labels, dim=1)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        correct = (preds == lab).sum()
        correctSum += correct
        lossSum += loss.item()
        dataLen += inputs.size(0)
    print('epoch loss prec:', epoch, lossSum / dataLen, (correctSum / dataLen).item())

print(lossSum / dataLen, correctSum / dataLen)
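As a usage example, running a single new review through the trained model could look roughly like the sketch below. This is not part of the original script: the example sentence is invented, and it assumes at least one of its words has a word2vec vector.

model.eval()
with torch.no_grad():
    text = "质量很好,非常满意"    # invented example review
    vecs = [word_vectors[t] for t in jieba.cut(text) if t in word_vectors.index_to_key]
    x = torch.from_numpy(np.array(vecs)).float().unsqueeze(0).to(device)  # (1, seq_len, embed_size)
    probs = model(x)                           # sigmoid scores for the two classes
    pred = torch.argmax(probs, dim=1).item()   # 0 -> positive, 1 -> negative
    print(probs, pred)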
II. The second version relies more on ready-made functions and is more compact. It differs from the first in a few ways:
1. No custom dataset class; TabularDataset is used as the template to build the dataset.
2. Tokenization is done with spaCy.
3. The vocabulary is built with torchtext's Field (a small numericalization sketch follows this list).
4. BucketIterator handles batching and padding.
5. nn.Embedding is used for the text embedding, which is more efficient than training your own word2vec, so the model gains an Embedding module.
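To illustrate points 2 and 3: once the script below has defined tokenizer and called TEXT.build_vocab, every token maps to an integer id through TEXT.vocab.stoi, and nn.Embedding later turns those ids into trainable vectors. A rough sketch, where the example sentence and its segmentation are only illustrative:

tokens = tokenizer("这本书非常好")             # spaCy segmentation, e.g. ['这', '本书', '非常', '好']
ids = [TEXT.vocab.stoi[t] for t in tokens]    # token -> index; 0 is <unk>, 1 is <pad>
print(tokens, ids)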
import spacy
import torch
import torch.utils
import torch.nn as nn
import torch.utils.data
from torchtext.data import Field, BucketIterator, TabularDataset

device = torch.device("cuda")
spacy_zh = spacy.load("zh_core_web_sm")   # spaCy Chinese pipeline for tokenization

class RnnClassifer(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_classes, num_layers):
        super(RnnClassifer, self).__init__()
        # Embedding layer: token ids -> dense vectors, learned with the rest of the model.
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.norm = nn.LayerNorm(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.sigmod = nn.Sigmoid()

    def forward(self, x, h):
        x = self.embedding(x)
        x, h = self.lstm(x, h)
        x = self.norm(x)
        x = self.fc(x[:, -1, :])               # classify on the last time step
        return self.sigmod(x), h

def tokenizer(text):
    return [tok.text for tok in spacy_zh.tokenizer(text)]

def detach(states):
    # Cut the graph so gradients do not flow across batches through the hidden state.
    return [state.detach() for state in states]

TEXT = Field(sequential=True, tokenize=tokenizer)
LABEL = Field(sequential=True, tokenize=tokenizer)

train_data = TabularDataset(
    path='data_single.csv', format='csv',
    fields={'evaluation': ('evaluation', TEXT), 'label': ('label', LABEL)})

TEXT.build_vocab(train_data)
LABEL.build_vocab(train_data)

batch_size = 64
vocab_size = len(TEXT.vocab)
embed_size = 128
hidden_size = 64
num_classes = 2
num_layers = 3
print(vocab_size)

# BucketIterator groups examples of similar length to reduce padding.
train_iterator = BucketIterator(train_data, batch_size=batch_size, device='cuda',
                                sort_key=lambda x: len(x.text), repeat=False,
                                train=True, shuffle=True)

model = RnnClassifer(vocab_size, embed_size, hidden_size, num_classes, num_layers)
model.to(device)
model.train()

cross = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(50):
    correctSum = 0.0
    lossSum = 0.0
    dataLen = 0
    states = (torch.zeros(num_layers, batch_size, hidden_size).to(device),
              torch.zeros(num_layers, batch_size, hidden_size).to(device))
    for i, batch in enumerate(train_iterator):
        inputs = batch.evaluation.transpose(0, 1).to(device)      # (batch, seq_len)
        # Label ids start at 2 (0 and 1 are <unk>/<pad>), so subtract 2 and
        # build a one-hot style float target.
        labels = (batch.label.transpose(0, 1) - 2).to(device)
        labels = torch.cat((labels, 1 - labels), dim=1).float()
        if inputs.size(0) != batch_size:       # skip the last incomplete batch
            continue
        states = detach(states)
        outputs, states = model(inputs, states)
        loss = cross(outputs, labels)

        _, preds = torch.max(outputs, dim=1)
        _, lab = torch.max(labels, dim=1)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        correct = (preds == lab).sum()
        correctSum += correct
        lossSum += loss.item()
        dataLen += inputs.size(0)
    print('epoch loss prec:', epoch, lossSum / dataLen, (correctSum / dataLen).item())

print(lossSum / dataLen, correctSum / dataLen)
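One detail worth calling out in the training loop above is the "- 2" applied to batch.label: torchtext reserves vocabulary indices 0 and 1 for the special tokens <unk> and <pad>, so the two label strings from the CSV land at indices 2 and 3. A quick check makes the mapping visible (the exact order of the two labels depends on their frequency in the data):

print(LABEL.vocab.stoi)   # e.g. {'<unk>': 0, '<pad>': 1, '正面': 2, '负面': 3}
print(LABEL.vocab.itos)   # index -> label string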
Training data download: https://files.cnblogs.com/files/tiandsp/data_single.rar