-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbasicLSTMs.py
More file actions
124 lines (107 loc) · 4.35 KB
/
basicLSTMs.py
File metadata and controls
124 lines (107 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from numpy import array
from keras.models import Sequential
from keras.layers import LSTM, Dense, Bidirectional, Conv1D, TimeDistributed, MaxPooling1D, Flatten, ConvLSTM2D
from getSeqs import in_seq1 as raw_seq
TYPE = "stackedLSTM" # Type of univariate LSTM - Default = Vanilla LSTM
# split a univariate sequence into samples
def split_sequence(sequence, n_steps):
X, y = list(), list()
for i in range(len(sequence)):
# find the end of this pattern
end_ix = i + n_steps
# check if we are beyond the sequence
if end_ix > len(sequence) - 1:
break
# gather input and output parts of the pattern
seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
X.append(seq_x)
y.append(seq_y)
return array(X), array(y)
def get_vanilla_lstm(n_steps, n_features, n_outputs=1, n_units=50):
model = Sequential()
model.add(LSTM(n_units, activation='relu', input_shape=(n_steps, n_features)))
model.add(Dense(n_outputs))
model.compile(optimizer='adam', loss='mse')
return model
def get_stacked_lstm(n_steps, n_features, n_outputs=1, n_units=None):
if n_units is None:
n_units = [50, 50, 20]
model = Sequential()
model.add(LSTM(n_units[0], activation='relu', return_sequences=True, input_shape=(n_steps, n_features)))
model.add(LSTM(n_units[1], activation='relu', return_sequences=True))
model.add(LSTM(n_units[2], activation='relu'))
model.add(Dense(n_outputs))
model.compile(optimizer='adam', loss='mse')
return model
def get_bidirectional_lstm(n_steps, n_features, n_outputs=1, n_units=50):
model = Sequential()
model.add(Bidirectional(LSTM(n_units, activation='relu'), input_shape=(n_steps, n_features)))
model.add(Dense(n_outputs))
model.compile(optimizer='adam', loss='mse')
return model
def get_cnn_lstm(n_steps, n_features, n_units=50, filters=64):
model = Sequential()
model.add(
TimeDistributed(Conv1D(filters=filters, kernel_size=1, activation='relu'),
input_shape=(None, n_steps, n_features)))
model.add(TimeDistributed(MaxPooling1D(pool_size=2)))
model.add(TimeDistributed(Flatten()))
model.add(LSTM(n_units, activation='relu'))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse')
return model
def get_conv_lstm(n_steps, n_features, filters=64):
model = Sequential()
model.add(
ConvLSTM2D(filters=filters, kernel_size=(1, 2), activation='relu', input_shape=(n_seq, 1, n_steps, n_features)))
model.add(Flatten())
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse')
return model
if __name__ == '__main__':
n_features = 1
if TYPE not in ["CNNLSTM", "ConvLSTM"]:
# choose a number of time steps
n_steps = 3
# split into samples
X, y = split_sequence(raw_seq, n_steps)
# reshape from [samples, timesteps] into [samples, timesteps, features]
X = X.reshape((X.shape[0], X.shape[1], n_features))
else:
# choose a number of time steps
n_steps = 4
# split into samples
X, y = split_sequence(raw_seq, n_steps)
n_steps = 2
n_seq = 2
if TYPE == "CNNLSTM":
X = X.reshape((X.shape[0], n_seq, n_steps, n_features))
else:
X = X.reshape((X.shape[0], n_seq, 1, n_steps, n_features))
# define model
print('model type is {0}'.format(TYPE))
if TYPE == "CNNLSTM":
model = get_cnn_lstm(n_steps, n_features)
elif TYPE == "ConvLSTM":
model = get_conv_lstm(n_steps, n_features)
elif TYPE == "BidirectionalLSTM":
model = get_bidirectional_lstm(n_steps, n_features)
elif TYPE == "StackedLSTM":
model = get_stacked_lstm(n_steps, n_features)
else:
model = get_vanilla_lstm(n_steps, n_features)
# fit model
model.fit(X, y, batch_size=2, epochs=200, verbose=1)
# demonstrate prediction
x_input = array([70, 80, 90])
if TYPE not in ["CNNLSTM", "ConvLSTM"]:
x_input = array([70, 80, 90])
x_input = x_input.reshape((1, n_steps, n_features))
else:
x_input = array([60, 70, 80, 90])
if TYPE == "CNNLSTM":
x_input = x_input.reshape((1, n_seq, n_steps, n_features))
else:
x_input = x_input.reshape((1, n_seq, 1, n_steps, n_features))
yhat = model.predict(x_input, verbose=1)
print(yhat)