Reply To: LSTM


#237420
Florian Jacta
Keymaster

Hi,

Thank you for your code. I will reformat your post so the code reads more clearly.


import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

# Load the CSV (first column parsed as dates) and drop rows with missing values
data = pd.read_csv('CBS.csv', parse_dates=[0])
data = data.dropna(axis=0)
data.head()

# Quick look at the first 100 rows of each series
data.loc[0:100]["CLOSE"].plot(kind="line", figsize=(10, 5), title="Indices - Close")
plt.legend()
plt.show()

data.loc[0:100]["GMS"].plot(kind="line", figsize=(10, 5), title="GMS Change")
plt.legend()
plt.show()

def transform_data(data, feature_cols, target_col, cut_off_index, lookback=3):
    # Scale features/target with statistics fitted on the training split only,
    # then build sliding windows of length `lookback`.
    x = np.array(data[feature_cols])
    y = np.array(data[target_col])

    x_scaler = StandardScaler()
    y_scaler = StandardScaler()

    # Fit on the training portion only, to avoid leaking test statistics
    x_scaler.fit(x[:cut_off_index])
    y_scaler.fit(y[:cut_off_index].reshape(-1, 1))
    x = x_scaler.transform(x)
    y = y_scaler.transform(y.reshape(-1, 1))

    # X[i] holds `lookback` consecutive rows; Y[i] is the value right after them
    X = np.array([x[i:(i + lookback)] for i in range(len(x) - lookback)])
    Y = np.array([y[i + lookback] for i in range(len(y) - lookback)])

    return X, Y, x_scaler, y_scaler
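
To make the windowing concrete, here is a quick shape check on a toy frame (illustrative only, not part of your script):

# Toy illustration of the windowing: 10 rows, 2 features, lookback=3
demo = pd.DataFrame({"CLOSE": np.arange(10.0), "GMS": np.arange(10.0) * 2})
X_demo, Y_demo, _, _ = transform_data(demo, ["CLOSE", "GMS"], "CLOSE",
                                      cut_off_index=8, lookback=3)
print(X_demo.shape)  # (7, 3, 2): one window per start index, 3 steps x 2 features
print(Y_demo.shape)  # (7, 1): the scaled CLOSE value following each window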

np.set_printoptions(precision=3)

feature_cols = ['CLOSE', 'GMS']
target_col = "CLOSE"
lookback = 3
train_test_cut_off = int(len(data) * 0.8)

X, Y, x_scaler, y_scaler = transform_data(data, feature_cols, target_col,
                                          train_test_cut_off, lookback=lookback)
print("Total samples: {0}, train: {1}, test: {2}\n"
      .format(len(X), train_test_cut_off, len(X) - train_test_cut_off))
print("Show a few observations:")
print("Before transformation:")
print(data.iloc[0:7,0:3])
print("\nAfter transformation:")
print("X:")
print(X[0:2,:,0:2])
print("\nY - Regression:")
print(Y[0:2])

class SP_Model(nn.Module):

    def __init__(self, in_dim, h_dim, n_layers):
        super(SP_Model, self).__init__()
        # LSTM encoder followed by a small MLP head applied at every timestep
        self.lstm = nn.LSTM(input_size=in_dim, hidden_size=h_dim,
                            num_layers=n_layers, batch_first=True,
                            bidirectional=False)
        self.net = nn.Sequential(
            nn.Linear(h_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1))

    def forward(self, x):
        # out: (batch, seq_len, h_dim); the head maps each timestep to a scalar
        out, h = self.lstm(x)
        return self.net(out)
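
Note that the model emits one value per timestep; the training loop below keeps only the last one. A quick sanity check on shapes, with a random tensor (illustrative only):

m = SP_Model(in_dim=2, h_dim=2, n_layers=3)
dummy = torch.randn(4, 3, 2)           # (batch=4, lookback=3, features=2)
print(m(dummy).shape)                  # torch.Size([4, 3, 1]): one output per step
print(m(dummy)[:, -1, :].shape)        # torch.Size([4, 1]): last-step prediction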

class SP_dataset(Dataset):

    # Wraps the windows together with both the regression and classification targets
    def __init__(self, x, y_reg, y_clf):
        self.x = torch.Tensor(x)
        self.y_reg = torch.Tensor(y_reg)
        self.y_clf = torch.Tensor(y_clf)

    def __getitem__(self, index):
        return self.x[index], self.y_reg[index], self.y_clf[index]

    def __len__(self):
        return len(self.x)
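
Each dataset item is a (window, regression target, class label) triple; a throwaway check with random arrays (illustrative only):

ds = SP_dataset(np.random.randn(5, 3, 2), np.random.randn(5, 1),
                np.random.randint(0, 2, 5))
x0, y_reg0, y_clf0 = ds[0]
print(x0.shape, y_reg0.shape, y_clf0.shape)
# torch.Size([3, 2]) torch.Size([1]) torch.Size([])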

def train_model(model, train_dataset, test_dataset, device, binary_pred=False,
                lr=0.0005, epochs=20, batch_size=32):
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    model = model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    history = {'train_loss': [], 'train_acc': [], 'test_loss': [], 'test_acc': []}
    
    print('Start!')
    
    if binary_pred:
        # Classification branch: raw logits scored with BCEWithLogitsLoss
        cri = nn.BCEWithLogitsLoss().to(device)
        
        for epoch in range(epochs):
        
            model.train()
            train_loss = 0
            train_acc = 0
            test_loss = 0
            test_acc = 0

            for X, _, y in train_loader:
                X = X.to(device)
                y = y.to(device)
                # keep only the last timestep's logit for each sequence
                out = model(X)[:, -1, :].squeeze(-1)
                acc = (torch.round(torch.sigmoid(out)) == y).float().mean().item()
                loss = cri(out, y)

                opt.zero_grad()
                loss.backward()
                opt.step()

                # .item() keeps a plain float so no autograd graph accumulates
                train_loss += loss.item()
                train_acc += acc

            model.eval()
            with torch.no_grad():
                for X, _, y in test_loader:
                    X = X.to(device)
                    y = y.to(device)
                    out = model(X)[:, -1, :].squeeze(-1)
                    acc = (torch.round(torch.sigmoid(out))==y).float().mean().item()
                    loss = cri(out, y)
                    test_loss += loss.item()
                    test_acc += acc

            train_loss = train_loss / len(train_loader)
            train_acc = train_acc / len(train_loader)
            test_loss = test_loss / len(test_loader)
            test_acc = test_acc / len(test_loader)

            history['train_loss'].append(train_loss)
            history['train_acc'].append(train_acc)
            history['test_loss'].append(test_loss)
            history['test_acc'].append(test_acc)    

            print(f'Epoch: {epoch+1}/{epochs}')
            print(f'Train Loss: {format(train_loss, ".8f")} \t Train Accuracy: {format(train_acc*100, ".3f")} %')
            print(f'Test Loss: {format(test_loss, ".8f")} \t Test Accuracy: {format(test_acc*100, ".3f")} %')
            
    else:
        # Regression branch: mean absolute error on the scaled target
        cri = nn.L1Loss().to(device)
        
        for epoch in range(epochs):
        
            model.train()
            train_loss = 0
            test_loss = 0

            for X, y, _ in train_loader:
                X = X.to(device)
                y = y.to(device)
                out = model(X)[:, -1, :]
                loss = cri(out, y)

                opt.zero_grad()
                loss.backward()
                opt.step()

                train_loss += loss.item()

            model.eval()
            with torch.no_grad():
                for X, y, _ in test_loader:
                    X = X.to(device)
                    y = y.to(device)
                    out = model(X)[:, -1, :]
                    loss = cri(out, y)
                    test_loss += loss.item()

            train_loss = train_loss / len(train_loader)
            test_loss = test_loss / len(test_loader)

            history['train_loss'].append(train_loss)
            history['test_loss'].append(test_loss)   
            

            print(f'Epoch: {epoch+1}/{epochs}')
            print(f'Train Loss: {format(train_loss, ".8f")}')
            print(f'Test Loss: {format(test_loss, ".8f")}')

    print('\nComplete!')
    
    return history
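
One gap to flag: Y_binary is used below but never defined in the snippet you posted. Here is a minimal sketch of one possible definition, assuming you label 1 when the close right after the window is higher than the window's last close; swap in your own definition if it differs:

# ASSUMPTION: Y_binary was missing from the posted snippet; this is one guess.
# Label 1 when the close following each window exceeds the window's last close.
close = data["CLOSE"].to_numpy()
Y_binary = (close[lookback:] > close[lookback - 1:-1]).astype(np.float32)
assert len(Y_binary) == len(X)  # must align with the windows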

# split the windows with the same cut-off used to fit the scalers
training = SP_dataset(X[:train_test_cut_off], Y[:train_test_cut_off], Y_binary[:train_test_cut_off])
testing = SP_dataset(X[train_test_cut_off:], Y[train_test_cut_off:], Y_binary[train_test_cut_off:])

model_A = SP_Model(2, 2, 3)  # in_dim=2 features, hidden size 2, 3 LSTM layers
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 0.01
epochs = 100
batch_size = 200
# binary_pred=True: train the up/down classifier branch
hist = train_model(model_A, training, testing, device, True, lr, epochs, batch_size)

def plot_history(hist):
    # Two stacked panels: loss on top, accuracy below
    f, ax = plt.subplots(2, 1, figsize=(6, 9))

    ax[0].set_title('Loss')
    ax[0].plot(hist['train_loss'], label='Train', color='b')
    ax[0].plot(hist['test_loss'], label='Test', color='r')
    ax[0].legend()

    ax[1].set_title('Accuracy')
    ax[1].plot(hist['train_acc'], label='Train', color='b')
    ax[1].plot(hist['test_acc'], label='Test', color='r')
    ax[1].legend()

plot_history(hist)

# transform_data and np.set_printoptions are unchanged from above;
# this second run only switches to a univariate feature set.

feature_cols = ['CLOSE']
target_col = "CLOSE"
lookback = 3
train_test_cut_off = int(len(data)*0.8)

X, Y, x_scaler, y_scaler = transform_data(data, feature_cols, target_col,
                                          train_test_cut_off, lookback=lookback)
print("Total samples: {0}, train: {1}, test: {2}\n"
      .format(len(X), train_test_cut_off, len(X) - train_test_cut_off))
print("Show a few observations:")
print("Before transformation:")
print(data.iloc[0:7,0:3])
print("\nAfter transformation:")
print("X:")
print(X[0:2,:,0:2])
print("\nY - Regression:")
print(Y[0:2])

training = SP_dataset(X[:train_test_cut_off], Y[:train_test_cut_off], Y_binary[:train_test_cut_off])
testing = SP_dataset(X[train_test_cut_off:], Y[train_test_cut_off:], Y_binary[train_test_cut_off:])

model_A = SP_Model(1, 2, 3)  # in_dim=1: univariate CLOSE input this time
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 0.01
epochs = 100
batch_size = 250
hist = train_model(model_A, training, testing, device, True, lr, epochs, batch_size)

plot_history(hist)