December 20, 2022 at 8:43 am
#237420
Keymaster
Hi,
Thank you for your code. I will reformat your answer so the code is clearer.
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
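# Load the price history; the first column is parsed as dates, and rows with missing values are dropped.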
data = pd.read_csv('CBS.csv', parse_dates=[0])
data = data.dropna(axis=0)
data.head()
data.loc[0:100]["CLOSE"].plot(kind="line", figsize=(10,5), title = "Indices - Close")
plt.legend()
plt.show()
data.loc[0:100]["GMS"].plot(kind="line", figsize=(10,5), title = "GMS Change")
plt.legend()
plt.show()
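# transform_data standardizes features and target (scalers are fit on the training
# slice only, so test statistics do not leak) and builds sliding lookback windows:
# X has shape (n_samples, lookback, n_features); Y holds the value right after each window.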
def transform_data(data, feature_cols, target_col, cut_off_index, lookback=3):
x = np.array(data[feature_cols])
y = np.array(data[target_col])
x_scaler = StandardScaler()
y_scaler = StandardScaler()
x_scaler.fit(x[:cut_off_index])
    y_scaler.fit(y[:cut_off_index].reshape(-1, 1))
    x = x_scaler.transform(x)
    y = y_scaler.transform(y.reshape(-1, 1))
X = np.array([x[i:(i+lookback)] for i in range(len(x)-lookback)])
Y = np.array([y[i+lookback] for i in range(len(y)-lookback)])
return X, Y, x_scaler, y_scaler
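# A quick sanity check of the windowing on toy data (illustrative, not from the original post):
toy = pd.DataFrame({"CLOSE": [1., 2., 3., 4., 5., 6.], "GMS": [0., 1., 0., 1., 0., 1.]})
X_toy, Y_toy, _, _ = transform_data(toy, ["CLOSE", "GMS"], "CLOSE", cut_off_index=4)
print(X_toy.shape, Y_toy.shape)  # (3, 3, 2) and (3, 1): 3 windows of 3 steps x 2 features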
np.set_printoptions(precision = 3)
feature_cols = ['CLOSE', 'GMS']
target_col = "CLOSE"
lookback = 3
train_test_cut_off = int(len(data)*0.8)
X, Y, x_scaler, y_scaler = transform_data(data, feature_cols, target_col, \
train_test_cut_off, lookback=lookback)
print("Total samples: {0}, train: {1}, test: {2}\n".\
format(len(X), train_test_cut_off, len(X)- train_test_cut_off))
print("Show a few observations:")
print("Before transformation:")
print(data.iloc[0:7,0:3])
print("\nAfter transformation:")
print("X:")
print(X[0:2,:,0:2])
print("\nY - Regression:")
print(Y[0:2])
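# The model: an LSTM encoder followed by a small fully connected head.
# forward returns one prediction per time step; training below keeps only the last step.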
class SP_Model(nn.Module):
def __init__(self, in_dim, h_dim, n_layers):
super(SP_Model, self).__init__()
        self.prep = nn.LSTM(input_size=in_dim, hidden_size=h_dim,
                            num_layers=n_layers, batch_first=True)
self.net = nn.Sequential(
nn.Linear(h_dim, 64),
nn.ReLU(),
nn.Linear(64, 1))
def forward(self, x):
out, h = self.prep(x)
return self.net(out)
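# Shape check (illustrative): a batch of 4 windows, lookback 3, 2 features
# gives (4, 3, 1) outputs, one prediction per time step.
demo = SP_Model(in_dim=2, h_dim=8, n_layers=1)
print(demo(torch.zeros(4, 3, 2)).shape)  # torch.Size([4, 3, 1])

# Dataset yielding (window, regression target, binary target) triples.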
class SP_dataset(Dataset):
def __init__(self, x, y_reg, y_clf):
self.x = torch.Tensor(x)
self.y_reg = torch.Tensor(y_reg)
self.y_clf = torch.Tensor(y_clf)
def __getitem__(self, index):
return self.x[index], self.y_reg[index], self.y_clf[index]
def __len__(self):
return len(self.x)
def train_model(model, train_dataset, test_dataset, device, binary_pred = False,\
lr=0.0005, epochs=20, batch_size=32):
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
model = model.to(device)
opt = torch.optim.Adam(model.parameters(), lr=lr)
history = {'train_loss': [], 'train_acc': [], 'test_loss': [], 'test_acc': []}
print('Start!')
if binary_pred:
cri = nn.BCEWithLogitsLoss().to(device)
for epoch in range(epochs):
model.train()
train_loss = 0
train_acc = 0
test_loss = 0
test_acc = 0
for X, _, y in train_loader:
X = X.to(device)
y = y.to(device)
out = model(X)[:, -1, :].squeeze(-1)
acc = (torch.round(torch.sigmoid(out))==y).float().mean().item()
loss = cri(out, y)
opt.zero_grad()
loss.backward()
opt.step()
                train_loss += loss.item()  # .item() avoids keeping the autograd graph alive across the epoch
train_acc += acc
model.eval()
with torch.no_grad():
for X, _, y in test_loader:
X = X.to(device)
y = y.to(device)
out = model(X)[:, -1, :].squeeze(-1)
acc = (torch.round(torch.sigmoid(out))==y).float().mean().item()
loss = cri(out, y)
                    test_loss += loss.item()
test_acc += acc
            train_loss = train_loss/len(train_loader)
            train_acc = train_acc/len(train_loader)
            test_loss = test_loss/len(test_loader)
            test_acc = test_acc/len(test_loader)
history['train_loss'].append(train_loss)
history['train_acc'].append(train_acc)
history['test_loss'].append(test_loss)
history['test_acc'].append(test_acc)
print(f'Epoch: {epoch+1}/{epochs}')
            print(f'Train Loss: {train_loss:.8f} \t Train Accuracy: {train_acc*100:.3f} %')
            print(f'Test Loss: {test_loss:.8f} \t Test Accuracy: {test_acc*100:.3f} %')
else:
cri = nn.L1Loss().to(device)
for epoch in range(epochs):
model.train()
train_loss = 0
test_loss = 0
for X, y, _ in train_loader:
X = X.to(device)
y = y.to(device)
out = model(X)[:, -1, :]
loss = cri(out, y)
opt.zero_grad()
loss.backward()
opt.step()
                train_loss += loss.item()
model.eval()
with torch.no_grad():
for X, y, _ in test_loader:
X = X.to(device)
y = y.to(device)
out = model(X)[:, -1, :]
loss = cri(out, y)
                    test_loss += loss.item()
            train_loss = train_loss/len(train_loader)
            test_loss = test_loss/len(test_loader)
history['train_loss'].append(train_loss)
history['test_loss'].append(test_loss)
print(f'Epoch: {epoch+1}/{epochs}')
            print(f'Train Loss: {train_loss:.8f}')
            print(f'Test Loss: {test_loss:.8f}')
    print('\nComplete!')
return history
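# Y_binary is used below but was never defined in the posted code. One plausible
# assumption is an up/down label: 1 when the scaled close rose versus the previous
# step, else 0 (the first sample gets 0). Replace this with your intended label.
Y_binary = np.concatenate(([0.0], (Y[1:] > Y[:-1]).astype(np.float32).squeeze(-1)))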
training = SP_dataset(X[:train_test_cut_off], Y[:train_test_cut_off], Y_binary[:train_test_cut_off])
testing = SP_dataset(X[train_test_cut_off:], Y[train_test_cut_off:], Y_binary[train_test_cut_off:])
model_A = SP_Model(2, 2, 3)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 0.01
epochs = 100
batch_size = 200
hist = train_model(model_A, training, testing, device, True, lr, epochs, batch_size)
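# Plot the loss and accuracy curves recorded in the history dict.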
def plot_history(hist):
f, ax = plt.subplots(2,1, figsize=(6,9))
ax[0].set_title('Loss')
ax[0].plot(hist['train_loss'], label = 'Train', color='b')
ax[0].plot(hist['test_loss'], label = 'Test', color='r')
ax[0].legend()
ax[1].set_title('Accuracy')
ax[1].plot(hist['train_acc'], label = 'Train', color='b')
ax[1].plot(hist['test_acc'], label = 'Test', color='r')
    ax[1].legend()
plot_history(hist)
# transform_data is already defined above; re-run the whole pipeline using CLOSE as the only feature.
feature_cols = ['CLOSE']
target_col = "CLOSE"
lookback = 3
train_test_cut_off = int(len(data)*0.8)
X, Y, x_scaler, y_scaler = transform_data(data, feature_cols, target_col, \
train_test_cut_off, lookback=lookback)
print("Total samples: {0}, train: {1}, test: {2}\n".\
format(len(X), train_test_cut_off, len(X)- train_test_cut_off))
print("Show a few observations:")
print("Before transformation:")
print(data.iloc[0:7,0:3])
print("\nAfter transformation:")
print("X:")
print(X[0:2,:,0:2])
print("\nY - Regression:")
print(Y[0:2])
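# Recompute the assumed up/down label for the re-generated Y (see the note above).
Y_binary = np.concatenate(([0.0], (Y[1:] > Y[:-1]).astype(np.float32).squeeze(-1)))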
training = SP_dataset(X[:train_test_cut_off], Y[:train_test_cut_off], Y_binary[:train_test_cut_off])
testing = SP_dataset(X[train_test_cut_off:], Y[train_test_cut_off:], Y_binary[train_test_cut_off:])
model_A = SP_Model(1, 2, 3)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 0.01
epochs = 100
batch_size = 250
hist = train_model(model_A, training, testing, device, True, lr, epochs, batch_size)
plot_history(hist)