-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathautoencoder.py
More file actions
337 lines (265 loc) · 12.1 KB
/
autoencoder.py
File metadata and controls
337 lines (265 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam
import torch.nn.functional as functional
from torch.utils.data import DataLoader
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from config import Config
from dataset import Vocabulary, DualNovelDataSet
config = Config()
class SeqAutoEncoder(object):
    """Sequence autoencoder.

    Encodes sentences with an RNN encoder and reconstructs them with an RNN
    decoder. The encoder's final hidden state serves as a sentence embedding,
    which is used to measure the semantic difference between sentence pairs.
    """
    def __init__(self):
        """Set up the datasets, the encoder/decoder models, and the optimizer."""
        # Vocabulary and special-token ids.
        self.vocabulary = Vocabulary(config.vocab_file)
        self.pad = self.vocabulary.word2id['<pad>']
        self.go = self.vocabulary.word2id['<go>']
        self.eos = self.vocabulary.word2id['<eos>']
        self.unk = self.vocabulary.word2id['<unk>']
        # NOTE(review): both splits are built with the same default
        # DualNovelDataSet constructor — confirm they are meant to differ.
        self.train_set = DualNovelDataSet()
        self.test_set = DualNovelDataSet()
        # Models.
        self.encoder = RNNEncoder(config.encoder_num_layers, config.encoder_bidirectional)
        self.decoder = RNNDecoder(config.decoder_num_layers, config.decoder_bidirectional)
        # Collect all trainable parameters from both sub-modules. Using
        # Module.parameters() is equivalent to filtering
        # state_dict(keep_vars=True) on requires_grad, and simpler.
        self.trainable_variables = list(self.encoder.parameters()) + list(self.decoder.parameters())
        # Optimizer and training hyper-parameters.
        self.learning_rate = config.ae_learning_rate
        self.beta1 = config.ae_beta1
        self.beta2 = config.ae_beta2
        self.optimizer = Adam(self.trainable_variables, self.learning_rate, (self.beta1, self.beta2))
        self.criterion = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss(reduction='mean')
        self.batch_size = config.ae_batch_size
        self.epochs = config.ae_epochs
        self.num_workers = config.ae_num_workers
    def set_training(self, train_mode):
        """Switch both sub-modules between training and evaluation mode.

        Args:
            train_mode: bool, True for training mode, False for eval mode.
        """
        self.encoder.train(mode=train_mode)
        self.decoder.train(mode=train_mode)
    def train(self, verbose=False, graph=False):
        """Train the autoencoder for the configured number of epochs.

        Args:
            verbose: if True, print the mean loss after every epoch.
            graph: if True, show a loss curve after training finishes.
        """
        loss_list = []
        for epoch in range(self.epochs):
            epoch_loss = self.run_epoch(test=False)
            loss_list += epoch_loss
            if verbose:
                print('\n[TRAIN] Epoch {}, mean loss {}'.format(epoch, np.mean(epoch_loss)))
        if graph:
            plt.figure()
            plt.plot([x for x in range(len(loss_list))], loss_list)
            plt.xlabel('step')
            plt.ylabel('loss')
            plt.grid()
            plt.title('Training loss')
            plt.show()
    def run_epoch(self, test=False):
        """Run a single epoch in either training or test mode.

        Args:
            test: bool, whether to run in test (evaluation) mode.

        Returns:
            If test is True: the mean loss over the epoch (float).
            If test is False: the list of per-step training losses.
        """
        loss_list = []
        # BUG FIX: the original swapped the two branches — test mode iterated
        # the *train* set with the models in train mode, and training iterated
        # the *test* set in eval mode; it also set the encoder's mode twice
        # and never switched the decoder.
        if test:
            loader = DataLoader(self.test_set, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)
            self.set_training(False)
        else:
            loader = DataLoader(self.train_set, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)
            self.set_training(True)
        with tqdm(loader) as pbar:
            for data in pbar:
                sentences, labels = self.preprocess_data(data)
                batch_size = sentences.shape[0]
                encoder_state = self.encoder.init_hidden(batch_size)
                encoder_output, encoder_hidden = self.encoder(sentences, encoder_state)
                # The decoder input is a constant tensor filled with the <go>
                # id; reconstruction relies on the encoder hidden state.
                decoder_input = self.go * torch.ones((batch_size, config.max_sentence_length, config.embedding_dim))
                if config.gpu:
                    # BUG FIX: the input must live on the same device as the
                    # CUDA-resident decoder.
                    decoder_input = decoder_input.cuda()
                # RNNDecoder.forward returns (logits, probs); only the logits
                # (shape [batch * max_len, vocab_size]) feed the loss.
                decoder_logits, _decoder_probs = self.decoder(decoder_input, encoder_hidden)
                loss = self.criterion(decoder_logits, sentences.reshape(-1))
                if not test:
                    self.optimizer.zero_grad()
                    loss.backward()
                    self.optimizer.step()
                loss_list.append(loss.item())
        return np.mean(loss_list) if test else loss_list
    def mean_difference(self, sentences_0, sentences_1):
        """Compute the mean semantic difference between two sentence batches.

        Both batches are encoded; the difference is defined as the mean
        squared error between the two encoder hidden states.

        Args:
            sentences_0: LongTensor of shape [batch_size, max_seq_len].
            sentences_1: same shape as sentences_0.

        Returns:
            Scalar tensor with the mean semantic difference.
        """
        with torch.no_grad():
            assert sentences_0.shape[0] == sentences_1.shape[0]
            batch_size = sentences_0.shape[0]
            encoder_state = self.encoder.init_hidden(batch_size)
            _, hidden_0 = self.encoder(sentences_0, encoder_state)
            _, hidden_1 = self.encoder(sentences_1, encoder_state)
            return self.mse_loss(hidden_0, hidden_1)
    def save_model(self):
        """Save encoder and decoder weights to the configured paths."""
        torch.save(self.encoder.state_dict(), config.encoder_model_path)
        torch.save(self.decoder.state_dict(), config.decoder_model_path)
    def load_model(self):
        """Load encoder and decoder weights from the configured paths.

        map_location keeps CPU-only loading possible for GPU-saved weights.
        """
        self.encoder.load_state_dict(torch.load(config.encoder_model_path, map_location=lambda storage, loc: storage))
        self.decoder.load_state_dict(torch.load(config.decoder_model_path, map_location=lambda storage, loc: storage))
    def preprocess_data(self, data):
        """Preprocess one DualNovelDataSet batch containing both styles.

        Args:
            data: a DataLoader sample of the form
                ((bare_0, go_0, eos_0, len_0), (bare_1, go_1, eos_1, len_1)).

        Returns:
            sentences: Tensor of shape [batch_size * 2, max_len], the
                concatenated style-0 and style-1 sentences.
            label: Tensor of shape [batch_size * 2], 0 for style 0 and 1 for
                style 1.
        """
        (bare_0, go_0, eos_0, len_0), (bare_1, go_1, eos_1, len_1) = data
        batch_size = bare_0.shape[0]
        label_0 = torch.zeros(batch_size)
        label_1 = torch.ones(batch_size)
        sentences = torch.cat([bare_0, bare_1], dim=0)
        label = torch.cat([label_0, label_1], dim=0)
        if config.gpu:
            sentences = sentences.cuda()
            label = label.cuda()
        return sentences, label
class RNNEncoder(nn.Module):
    """RNN-based sequence encoder.

    An embedding layer feeding a (possibly bidirectional) multi-layer GRU.
    """
    def __init__(self, num_layers, bidirectional):
        """Build the encoder.

        Args:
            num_layers: number of stacked GRU layers.
            bidirectional: bool, whether to use a bidirectional GRU.
        """
        super(RNNEncoder, self).__init__()
        self.vocabulary = Vocabulary(config.vocab_file)
        self.embedding_dim = config.embedding_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.directions = 2 if bidirectional else 1
        self.embedding = nn.Embedding(self.vocabulary.vocab_size, self.embedding_dim)
        self.gru = nn.GRU(
            self.embedding_dim, self.embedding_dim, num_layers, batch_first=True, bidirectional=bidirectional
        )
        # Move the sub-modules to the GPU when configured to do so.
        if config.gpu:
            self.embedding = self.embedding.cuda()
            self.gru = self.gru.cuda()
    def forward(self, inputs, hidden):
        """Encode a batch of sentences.

        Args:
            inputs: LongTensor of shape [batch_size, max_seq_len] with word ids.
            hidden: Tensor of shape [num_layers * directions, batch_size,
                embedding_dim] holding the initial hidden state.

        Returns:
            output: Tensor of shape [batch_size, max_seq_len, embedding_dim]
                (times directions when bidirectional) with per-step outputs.
            hidden: final hidden state, same shape as the input hidden.
        """
        # Embed word ids, then run the GRU over the sequence.
        return self.gru(self.embedding(inputs), hidden)
    def init_hidden(self, batch_size):
        """Create an all-zero initial hidden state.

        Args:
            batch_size: size of the batch the state will be used with.

        Returns:
            Zero Tensor of shape [num_layers * directions, batch_size,
            embedding_dim], on the GPU when configured.
        """
        shape = (self.num_layers * self.directions, batch_size, self.embedding_dim)
        state = torch.zeros(shape)
        return state.cuda() if config.gpu else state
class RNNDecoder(nn.Module):
    """RNN-based decoder module.

    A multi-layer GRU followed by a fully-connected projection onto the
    vocabulary. No init_hidden is provided: the initial hidden state always
    comes from the encoder.
    """
    def __init__(self, num_layers, bidirectional):
        """Build the decoder.

        Args:
            num_layers: number of stacked GRU layers.
            bidirectional: bool, whether to use a bidirectional GRU.
        """
        super(RNNDecoder, self).__init__()
        self.vocabulary = Vocabulary(config.vocab_file)
        self.embedding_dim = config.embedding_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.directions = 2 if self.bidirectional else 1
        # NOTE(review): this embedding is never used by forward() and its
        # first dimension is embedding_dim rather than vocab_size — it looks
        # vestigial; kept as-is so checkpoints retain the same state_dict
        # layout. Confirm before removing.
        self.embedding = nn.Embedding(self.embedding_dim, self.embedding_dim)
        self.gru = nn.GRU(
            self.embedding_dim, self.embedding_dim, num_layers, batch_first=True, bidirectional=bidirectional
        )
        self.dense = nn.Linear(self.embedding_dim * self.directions, self.vocabulary.vocab_size)
        # Move all sub-modules to the GPU when configured to do so.
        if config.gpu:
            self.embedding = self.embedding.cuda()
            self.gru = self.gru.cuda()
            self.dense = self.dense.cuda()
    def forward(self, inputs, hidden):
        """Decode a sequence from an encoder hidden state.

        Args:
            inputs: Tensor of shape [batch_size, max_seq_len, embedding_dim];
                typically filled with the start-of-sentence value.
            hidden: Tensor of shape [num_layers * directions, batch_size,
                embedding_dim]; the hidden state produced by the encoder.

        Returns:
            output_logits: Tensor of shape [batch_size * max_seq_len,
                vocab_size] with per-word scores (batch and time dimensions
                flattened together).
            output_probs: softmax of output_logits over the vocabulary axis.
        """
        gru_output, hidden = self.gru(inputs, hidden)
        # Flatten [batch, seq, dim] -> [batch * seq, dim] for the projection.
        flattened = gru_output.reshape(-1, gru_output.size(2))
        output_logits = self.dense(flattened)
        output_probs = functional.softmax(output_logits, dim=1)
        return output_logits, output_probs
def train_autoencoder():
    """Train the autoencoder, persist it, then print one sample semantic
    difference between a style-0 and a style-1 batch as a sanity check."""
    model = SeqAutoEncoder()
    model.train(verbose=True, graph=True)
    model.save_model()
    sample_loader = DataLoader(model.train_set, batch_size=16, shuffle=True)
    for batch in sample_loader:
        (style0_sentences, _, _, _), (style1_sentences, _, _, _) = batch
        print(model.mean_difference(style0_sentences, style1_sentences))
        break  # one batch is enough for the sanity check
if __name__ == '__main__':
    train_autoencoder()