import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np


class CustomEmbedding(nn.Module):
    """A hand-rolled lookup table of learnable embeddings, indexed by token id."""

    def __init__(self, num_embeddings, embedding_dim):
        super().__init__()
        self.embeddings = nn.Parameter(torch.randn(num_embeddings, embedding_dim))

    def forward(self, input):
        return self.embeddings[input]


class TimeSeriesTransformer(nn.Module):
    """Transformer encoder over embedded categorical time series, with a
    classification head and a regression head that each predict output_steps steps."""

    def __init__(self, input_dims, d_model, n_heads, n_layers, seq_len, num_classes, output_steps):
        super().__init__()
        self.input_dims = input_dims
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.seq_len = seq_len
        self.num_classes = num_classes    # stored so forward() does not rely on globals
        self.output_steps = output_steps

        # One embedding table per input feature: feature 0 has a 100-id vocabulary,
        # every other feature a 50-id vocabulary.
        self.embeddings = nn.ModuleList([
            CustomEmbedding(num_embeddings=100 if i == 0 else 50, embedding_dim=dim)
            for i, dim in enumerate(input_dims)
        ])
        # Fixed sinusoidal positional encoding, registered as a buffer so it
        # moves with the model across devices.
        self.register_buffer('positional_encoding',
                             self.get_positional_encoding(seq_len, d_model))
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads),
            num_layers=n_layers,
        )
        self.fc_classification = nn.Linear(d_model, num_classes * output_steps)
        self.fc_regression = nn.Linear(d_model, 1 * output_steps)

    def get_positional_encoding(self, seq_len, d_model):
        position = np.arange(seq_len)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
        pos_enc = np.zeros((seq_len, d_model))
        pos_enc[:, 0::2] = np.sin(position * div_term)
        pos_enc[:, 1::2] = np.cos(position * div_term)
        return torch.FloatTensor(pos_enc).unsqueeze(0)  # shape: (1, seq_len, d_model)

    def forward(self, x):
        # x: (batch, seq_len, num_features); embed each feature column, then concatenate.
        embeddings = [embed(x[:, :, i]) for i, embed in enumerate(self.embeddings)]
        x = torch.cat(embeddings, dim=-1)
        assert x.shape[-1] == self.d_model, "Concatenated embeddings do not match d_model."
        x = x + self.positional_encoding[:, :x.size(1), :]
        x = x.permute(1, 0, 2)   # (seq_len, batch, d_model), as the encoder expects
        x = self.transformer_encoder(x)
        x = x.mean(dim=0)        # pool over time: (batch, d_model)
        class_output = self.fc_classification(x)
        reg_output = self.fc_regression(x)
        return (class_output.reshape(-1, self.output_steps, self.num_classes),
                reg_output.reshape(-1, self.output_steps, 1))
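# Aside (a minimal check, not required by the model): CustomEmbedding mimics
# nn.Embedding's contract — indexing with a (batch, seq) LongTensor returns
# a (batch, seq, embedding_dim) tensor.
_demo_embed = CustomEmbedding(num_embeddings=10, embedding_dim=4)
_demo_ids = torch.randint(0, 10, (2, 3))
assert _demo_embed(_demo_ids).shape == (2, 3, 4)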
input_dims = [32, 16]      # embedding dim for each of the two categorical features
d_model = sum(input_dims)  # 48; must be divisible by n_heads
n_heads = 4
n_layers = 2
seq_len = 10
batch_size = 32
num_classes = 3
output_steps = 5           # prediction horizon for both heads
num_samples = 1000
num_epochs = 100
learning_rate = 0.001
# randint's upper bound is exclusive; use the full vocabulary of each embedding table.
x1 = torch.randint(0, 100, (num_samples, seq_len, 1))  # ids 0..99 for the 100-entry table
x2 = torch.randint(0, 50, (num_samples, seq_len, 1))   # ids 0..49 for the 50-entry table
X = torch.cat([x1, x2], dim=-1)
y_class = torch.randint(0, num_classes, (num_samples, output_steps))
y_reg = torch.randn(num_samples, output_steps, 1)
dataset = TensorDataset(X, y_class, y_reg)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
model = TimeSeriesTransformer(input_dims, d_model, n_heads, n_layers, seq_len, num_classes, output_steps)
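# Optional shape smoke test (a minimal sketch using the tensors defined above):
# one untrained forward pass should yield (batch, output_steps, num_classes) logits
# and (batch, output_steps, 1) regression values.
with torch.no_grad():
    demo_class, demo_reg = model(X[:4])
assert demo_class.shape == (4, output_steps, num_classes)
assert demo_reg.shape == (4, output_steps, 1)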
criterion_class = nn.CrossEntropyLoss(ignore_index=-1)  # steps labeled -1 (e.g. padding) would be skipped; none occur here
criterion_reg = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
for epoch in range(num_epochs):
    model.train()
    running_loss_class = 0.0
    running_loss_reg = 0.0
    for inputs, labels_class, labels_reg in data_loader:
        optimizer.zero_grad()
        outputs_class, outputs_reg = model(inputs)
        # Flatten (batch, output_steps, ...) so each loss sees one prediction per step.
        loss_class = criterion_class(outputs_class.reshape(-1, num_classes), labels_class.reshape(-1))
        loss_reg = criterion_reg(outputs_reg.reshape(-1, 1), labels_reg.reshape(-1, 1))
        total_loss = loss_class + loss_reg
        total_loss.backward()
        optimizer.step()
        running_loss_class += loss_class.item()
        running_loss_reg += loss_reg.item()
    print(f'Epoch [{epoch + 1}/{num_epochs}], '
          f'Classification Loss: {running_loss_class / len(data_loader):.4f}, '
          f'Regression Loss: {running_loss_reg / len(data_loader):.4f}')
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels_class, labels_reg in data_loader:
        outputs_class, outputs_reg = model(inputs)
        predicted = torch.argmax(outputs_class, dim=2).reshape(-1)
        labels_flat = labels_class.reshape(-1)
        total += labels_flat.size(0)
        correct += (predicted == labels_flat).sum().item()
accuracy = 100 * correct / total
print(f'Accuracy: {accuracy:.2f}%')
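# Inference sketch (illustrative usage, reusing a training sequence as a stand-in
# for new data): softmax the classification logits into per-step class
# probabilities and read the regression head directly.
with torch.no_grad():
    logits, values = model(X[:1])
probs = torch.softmax(logits, dim=-1)  # (1, output_steps, num_classes)
print('Predicted classes per step:', probs.argmax(dim=-1).squeeze(0).tolist())
print('Predicted values per step:', values.squeeze(0).squeeze(-1).tolist())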