| \documentclass[11pt]{article} | |
| \usepackage[sc]{mathpazo} %Like Palatino with extensive math support | |
| \usepackage{fullpage} | |
| \usepackage[authoryear,sectionbib,sort]{natbib} | |
| \linespread{1.7} | |
| \usepackage[utf8]{inputenc} | |
| \usepackage{lineno} | |
| \usepackage{titlesec} | |
| \titleformat{\section}[block]{\Large\bfseries\filcenter}{\thesection}{1em}{} | |
| \titleformat{\subsection}[block]{\Large\itshape\filcenter}{\thesubsection}{1em}{} | |
| \titleformat{\subsubsection}[block]{\large\itshape}{\thesubsubsection}{1em}{} | |
| \titleformat{\paragraph}[runin]{\itshape}{\theparagraph}{1em}{}[. ] | |
| \usepackage{fancyhdr} | |
| \pagestyle{fancy} | |
| \usepackage{minted} | |
| \newminted{python}{} | |
| \usepackage{xcolor} % to access the named colour LightGray | |
| \definecolor{LightGray}{gray}{0.95} | |
| \usepackage{dirtree} | |
| \setlength{\parskip}{6pt} | |
| \setlength{\parindent}{0pt} | |
| %%%%%%%%%%%%%%%%%%%%% | |
| % Header | |
| %%%%%%%%%%%%%%%%%%%%% | |
| % | |
| % Customize the line below with the last name of your first author and | |
| % the short title of your MS. You can comment authorship information out | |
| % while your MS is undergoing double-blind review. | |
| % | |
| \rhead{Ge et al., \textit{preprint submitted to Engineering Geology}} | |
| \setlength{\headsep}{0.3in} | |
| \lhead{} | |
| %%%%%%%%%%%%%%%%%%%%% | |
| % Line numbering | |
| %%%%%%%%%%%%%%%%%%%%% | |
| % | |
| % Please use line numbering with your initial submission and | |
| % subsequent revisions. After acceptance, please turn line numbering | |
| % off by adding percent signs to the lines %\usepackage{lineno} and | |
| % to %\linenumbers{} and %\modulolinenumbers[3] below. | |
| % | |
| % To avoid line numbering being thrown off around math environments, | |
| % the math environments have to be wrapped using | |
| % \begin{linenomath*} and \end{linenomath*} | |
| % | |
| % (Thanks to Vlastimil Krivan for pointing this out to us!) | |
| \title{Appendix: \\ | |
| Benchmarking Large Language Models in Geological Natural Language Processing} | |
| \author{Qi Ge$^{1}$ \\ | |
| Pengfa Li$^{2}$ \\ | |
| Jin Li$^{2}$ \\ | |
| Yiyan Deng$^{2}$ \\ | |
| Hongyue Sun$^{3,\ast}$ \\ | |
| Zhongqiang Liu$^{4,\ast}$ \\ | |
| } | |
| \date{} | |
| \begin{document} | |
| \maketitle | |
| \noindent{} 1. College of Civil Engineering, Nanjing Forestry University; | |
| \noindent{} 2. School of Artificial Intelligence, Nanjing University of Information Science and Technology; | |
| \noindent{} 3. Ocean College, Zhejiang University; | |
| \noindent{} 4. Department of Natural Hazards, Norwegian Geotechnical Institute. | |
| \noindent{} $\ast$ Corresponding authors; e-mails: [email protected], [email protected]. | |
| %\linenumbers{} | |
| %\modulolinenumbers[3] | |
| \renewcommand{\theequation}{S\arabic{equation}} | |
| % redefine the command that creates the equation number. | |
| \renewcommand{\thetable}{S\arabic{table}} | |
| \renewcommand{\thesection}{S\arabic{section}} | |
| \renewcommand{\thefigure}{S\arabic{figure}} | |
| \setcounter{equation}{0} % reset counter | |
| \setcounter{figure}{0} | |
| \setcounter{table}{0} | |
| \newpage{} | |
| \section*{Supplementary code} | |
| The Python code for the LLM benchmark in this study is stored in a directory named ``Geo-LLM-Benchmarks'', which contains the following components: | |
| \vspace{12pt} | |
| \dirtree{% | |
| .1 GeoLLM. | |
| .2 data. | |
| .3 Task1. | |
| .4 ..... | |
| .3 Task2. | |
| .4 ..... | |
| .2 docs. | |
| .3 Readme.txt. | |
| .3 requirements.txt. | |
| .2 scripts. | |
| .3 Task1. | |
| .4 KNN\_token.py. | |
| .3 Task2. | |
| .4 cot\_process.py. | |
| .4 KNN.py. | |
| .4 utils. | |
| .5 LLM\_APIs.txt. | |
| .5 LLM.py. | |
| .2 tasks. | |
| .3 task1. | |
| .4 eval.py. | |
| .4 Task1\_test.ipynb. | |
| .4 metrics. | |
| .5 graph\_matching.py. | |
| .4 utils. | |
| .5 LLM\_APIs.txt. | |
| .5 LLM.py. | |
| .3 task2. | |
| .4 eval.py. | |
| .4 pretreatment\_split\_data\_Geo.py. | |
| .4 Task2\_test.ipynb. | |
| .4 utils. | |
| .5 knn\_prompt.py. | |
| .5 LLM\_APIs.txt. | |
| .5 save\_response.py. | |
| .5 promp\_get.py. | |
| } | |
| \vspace{3pt} | |
| The LiteTransNet codebase is a structured collection of files and directories for the LiteTransNet model. The function of each component within the \texttt{lite-trans-net} directory is described below: | |
| \begin{itemize} | |
| \item \textbf{data}: This directory holds the CSV files of the landslide dataset that are used in the case study. These files provide the data needed to train the LiteTransNet model. | |
| \item \textbf{models}: The \texttt{models} directory stores the saved PyTorch models of the trained Transformer networks. | |
| \item \textbf{training.py}: This script manages the training process for the LiteTransNet model. It includes code for handling the training loop and optimization steps, evaluating the test set, and saving checkpoints of the model. | |
| \item \textbf{tst}: This directory contains several key components of the transformer architecture used in LiteTransNet: | |
| \begin{itemize} | |
| \item \texttt{encoder.py}: Contains the implementation of the encoder part of the transformer model, which processes the input data. | |
| \item \texttt{decoder.py}: Implements the decoder part, which generates the output from the encoded representations. | |
| \item \texttt{multiHeadAttention.py}: Provides the multi-head attention mechanism, a key component of transformers that allows the model to focus on different parts of the input sequence. | |
| \item \texttt{positionwiseFeedForward.py}: Defines position-wise feedforward networks used within the transformer model. | |
| \item \texttt{transformer.py}: This is the script where the entire transformer model is put together using the encoder, decoder, and other components. | |
| \item \texttt{utils.py}: A utility script that contains helper functions used across various scripts in the \texttt{tst} directory. | |
| \end{itemize} | |
| \item \textbf{dataset.py}: This file includes code for handling the dataset class, which preprocesses and provides data to the LiteTransNet model during both training and test stages. | |
| \item \textbf{utils.py}: This script offers general utility functions that assist in preprocessing data. | |
| \end{itemize} | |
| \newpage | |
| The training.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, Subset | |
| from tqdm import tqdm | |
| from collections import OrderedDict | |
| import itertools | |
| from tst import Transformer | |
| from src.dataset import LSDataset | |
| from src.utils import fit, visualise_result | |
| import csv | |
| import matplotlib.pyplot as plt | |
| # Dataset name | |
| name = 'bsh' | |
| # Fixed parameters | |
| d_input = 3 | |
| d_output = 1 | |
| h = 1 | |
| N = 2 | |
| chunk_mode = None | |
| NUM_WORKERS = 0 | |
| pe = None | |
| BATCH_SIZE = 12 | |
| EPOCHS = 200 | |
| dropout = 0.2 | |
| LR = 0.002 | |
| opt = 'Adam' | |
| # ===== user set params ==== | |
| param_grid = OrderedDict({ | |
| "d_model": [16, 32, 48], | |
| "q": [1, 3, 5], | |
| "k": [1, 3, 5], | |
| "v": [1, 3, 5], | |
| "attention_size": [6, 9, 12] | |
| }) | |
| # Generate all possible combinations of parameter values | |
| param_combinations = list(itertools.product(*param_grid.values())) | |
| for i, params in enumerate(param_combinations): | |
|     print(f"Training model {i+1}/{len(param_combinations)} with params {params}") | |
|     # Set the parameters | |
|     d_model, q, k, v, attention_size = params | |
|     # Config | |
|     device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
|     print(f"Using device {device}") | |
|     # Load dataset | |
|     lsDataset = LSDataset(name) | |
|     # Split the dataset into train and test sets (the last 24 samples are held out) | |
|     dataset_test = Subset(lsDataset, range(len(lsDataset) - 24, len(lsDataset))) | |
|     dataset_train = Subset(lsDataset, range(len(lsDataset) - 24)) | |
|     dataloader_train = DataLoader(dataset_train, | |
|                                   batch_size=BATCH_SIZE, | |
|                                   shuffle=True, | |
|                                   num_workers=NUM_WORKERS, | |
|                                   pin_memory=False | |
|                                   ) | |
|     # Note: the validation loader reuses the test split | |
|     dataloader_val = DataLoader(dataset_test, | |
|                                 batch_size=BATCH_SIZE, | |
|                                 shuffle=True, | |
|                                 num_workers=NUM_WORKERS | |
|                                 ) | |
|     dataloader_test = DataLoader(dataset_test, | |
|                                  batch_size=BATCH_SIZE, | |
|                                  shuffle=False, | |
|                                  num_workers=NUM_WORKERS | |
|                                  ) | |
|     # Load transformer with Adam optimizer and MSE loss function | |
|     if pe is None: | |
|         net = Transformer(d_input, d_model, d_output, q, v, h, N, attention_size=attention_size, | |
|                           dropout=dropout, chunk_mode=chunk_mode, pe=pe).to(device) | |
|     else: | |
|         pe_period = 12 | |
|         net = Transformer(d_input, d_model, d_output, q, v, h, N, attention_size=attention_size, | |
|                           dropout=dropout, chunk_mode=chunk_mode, pe=pe, pe_period=pe_period).to(device) | |
|     # Create the optimizer with the initial learning rate | |
|     optimizer = optim.Adam(net.parameters(), lr=LR) | |
|     loss_function = nn.MSELoss() | |
|     # Fit model | |
|     with tqdm(total=EPOCHS) as pbar: | |
|         train_loss, test_loss = fit(net, optimizer, loss_function, dataloader_train, | |
|                                     dataloader_val, epochs=EPOCHS, pbar=pbar, device=device) | |
|     # loss visualisation | |
|     plt.plot(train_loss, label='Training Loss') | |
|     plt.plot(test_loss, label='Validation Loss') | |
|     plt.xlabel('Epoch') | |
|     plt.ylabel('Loss') | |
|     plt.title('Training and Validation Loss') | |
|     plt.legend() | |
|     plt.savefig('loss.png') | |
|     plt.close() | |
|     # Switch to evaluation | |
|     _ = net.eval() | |
|     # Select target values in test split | |
|     y_true = lsDataset._y[dataloader_test.dataset.indices] | |
|     train_y = lsDataset._y[dataloader_train.dataset.indices] | |
|     # Compute predictions (test) | |
|     predictions = torch.empty(len(dataloader_test.dataset), 12, 1) | |
|     idx_prediction = 0 | |
|     with torch.no_grad(): | |
|         for x, y in tqdm(dataloader_test, total=len(dataloader_test)): | |
|             netout = net(x.to(device)).cpu() | |
|             predictions[idx_prediction:idx_prediction+x.shape[0]] = netout | |
|             idx_prediction += x.shape[0] | |
|     # Save model | |
|     torch.save(net.state_dict(), 'model.pth') | |
| \end{minted} | |
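The grid enumeration and the hold-out split used in the listing above can be illustrated without PyTorch. The following is a minimal sketch under stated assumptions: the grid values are copied from the listing, while the dataset length \texttt{n\_samples = 100} and the helper name \texttt{chronological\_split} are hypothetical and serve only as an illustration.

```python
import itertools
from collections import OrderedDict

# Same grid values as in the training.py listing
param_grid = OrderedDict({
    "d_model": [16, 32, 48],
    "q": [1, 3, 5],
    "k": [1, 3, 5],
    "v": [1, 3, 5],
    "attention_size": [6, 9, 12],
})

# Cartesian product: one tuple per configuration, in key order
param_combinations = list(itertools.product(*param_grid.values()))


def chronological_split(n_samples, n_test=24):
    """Hypothetical helper: hold out the last n_test indices, train on the rest."""
    train_idx = list(range(n_samples - n_test))
    test_idx = list(range(n_samples - n_test, n_samples))
    return train_idx, test_idx


n_samples = 100  # assumed dataset length, for illustration only
train_idx, test_idx = chronological_split(n_samples)

print(len(param_combinations))      # 3**5 = 243 configurations
print(param_combinations[0])        # (16, 1, 1, 1, 6)
print(len(train_idx), test_idx[0])  # 76 76
```

With five hyperparameters of three values each, the grid contains 243 configurations, each trained for \texttt{EPOCHS = 200} epochs in the listing, so the search cost grows quickly if further values are added.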
| \newpage | |
| The dataset.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import numpy as np | |
| from torch.utils.data import Dataset | |
| import torch | |
| from src.utils.utils import normalizer, slicing_window, loading_data | |
| class LSDataset(Dataset): | |
|     def __init__(self, | |
|                  name: str, | |
|                  **kwargs): | |
|         super().__init__(**kwargs) | |
|         self.name = name | |
|         self._load_csv() | |
|     def _load_csv(self): | |
|         # load data | |
|         dataset = loading_data(self.name) ###### loading_data_4v | |
|         data = dataset.values | |
|         n_months = 12 | |
|         # normalisation | |
|         normed_data, min_val, max_val = normalizer(data) | |
|         # split into 3d array, the label is the next row | |
|         features, labels = slicing_window(normed_data, n_months) | |
|         # Store feature and label | |
|         self._x = features | |
|         self._y = labels | |
|         # Convert to float32 (output) | |
|         self._x = torch.Tensor(self._x) | |
|         self._y = torch.Tensor(self._y) | |
|         self._maxvalue = max_val | |
|         self._minvalue = min_val | |
|     def __getitem__(self, idx): | |
|         if torch.is_tensor(idx): | |
|             idx = idx.tolist() | |
|         return (self._x[idx], self._y[idx]) | |
|     def __len__(self): | |
|         return self._x.shape[0] | |
| \end{minted} | |
| \newpage | |
| The utils.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import csv | |
| import torch | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn import metrics | |
| from sklearn.metrics import mean_squared_error, mean_absolute_error | |
| import matplotlib.pyplot as plt | |
| def compute_loss(net: torch.nn.Module, | |
|                  dataloader: torch.utils.data.DataLoader, | |
|                  loss_function: torch.nn.Module, | |
|                  device: torch.device = 'cpu') -> torch.Tensor: | |
|     """Compute the loss of a network on a given dataset. | |
|     Does not compute gradient. | |
|     Parameters | |
|     ---------- | |
|     net: | |
|         Network to evaluate. | |
|     dataloader: | |
|         Iterator on the dataset. | |
|     loss_function: | |
|         Loss function to compute. | |
|     device: | |
|         Torch device, or :py:class:`str`. | |
|     Returns | |
|     ------- | |
|     Loss as a tensor with no grad. | |
|     """ | |
|     running_loss = 0 | |
|     with torch.no_grad(): | |
|         for x, y in dataloader: | |
|             netout = net(x.to(device)).cpu() | |
|             running_loss += loss_function(y, netout) | |
|     return running_loss / len(dataloader) | |
| def normalizer(data): | |
|     numerator = data - np.min(data, 0) | |
|     denominator = np.max(data, 0) - np.min(data, 0) | |
|     # norm_data = numerator / (denominator + 1e-7) | |
|     norm_data = numerator / denominator | |
|     return norm_data, np.min(data, 0), np.max(data, 0) | |
| def scaler(data): | |
|     from sklearn.preprocessing import MinMaxScaler | |
|     scaler = MinMaxScaler(feature_range=(-1, 1)) | |
|     data_scaled = scaler.fit_transform(data) | |
|     min_value = scaler.data_min_ | |
|     max_value = scaler.data_max_ | |
|     return data_scaled, min_value, max_value | |
| def rescaler(data, min_value, max_value): | |
|     inv_y = (data - (-1)) * (max_value - min_value) / (1 - (-1)) + min_value | |
|     return inv_y | |
| def renormlizer(data, max_val, min_val): | |
|     data = data * (max_val - min_val) | |
|     data = data + min_val | |
|     return data | |
| def slicing_window(data, n_in): | |
|     list_of_features = [] | |
|     list_of_labels = [] | |
|     for i in range(len(data)-n_in+1): | |
|         arr_features = data[i:(i+n_in), :-1] | |
|         arr_label = data[i:(i+n_in), -1] | |
|         list_of_features.append(arr_features) | |
|         list_of_labels.append(arr_label.reshape(-1, 1)) | |
|     features = np.array(list_of_features) | |
|     labels = np.array(list_of_labels) | |
|     return features, labels | |
| \end{minted} | |
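The min-max normalisation, its inverse, and the windowing step in the listing above can be checked on a toy example. The sketch below is an illustration under stated assumptions, not the authors' implementation: it uses plain Python lists instead of NumPy arrays, the function names \texttt{minmax\_normalize}, \texttt{minmax\_denormalize} and \texttt{sliding\_windows} are hypothetical, and all numbers are made up.

```python
def minmax_normalize(column):
    """Scale a list of numbers to [0, 1]; returns (scaled, min, max)."""
    lo, hi = min(column), max(column)
    return [(x - lo) / (hi - lo) for x in column], lo, hi


def minmax_denormalize(scaled, lo, hi):
    """Invert minmax_normalize."""
    return [x * (hi - lo) + lo for x in scaled]


def sliding_windows(rows, n_in):
    """Build (features, labels) per window: all but the last column vs. the last column."""
    out = []
    for i in range(len(rows) - n_in + 1):
        window = rows[i:i + n_in]
        features = [r[:-1] for r in window]
        labels = [[r[-1]] for r in window]
        out.append((features, labels))
    return out


displacement = [10.0, 12.0, 15.0, 20.0]  # made-up values
scaled, lo, hi = minmax_normalize(displacement)
restored = minmax_denormalize(scaled, lo, hi)

rows = [[t, 2 * t, 3 * t] for t in range(14)]  # 14 made-up rows, 3 columns
windows = sliding_windows(rows, n_in=12)

print(scaled)        # [0.0, 0.2, 0.5, 1.0]
print(restored)      # original values, up to floating-point rounding
print(len(windows))  # 14 - 12 + 1 = 3 windows
```

Two details of the listing are worth keeping in mind when reusing it. First, \texttt{normalizer} returns the minimum before the maximum, whereas \texttt{renormlizer} expects the maximum before the minimum, so the arguments must be swapped when inverting. Second, \texttt{slicing\_window} takes the labels from the last column of the same rows as the features, so each sample pairs a 12-step feature window with the 12 target values at those same steps.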
| \newpage | |
| The tst/encoder.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from tst.multiHeadAttention import MultiHeadAttention, MultiHeadAttentionChunk, MultiHeadAttentionWindow | |
| from tst.positionwiseFeedForward import PositionwiseFeedForward | |
| class Encoder(nn.Module): | |
|     """Encoder block from Attention is All You Need. | |
|     Apply Multi Head Attention block followed by a Point-wise Feed Forward block. | |
|     Residual sum and normalization are applied at each step. | |
|     Parameters | |
|     ---------- | |
|     d_model: | |
|         Dimension of the input vector. | |
|     q: | |
|         Dimension of all query matrix. | |
|     v: | |
|         Dimension of all value matrix. | |
|     h: | |
|         Number of heads. | |
|     attention_size: | |
|         Number of backward elements to apply attention. | |
|         Deactivated if ``None``. Default is ``None``. | |
|     dropout: | |
|         Dropout probability after each MHA or PFF block. | |
|         Default is ``0.3``. | |
|     chunk_mode: | |
|         Switch between different MultiHeadAttention blocks. | |
|         One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``. | |
|     """ | |
|     def __init__(self, | |
|                  d_model: int, | |
|                  q: int, | |
|                  v: int, | |
|                  h: int, | |
|                  attention_size: int = None, | |
|                  dropout: float = 0.3, | |
|                  chunk_mode: str = 'chunk'): | |
|         """Initialize the Encoder block""" | |
|         super().__init__() | |
|         chunk_mode_modules = { | |
|             'chunk': MultiHeadAttentionChunk, | |
|             'window': MultiHeadAttentionWindow, | |
|         } | |
|         if chunk_mode in chunk_mode_modules.keys(): | |
|             MHA = chunk_mode_modules[chunk_mode] | |
|         elif chunk_mode is None: | |
|             MHA = MultiHeadAttention | |
|         else: | |
|             raise NameError( | |
|                 f'chunk_mode "{chunk_mode}" not understood. Must be one of {", ".join(chunk_mode_modules.keys())} or None.') | |
|         self._selfAttention = MHA(d_model, q, v, h, attention_size=attention_size) | |
|         self._feedForward = PositionwiseFeedForward(d_model) | |
|         self._layerNorm1 = nn.LayerNorm(d_model) | |
|         self._layerNorm2 = nn.LayerNorm(d_model) | |
|         self._dopout = nn.Dropout(p=dropout) | |
|     def forward(self, x: torch.Tensor) -> torch.Tensor: | |
|         """Propagate the input through the Encoder block. | |
|         Apply the Multi Head Attention block, add residual and normalize. | |
|         Apply the Point-wise Feed Forward block, add residual and normalize. | |
|         Parameters | |
|         ---------- | |
|         x: | |
|             Input tensor with shape (batch_size, K, d_model). | |
|         Returns | |
|         ------- | |
|         Output tensor with shape (batch_size, K, d_model). | |
|         """ | |
|         # Self attention | |
|         residual = x | |
|         x = self._selfAttention(query=x, key=x, value=x) | |
|         x = self._dopout(x) | |
|         x = self._layerNorm1(x + residual) | |
|         # Feed forward | |
|         residual = x | |
|         x = self._feedForward(x) | |
|         x = self._dopout(x) | |
|         x = self._layerNorm2(x + residual) | |
|         return x | |
|     @property | |
|     def attention_map(self) -> torch.Tensor: | |
|         """Attention map after a forward propagation, | |
|         variable `score` in the original paper. | |
|         """ | |
|         return self._selfAttention.attention_map | |
| \end{minted} | |
| \newpage | |
| The tst/decoder.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from tst.multiHeadAttention import MultiHeadAttention, MultiHeadAttentionChunk, MultiHeadAttentionWindow | |
| from tst.positionwiseFeedForward import PositionwiseFeedForward | |
| class Decoder(nn.Module): | |
|     """Decoder block from Attention is All You Need. | |
|     Apply two Multi Head Attention blocks followed by a Point-wise Feed Forward block. | |
|     Residual sum and normalization are applied at each step. | |
|     Parameters | |
|     ---------- | |
|     d_model: | |
|         Dimension of the input vector. | |
|     q: | |
|         Dimension of all query matrix. | |
|     v: | |
|         Dimension of all value matrix. | |
|     h: | |
|         Number of heads. | |
|     attention_size: | |
|         Number of backward elements to apply attention. | |
|         Deactivated if ``None``. Default is ``None``. | |
|     dropout: | |
|         Dropout probability after each MHA or PFF block. | |
|         Default is ``0.3``. | |
|     chunk_mode: | |
|         Switch between different MultiHeadAttention blocks. | |
|         One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``. | |
|     """ | |
|     def __init__(self, | |
|                  d_model: int, | |
|                  q: int, | |
|                  v: int, | |
|                  h: int, | |
|                  attention_size: int = None, | |
|                  dropout: float = 0.3, | |
|                  chunk_mode: str = 'chunk'): | |
|         """Initialize the Decoder block""" | |
|         super().__init__() | |
|         chunk_mode_modules = { | |
|             'chunk': MultiHeadAttentionChunk, | |
|             'window': MultiHeadAttentionWindow, | |
|         } | |
|         if chunk_mode in chunk_mode_modules.keys(): | |
|             MHA = chunk_mode_modules[chunk_mode] | |
|         elif chunk_mode is None: | |
|             MHA = MultiHeadAttention | |
|         else: | |
|             raise NameError( | |
|                 f'chunk_mode "{chunk_mode}" not understood. Must be one of {", ".join(chunk_mode_modules.keys())} or None.') | |
|         self._selfAttention = MHA(d_model, q, v, h, attention_size=attention_size) | |
|         self._encoderDecoderAttention = MHA(d_model, q, v, h, attention_size=attention_size) | |
|         self._feedForward = PositionwiseFeedForward(d_model) | |
|         self._layerNorm1 = nn.LayerNorm(d_model) | |
|         self._layerNorm2 = nn.LayerNorm(d_model) | |
|         self._layerNorm3 = nn.LayerNorm(d_model) | |
|         self._dopout = nn.Dropout(p=dropout) | |
|     def forward(self, x: torch.Tensor, memory: torch.Tensor) -> torch.Tensor: | |
|         """Propagate the input through the Decoder block. | |
|         Apply the self attention block, add residual and normalize. | |
|         Apply the encoder-decoder attention block, add residual and normalize. | |
|         Apply the feed forward network, add residual and normalize. | |
|         Parameters | |
|         ---------- | |
|         x: | |
|             Input tensor with shape (batch_size, K, d_model). | |
|         memory: | |
|             Memory tensor with shape (batch_size, K, d_model) | |
|             from encoder output. | |
|         Returns | |
|         ------- | |
|         x: | |
|             Output tensor with shape (batch_size, K, d_model). | |
|         """ | |
|         # Self attention | |
|         residual = x | |
|         x = self._selfAttention(query=x, key=x, value=x, mask="subsequent") | |
|         x = self._dopout(x) | |
|         x = self._layerNorm1(x + residual) | |
|         # Encoder-decoder attention | |
|         residual = x | |
|         x = self._encoderDecoderAttention(query=x, key=memory, value=memory) | |
|         x = self._dopout(x) | |
|         x = self._layerNorm2(x + residual) | |
|         # Feed forward | |
|         residual = x | |
|         x = self._feedForward(x) | |
|         x = self._dopout(x) | |
|         x = self._layerNorm3(x + residual) | |
|         return x | |
| \end{minted} | |
| \newpage | |
| The tst/multiHeadAttention.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| from typing import Optional | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from tst.utils import generate_local_map_mask | |
| class MultiHeadAttention(nn.Module): | |
|     """Multi Head Attention block from Attention is All You Need. | |
|     Given 3 inputs of shape (batch_size, K, d_model), that will be used | |
|     to compute query, keys and values, we output a self attention | |
|     tensor of shape (batch_size, K, d_model). | |
|     Parameters | |
|     ---------- | |
|     d_model: | |
|         Dimension of the input vector. | |
|     q: | |
|         Dimension of all query matrix. | |
|     v: | |
|         Dimension of all value matrix. | |
|     h: | |
|         Number of heads. | |
|     attention_size: | |
|         Number of backward elements to apply attention. | |
|         Deactivated if ``None``. Default is ``None``. | |
|     """ | |
|     def __init__(self, | |
|                  d_model: int, | |
|                  q: int, | |
|                  v: int, | |
|                  h: int, | |
|                  attention_size: int = None): | |
|         """Initialize the Multi Head Block.""" | |
|         super().__init__() | |
|         self._h = h | |
|         self._attention_size = attention_size | |
|         # Query, keys and value matrices | |
|         self._W_q = nn.Linear(d_model, q*self._h) | |
|         self._W_k = nn.Linear(d_model, q*self._h) | |
|         self._W_v = nn.Linear(d_model, v*self._h) | |
|         # Output linear function | |
|         self._W_o = nn.Linear(self._h*v, d_model) | |
|         # Score placeholder | |
|         self._scores = None | |
|     def forward(self, | |
|                 query: torch.Tensor, | |
|                 key: torch.Tensor, | |
|                 value: torch.Tensor, | |
|                 mask: Optional[str] = None) -> torch.Tensor: | |
|         """Propagate forward the input through the MHB. | |
|         We compute for each head the queries, keys and values matrices, | |
|         followed by the Scaled Dot-Product. The result is concatenated | |
|         and returned with shape (batch_size, K, d_model). | |
|         Parameters | |
|         ---------- | |
|         query: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute queries. | |
|         key: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute keys. | |
|         value: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute values. | |
|         mask: | |
|             Mask to apply on scores before computing attention. | |
|             One of ``'subsequent'``, None. Default is None. | |
|         Returns | |
|         ------- | |
|         Self attention tensor with shape (batch_size, K, d_model). | |
|         """ | |
|         K = query.shape[1] | |
|         # Compute Q, K and V, concatenate heads on batch dimension | |
|         queries = torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0) | |
|         keys = torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0) | |
|         values = torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0) | |
|         # Scaled Dot Product | |
|         self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(K) | |
|         # Compute local map mask | |
|         if self._attention_size is not None: | |
|             attention_mask = generate_local_map_mask(K, self._attention_size, mask_future=False, device=self._scores.device) | |
|             self._scores = self._scores.masked_fill(attention_mask, float('-inf')) | |
|         # Compute future mask | |
|         if mask == "subsequent": | |
|             future_mask = torch.triu(torch.ones((K, K)), diagonal=1).bool() | |
|             future_mask = future_mask.to(self._scores.device) | |
|             self._scores = self._scores.masked_fill(future_mask, float('-inf')) | |
|         # Apply softmax | |
|         self._scores = F.softmax(self._scores, dim=-1) | |
|         attention = torch.bmm(self._scores, values) | |
|         # Concatenate the heads | |
|         attention_heads = torch.cat(attention.chunk(self._h, dim=0), dim=-1) | |
|         # Apply linear transformation W^O | |
|         self_attention = self._W_o(attention_heads) | |
|         return self_attention | |
|     @property | |
|     def attention_map(self) -> torch.Tensor: | |
|         """Attention map after a forward propagation, | |
|         variable `score` in the original paper. | |
|         """ | |
|         if self._scores is None: | |
|             raise RuntimeError( | |
|                 "Evaluate the model once to generate attention map") | |
|         return self._scores | |
| class MultiHeadAttentionChunk(MultiHeadAttention): | |
|     """Multi Head Attention block with chunk. | |
|     Given 3 inputs of shape (batch_size, K, d_model), that will be used | |
|     to compute query, keys and values, we output a self attention | |
|     tensor of shape (batch_size, K, d_model). | |
|     Queries, keys and values are divided in chunks of constant size. | |
|     Parameters | |
|     ---------- | |
|     d_model: | |
|         Dimension of the input vector. | |
|     q: | |
|         Dimension of all query matrix. | |
|     v: | |
|         Dimension of all value matrix. | |
|     h: | |
|         Number of heads. | |
|     attention_size: | |
|         Number of backward elements to apply attention. | |
|         Deactivated if ``None``. Default is ``None``. | |
|     chunk_size: | |
|         Size of chunks to apply attention on. Last one may be smaller (see :class:`torch.Tensor.chunk`). | |
|         Default is 168. | |
|     """ | |
|     def __init__(self, | |
|                  d_model: int, | |
|                  q: int, | |
|                  v: int, | |
|                  h: int, | |
|                  attention_size: int = None, | |
|                  chunk_size: Optional[int] = 168, | |
|                  **kwargs): | |
|         """Initialize the Multi Head Block.""" | |
|         super().__init__(d_model, q, v, h, attention_size, **kwargs) | |
|         self._chunk_size = chunk_size | |
|         # Score mask for decoder | |
|         self._future_mask = nn.Parameter(torch.triu(torch.ones((self._chunk_size, self._chunk_size)), diagonal=1).bool(), | |
|                                          requires_grad=False) | |
|         if self._attention_size is not None: | |
|             self._attention_mask = nn.Parameter(generate_local_map_mask(self._chunk_size, self._attention_size), | |
|                                                 requires_grad=False) | |
|     def forward(self, | |
|                 query: torch.Tensor, | |
|                 key: torch.Tensor, | |
|                 value: torch.Tensor, | |
|                 mask: Optional[str] = None) -> torch.Tensor: | |
|         """Propagate forward the input through the MHB. | |
|         We compute for each head the queries, keys and values matrices, | |
|         followed by the Scaled Dot-Product. The result is concatenated | |
|         and returned with shape (batch_size, K, d_model). | |
|         Parameters | |
|         ---------- | |
|         query: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute queries. | |
|         key: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute keys. | |
|         value: | |
|             Input tensor with shape (batch_size, K, d_model) used to compute values. | |
|         mask: | |
|             Mask to apply on scores before computing attention. | |
|             One of ``'subsequent'``, None. Default is None. | |
|         Returns | |
|         ------- | |
|         Self attention tensor with shape (batch_size, K, d_model). | |
|         """ | |
|         K = query.shape[1] | |
|         n_chunk = K // self._chunk_size | |
|         # Compute Q, K and V, concatenate heads on batch dimension | |
|         queries = torch.cat(torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0) | |
|         keys = torch.cat(torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0) | |
|         values = torch.cat(torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0) | |
|         # Scaled Dot Product | |
|         self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(self._chunk_size) | |
|         # Compute local map mask | |
|         if self._attention_size is not None: | |
|             self._scores = self._scores.masked_fill(self._attention_mask, float('-inf')) | |
|         # Compute future mask | |
|         if mask == "subsequent": | |
|             self._scores = self._scores.masked_fill(self._future_mask, float('-inf')) | |
|         # Apply softmax | |
|         self._scores = F.softmax(self._scores, dim=-1) | |
|         attention = torch.bmm(self._scores, values) | |
|         # Concatenate the heads | |
|         attention_heads = torch.cat(torch.cat(attention.chunk( | |
|             n_chunk, dim=0), dim=1).chunk(self._h, dim=0), dim=-1) | |
|         # Apply linear transformation W^O | |
|         self_attention = self._W_o(attention_heads) | |
|         return self_attention | |
| class MultiHeadAttentionWindow(MultiHeadAttention): | |
| """Multi Head Attention block with moving window. | |
| Given 3 inputs of shape (batch_size, K, d_model), that will be used | |
| to compute query, keys and values, we output a self attention | |
| tensor of shape (batch_size, K, d_model). | |
| Queries, keys and values are divided in chunks using a moving window. | |
| Parameters | |
| ---------- | |
| d_model: | |
| Dimension of the input vector. | |
| q: | |
| Dimension of all query matrix. | |
| v: | |
| Dimension of all value matrix. | |
| h: | |
| Number of heads. | |
| attention_size: | |
| Number of backward elements to apply attention. | |
| Deactivated if ``None``. Default is ``None``. | |
| window_size: | |
| Size of the window used to extract chunks. | |
| Default is 168 | |
| padding: | |
| Padding around each window. Padding will be applied to input sequence. | |
| Default is 168 // 4 = 42. | |
| """ | |
| def __init__(self, | |
| d_model: int, | |
| q: int, | |
| v: int, | |
| h: int, | |
| attention_size: int = None, | |
| window_size: Optional[int] = 168, | |
| padding: Optional[int] = 168 // 4, | |
| **kwargs): | |
| """Initialize the Multi Head Block.""" | |
| super().__init__(d_model, q, v, h, attention_size, **kwargs) | |
| self._window_size = window_size | |
| self._padding = padding | |
| self._q = q | |
| self._v = v | |
| # Step size for the moving window | |
| self._step = self._window_size - 2 * self._padding | |
| # Score mask for decoder | |
| self._future_mask = nn.Parameter(torch.triu(torch.ones((self._window_size, self._window_size)), diagonal=1).bool(), | |
| requires_grad=False) | |
| if self._attention_size is not None: | |
| self._attention_mask = nn.Parameter(generate_local_map_mask(self._window_size, self._attention_size), | |
| requires_grad=False) | |
| def forward(self, | |
| query: torch.Tensor, | |
| key: torch.Tensor, | |
| value: torch.Tensor, | |
| mask: Optional[str] = None) -> torch.Tensor: | |
| """Propagate forward the input through the MHB. | |
| We compute for each head the queries, keys and values matrices, | |
| followed by the Scaled Dot-Product. The result is concatenated | |
| and returned with shape (batch_size, K, d_model). | |
| Parameters | |
| ---------- | |
| query: | |
| Input tensor with shape (batch_size, K, d_model) used to compute queries. | |
| key: | |
| Input tensor with shape (batch_size, K, d_model) used to compute keys. | |
| value: | |
| Input tensor with shape (batch_size, K, d_model) used to compute values. | |
| mask: | |
| Mask to apply on scores before computing attention. | |
| One of ``'subsequent'``, None. Default is None. | |
| Returns | |
| ------- | |
| Self attention tensor with shape (batch_size, K, d_model). | |
| """ | |
| batch_size = query.shape[0] | |
| # Apply padding to input sequence | |
| query = F.pad(query.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2) | |
| key = F.pad(key.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2) | |
| value = F.pad(value.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2) | |
| # Compute Q, K and V, concatenate heads on batch dimension | |
| queries = torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0) | |
| keys = torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0) | |
| values = torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0) | |
| # Divide Q, K and V using a moving window | |
| queries = queries.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._q, self._window_size)).transpose(1, 2) | |
| keys = keys.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._q, self._window_size)).transpose(1, 2) | |
| values = values.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._v, self._window_size)).transpose(1, 2) | |
| # Scaled Dot Product | |
| self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(self._window_size) | |
| # Compute local map mask | |
| if self._attention_size is not None: | |
| self._scores = self._scores.masked_fill(self._attention_mask, float('-inf')) | |
| # Compute future mask | |
| if mask == "subsequent": | |
| self._scores = self._scores.masked_fill(self._future_mask, float('-inf')) | |
| # Apply softmax | |
| self._scores = F.softmax(self._scores, dim=-1) | |
| attention = torch.bmm(self._scores, values) | |
| # Fold chunks back | |
| attention = attention.reshape((batch_size*self._h, -1, self._window_size, self._v)) | |
| attention = attention[:, :, self._padding:-self._padding, :] | |
| attention = attention.reshape((batch_size*self._h, -1, self._v)) | |
| # Concatenate the heads | |
| attention_heads = torch.cat(attention.chunk(self._h, dim=0), dim=-1) | |
| # Apply linear transformation W^O | |
| self_attention = self._W_o(attention_heads) | |
| return self_attention | |
| \end{minted} | |
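The moving-window bookkeeping in \texttt{forward} (replicate padding, overlapping windows with step \texttt{window\_size - 2 * padding}, then trimming the padding from every window) can be followed without tensors. The listing below is a minimal pure-Python sketch, not the implementation above: the helper names \texttt{replicate\_pad}, \texttt{split\_windows} and \texttt{fold\_windows} are hypothetical, and the sequence length is assumed to be a multiple of the step.

```python
def replicate_pad(seq, padding):
    """Repeat the first and last elements `padding` times (edge padding)."""
    return [seq[0]] * padding + list(seq) + [seq[-1]] * padding


def split_windows(seq, window_size, padding):
    """Cut the padded sequence into overlapping windows.

    Consecutive windows start `window_size - 2 * padding` elements apart,
    so each window overlaps its neighbours by 2 * padding elements.
    """
    step = window_size - 2 * padding
    padded = replicate_pad(seq, padding)
    n_windows = (len(padded) - window_size) // step + 1
    return [padded[i * step:i * step + window_size] for i in range(n_windows)]


def fold_windows(windows, padding):
    """Drop `padding` elements at both ends of each window and concatenate."""
    restored = []
    for window in windows:
        restored.extend(window[padding:len(window) - padding])
    return restored


seq = list(range(12))
windows = split_windows(seq, window_size=6, padding=1)
restored = fold_windows(windows, padding=1)

print(windows)   # [[0, 0, 1, 2, 3, 4], [3, 4, 5, 6, 7, 8], [7, 8, 9, 10, 11, 11]]
print(restored)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
```

With a window of 6 and a padding of 1 the step is 4, so a length-12 sequence yields three windows, and trimming the padded borders recovers the original sequence exactly.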
| \newpage | |
| The tst/positionwiseFeedForward.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| from typing import Optional | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| class PositionwiseFeedForward(nn.Module): | |
| """Position-wise Feed Forward Network block from Attention is All You Need. | |
| Apply two linear transformations to each input, separately but identically. We | |
| implement them as linear layers. Input and output have a shape (batch_size, K, d_model). | |
| Parameters | |
| ---------- | |
| d_model: | |
| Dimension of input tensor. | |
| d_ff: | |
| Dimension of hidden layer, default is 2048. | |
| """ | |
| def __init__(self, | |
| d_model: int, | |
| d_ff: Optional[int] = 2048): | |
| """Initialize the PFF block.""" | |
| super().__init__() | |
| self._linear1 = nn.Linear(d_model, d_ff) | |
| self._linear2 = nn.Linear(d_ff, d_model) | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """Propagate forward the input through the PFF block. | |
| Apply the first linear transformation, then a ReLU activation, | |
| and the second linear transformation. | |
| Parameters | |
| ---------- | |
| x: | |
| Input tensor with shape (batch_size, K, d_model). | |
| Returns | |
| ------- | |
| Output tensor with shape (batch_size, K, d_model). | |
| """ | |
| return self._linear2(F.relu(self._linear1(x))) | |
| \end{minted} | |
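To make the position-wise behaviour concrete, here is a small pure-Python sketch with hand-picked weights. All numerical values and helper names are illustrative assumptions and are not taken from the trained model. The same two-layer map is applied to every time step independently, so reordering the time steps simply reorders the outputs.

```python
def linear(x, weight, bias):
    """Compute weight @ x + bias for a single input vector x."""
    return [sum(w * xi for w, xi in zip(row, x)) + b
            for row, b in zip(weight, bias)]


def relu(x):
    """Element-wise rectified linear unit."""
    return [max(0.0, xi) for xi in x]


def positionwise_ff(sequence, w1, b1, w2, b2):
    """Apply linear -> ReLU -> linear to each time step separately."""
    return [linear(relu(linear(x_t, w1, b1)), w2, b2) for x_t in sequence]


# Illustrative weights with d_model = 2 and d_ff = 3 (assumed values).
W1 = [[1.0, 0.0], [0.0, 1.0], [1.0, -1.0]]
B1 = [0.0, 0.0, 0.0]
W2 = [[1.0, 1.0, 1.0], [0.0, 1.0, -1.0]]
B2 = [0.0, 0.0]

sequence = [[1.0, 2.0], [2.0, 1.0]]  # K = 2 time steps
outputs = positionwise_ff(sequence, W1, B1, W2, B2)
print(outputs)  # [[3.0, 2.0], [4.0, 0.0]]
```

Each output row depends only on the input row at the same position, which is why the block preserves the sequence length K.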
| \newpage | |
| The tst/transformer.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| import torch | |
| import torch.nn as nn | |
| from tst.encoder import Encoder | |
| from tst.decoder import Decoder | |
| from tst.utils import generate_original_PE, generate_regular_PE | |
| class Transformer(nn.Module): | |
| """Transformer model from Attention is All You Need. | |
| A classic transformer model adapted for sequential data. | |
| Embedding has been replaced with a fully connected layer, | |
| the last layer softmax is now a sigmoid. | |
| Attributes | |
| ---------- | |
| layers_encoding: :py:class:`list` of :class:`Encoder.Encoder` | |
| stack of Encoder layers. | |
| layers_decoding: :py:class:`list` of :class:`Decoder.Decoder` | |
| stack of Decoder layers. | |
| Parameters | |
| ---------- | |
| d_input: | |
| Model input dimension. | |
| d_model: | |
| Dimension of the input vector. | |
| d_output: | |
| Model output dimension. | |
| q: | |
| Dimension of queries and keys. | |
| v: | |
| Dimension of values. | |
| h: | |
| Number of heads. | |
| N: | |
| Number of encoder and decoder layers to stack. | |
| attention_size: | |
| Number of backward elements to apply attention. | |
| Deactivated if ``None``. Default is ``None``. | |
| dropout: | |
| Dropout probability after each MHA or PFF block. | |
| Default is ``0.3``. | |
| chunk_mode: | |
| Switch between different MultiHeadAttention blocks. | |
| One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``. | |
| pe: | |
| Type of positional encoding to add. | |
| Must be one of ``'original'``, ``'regular'`` or ``None``. Default is ``None``. | |
| pe_period: | |
| If using the ``'regular'`` pe, then we can define the period. Default is ``12``. | |
| """ | |
| def __init__(self, | |
| d_input: int, | |
| d_model: int, | |
| d_output: int, | |
| q: int, | |
| v: int, | |
| h: int, | |
| N: int, | |
| attention_size: int = None, | |
| dropout: float = 0.3, | |
| chunk_mode: str = 'chunk', | |
| pe: str = None, | |
| pe_period: int = 12): | |
| """Create transformer structure from Encoder and Decoder blocks.""" | |
| super().__init__() | |
| self._d_model = d_model | |
| self.layers_encoding = nn.ModuleList([Encoder(d_model, | |
| q, | |
| v, | |
| h, | |
| attention_size=attention_size, | |
| dropout=dropout, | |
| chunk_mode=chunk_mode) for _ in range(N)]) | |
| self.layers_decoding = nn.ModuleList([Decoder(d_model, | |
| q, | |
| v, | |
| h, | |
| attention_size=attention_size, | |
| dropout=dropout, | |
| chunk_mode=chunk_mode) for _ in range(N)]) | |
| self._embedding = nn.Linear(d_input, d_model) | |
| self._linear = nn.Linear(d_model, d_output) | |
| # positional encoding: | |
| pe_functions = { | |
| 'original': generate_original_PE, | |
| 'regular': generate_regular_PE, | |
| } | |
| if pe in pe_functions.keys(): | |
| self._generate_PE = pe_functions[pe] | |
| self._pe_period = pe_period | |
| elif pe is None: | |
| self._generate_PE = None | |
| else: | |
| raise NameError( | |
| f'PE "{pe}" not understood. Must be one of {", ".join(pe_functions.keys())} or None.') | |
| self.name = 'transformer' | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """Propagate input through transformer. | |
| Forward input through an embedding module, | |
| the encoder then decoder stacks, and an output module. | |
| Parameters | |
| ---------- | |
| x: | |
| :class:`torch.Tensor` of shape (batch_size, K, d_input). | |
| Returns | |
| ------- | |
| Output tensor with shape (batch_size, K, d_output). | |
| """ | |
| K = x.shape[1] | |
| # Embedding module | |
| encoding = self._embedding(x) | |
| # Add position encoding | |
| if self._generate_PE is not None: | |
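| # Only the 'regular' generator accepts a period argument | |
| pe_params = {'period': self._pe_period} if (self._pe_period and self._generate_PE is generate_regular_PE) else {} | |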
| positional_encoding = self._generate_PE(K, self._d_model, **pe_params) | |
| positional_encoding = positional_encoding.to(encoding.device) | |
| encoding.add_(positional_encoding) | |
| # Encoding stack | |
| for layer in self.layers_encoding: | |
| encoding = layer(encoding) ### output size of the encoder: d_model | |
| # Decoding stack | |
| decoding = encoding | |
| # Add position encoding | |
| if self._generate_PE is not None: | |
| positional_encoding = self._generate_PE(K, self._d_model) | |
| positional_encoding = positional_encoding.to(decoding.device) | |
| decoding.add_(positional_encoding) | |
| for layer in self.layers_decoding: | |
| decoding = layer(decoding, encoding) | |
| # Output module | |
| output = self._linear(decoding) | |
| output = torch.sigmoid(output) | |
| return output | |
| \end{minted} | |
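Of the two positional-encoding generators, only \texttt{generate\_regular\_PE} takes a \texttt{period} argument, so that keyword should be forwarded only to a generator that accepts it. The listing below is a minimal sketch of one generic way to perform this check with the standard \texttt{inspect} module. The stand-in generators and the helpers \texttt{select\_pe}, \texttt{pe\_kwargs} and \texttt{build\_pe} are hypothetical and return plain tuples instead of tensors.

```python
import inspect


def original_pe_stub(length, d_model):
    """Hypothetical stand-in for a generator without a `period` argument."""
    return ("original", length, d_model)


def regular_pe_stub(length, d_model, period=24):
    """Hypothetical stand-in for a generator with a `period` argument."""
    return ("regular", length, d_model, period)


PE_FUNCTIONS = {"original": original_pe_stub, "regular": regular_pe_stub}


def select_pe(pe):
    """Map the `pe` option to a generator, mirroring the lookup in __init__."""
    if pe is None:
        return None
    if pe in PE_FUNCTIONS:
        return PE_FUNCTIONS[pe]
    raise NameError(
        f'PE "{pe}" not understood. Must be one of {", ".join(PE_FUNCTIONS)} or None.')


def pe_kwargs(generator, period):
    """Forward `period` only if the generator's signature declares it."""
    accepts_period = "period" in inspect.signature(generator).parameters
    return {"period": period} if accepts_period and period else {}


def build_pe(pe, length, d_model, period=12):
    """Select a generator and call it with the keyword arguments it supports."""
    generator = select_pe(pe)
    if generator is None:
        return None
    return generator(length, d_model, **pe_kwargs(generator, period))


print(build_pe("original", 4, 8))  # ('original', 4, 8)
print(build_pe("regular", 4, 8))   # ('regular', 4, 8, 12)
print(build_pe(None, 4, 8))        # None
```

Inspecting the signature keeps the dispatch table as the only place that has to change when a new encoding is added.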
| \newpage | |
| The tst/utils.py file: | |
| \begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python} | |
| from typing import Optional, Union | |
| import numpy as np | |
| import torch | |
| def generate_original_PE(length: int, d_model: int) -> torch.Tensor: | |
| """Generate positional encoding as described in original paper. | |
| Parameters | |
| ---------- | |
| length: | |
| Time window length, i.e. K. | |
| d_model: | |
| Dimension of the model vector. | |
| Returns | |
| ------- | |
| Tensor of shape (K, d_model). | |
| """ | |
| PE = torch.zeros((length, d_model)) | |
| pos = torch.arange(length).unsqueeze(1) | |
| PE[:, 0::2] = torch.sin( | |
| pos / torch.pow(1000, torch.arange(0, d_model, 2, dtype=torch.float32)/d_model)) | |
| PE[:, 1::2] = torch.cos( | |
| pos / torch.pow(1000, torch.arange(1, d_model, 2, dtype=torch.float32)/d_model)) | |
| return PE | |
| def generate_regular_PE(length: int, d_model: int, period: Optional[int] = 24) -> torch.Tensor: | |
| """Generate positional encoding with a given period. | |
| Parameters | |
| ---------- | |
| length: | |
| Time window length, i.e. K. | |
| d_model: | |
| Dimension of the model vector. | |
| period: | |
| Size of the pattern to repeat. | |
| Default is 24. | |
| Returns | |
| ------- | |
| Tensor of shape (K, d_model). | |
| """ | |
| PE = torch.zeros((length, d_model)) | |
| pos = torch.arange(length, dtype=torch.float32).unsqueeze(1) | |
| PE = torch.sin(pos * 2 * np.pi / period) | |
| PE = PE.repeat((1, d_model)) | |
| return PE | |
| def generate_local_map_mask(chunk_size: int, | |
| attention_size: int, | |
| mask_future=False, | |
| device: torch.device = 'cpu') -> torch.BoolTensor: | |
| """Compute attention mask as attention_size wide diagonal. | |
| Parameters | |
| ---------- | |
| chunk_size: | |
| Time dimension size. | |
| attention_size: | |
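| Number of backward elements to apply attention. | |
| mask_future: | |
| If ``True``, also mask every future position (``j > i``). Default is ``False``. | |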
| device: | |
| torch device. Default is ``'cpu'``. | |
| Returns | |
| ------- | |
| Mask as a boolean tensor. | |
| """ | |
| local_map = np.empty((chunk_size, chunk_size)) | |
| i, j = np.indices(local_map.shape) | |
| if mask_future: | |
| local_map[i, j] = (i - j > attention_size) ^ (j - i > 0) | |
| else: | |
| local_map[i, j] = np.abs(i - j) > attention_size | |
| return torch.BoolTensor(local_map).to(device) | |
| \end{minted} | |
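The two masking modes of \texttt{generate\_local\_map\_mask} are easiest to read on a small example. The listing below is a pure-Python sketch of the same rule with hypothetical helper names \texttt{local\_map\_mask} and \texttt{render}. Because a position cannot be both too far in the past and in the future, the exclusive or in the listing above behaves like a plain \texttt{or}, which is what the sketch uses.

```python
def local_map_mask(chunk_size, attention_size, mask_future=False):
    """Return a chunk_size x chunk_size boolean mask (True = masked).

    Row i is the query position and column j the key position.
    """
    mask = []
    for i in range(chunk_size):
        row = []
        for j in range(chunk_size):
            if mask_future:
                # Mask keys too far in the past, and every future key.
                row.append(i - j > attention_size or j > i)
            else:
                # Mask keys further than attention_size on either side.
                row.append(abs(i - j) > attention_size)
        mask.append(row)
    return mask


def render(mask):
    """Draw the mask with 'x' for masked and '.' for visible positions."""
    return ["".join("x" if masked else "." for masked in row) for row in mask]


symmetric = render(local_map_mask(4, 1))
causal = render(local_map_mask(4, 1, mask_future=True))

print("\n".join(symmetric))
# ..xx
# ...x
# x...
# xx..
print("\n".join(causal))
# .xxx
# ..xx
# x..x
# xx..
```

In the symmetric case each position attends to itself and one neighbour on each side. With \texttt{mask\_future=True} it attends only to itself and the single preceding position.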
| \end{document} | |