\documentclass[11pt]{article}
\usepackage[sc]{mathpazo} %Like Palatino with extensive math support
\usepackage{fullpage}
\usepackage[authoryear,sectionbib,sort]{natbib}
\linespread{1.7}
\usepackage[utf8]{inputenc}
\usepackage{lineno}
\usepackage{titlesec}
\titleformat{\section}[block]{\Large\bfseries\filcenter}{\thesection}{1em}{}
\titleformat{\subsection}[block]{\Large\itshape\filcenter}{\thesubsection}{1em}{}
\titleformat{\subsubsection}[block]{\large\itshape}{\thesubsubsection}{1em}{}
\titleformat{\paragraph}[runin]{\itshape}{\theparagraph}{1em}{}[. ]
\usepackage{fancyhdr}
\pagestyle{fancy}
\usepackage{minted}
\newminted{python}{}
\usepackage{xcolor} % to access the named colour LightGray
\definecolor{LightGray}{gray}{0.95}
\usepackage{dirtree}
\setlength{\parskip}{6pt}
\setlength{\parindent}{0pt}
%%%%%%%%%%%%%%%%%%%%%
% Header
%%%%%%%%%%%%%%%%%%%%%
%
% Customize the line below with the last name of your first author and
% the short title of your MS. You can comment authorship information out
% while your MS is undergoing double-blind review.
%
\rhead{Ge et al., \textit{preprint submitted to Engineering Geology}}
\setlength{\headsep}{0.3in}
\lhead{}
%%%%%%%%%%%%%%%%%%%%%
% Line numbering
%%%%%%%%%%%%%%%%%%%%%
%
% Please use line numbering with your initial submission and
% subsequent revisions. After acceptance, please turn line numbering
% off by adding percent signs to the lines %\usepackage{lineno} and
% to %\linenumbers{} and %\modulolinenumbers[3] below.
%
% To avoid line numbering being thrown off around math environments,
% the math environments have to be wrapped using
% \begin{linenomath*} and \end{linenomath*}
%
% (Thanks to Vlastimil Krivan for pointing this out to us!)
\title{Appendix: \\
Benchmarking Large Language Models in Geological Natural Language Processing}
\author{Qi Ge$^{1}$ \\
Pengfa Li$^{2}$ \\
Jin Li$^{2}$ \\
Yiyan Deng$^{2}$ \\
Hongyue Sun$^{3,\ast}$ \\
Zhongqiang Liu$^{4,\ast}$ \\
}
\date{}
\begin{document}
\maketitle
\noindent{} 1. College of Civil Engineering, Nanjing Forestry University;
\noindent{} 2. School of Artificial Intelligence, Nanjing University of Information Science and Technology;
\noindent{} 3. Ocean College, Zhejiang University;
\noindent{} 4. Department of Natural Hazards, Norwegian Geotechnical Institute.
\noindent{} $\ast$ Corresponding authors; e-mails: [email protected], [email protected].
%\linenumbers{}
%\modulolinenumbers[3]
\renewcommand{\theequation}{S\arabic{equation}}
% redefine the command that creates the equation number.
\renewcommand{\thetable}{S\arabic{table}}
\renewcommand{\thesection}{S\arabic{section}}
\renewcommand{\thefigure}{S\arabic{figure}}
\setcounter{equation}{0} % reset counter
\setcounter{figure}{0}
\setcounter{table}{0}
\newpage{}
\section*{Supplementary code}
The Python code for the LLM benchmark in this study is stored in a directory named ``Geo-LLM-Benchmarks'', which is organized as follows:
\vspace{12pt}
\dirtree{%
.1 GeoLLM.
.2 data.
.3 Task1.
.4 .....
.3 Task2.
.4 .....
.2 docs.
.3 Readme.txt.
.3 requirements.txt.
.2 scripts.
.3 Task1.
.4 KNN\_token.py.
.3 Task2.
.4 cot\_process.py.
.4 KNN.py.
.4 utils.
.5 LLM\_APIs.txt.
.5 LLM.py.
.2 tasks.
.3 task1.
.4 eval.py.
.4 Task1\_test.ipynb.
.4 metrics.
.5 graph\_matching.py.
.4 utils.
.5 LLM\_APIs.txt.
.5 LLM.py.
.3 task2.
.4 eval.py.
.4 pretreatment\_split\_data\_Geo.py.
.4 Task2\_test.ipynb.
.4 utils.
.5 knn\_prompt.py.
.5 LLM\_APIs.txt.
.5 save\_response.py.
.5 promp\_get.py.
}
\vspace{3pt}
The LiteTransNet codebase is a structured collection of files and directories that implement the LiteTransNet model. The function of each component within the \texttt{lite-trans-net} directory is introduced below:
\begin{itemize}
\item \textbf{data}: This directory holds the CSV files of the landslide dataset that are used in the case study. These files provide the data needed to train the LiteTransNet model.
\item \textbf{models}: The \texttt{models} directory stores the saved PyTorch models of the trained Transformer networks.
\item \textbf{training.py}: This script manages the training process for the LiteTransNet model. It includes code for handling the training loop and optimization steps, evaluating the test set, and saving checkpoints of the model.
\item \textbf{tst}: This directory contains several key components of the transformer architecture used in LiteTransNet:
\begin{itemize}
\item \texttt{encoder.py}: Contains the implementation of the encoder part of the transformer model, which processes the input data.
\item \texttt{decoder.py}: Implements the decoder part, which generates the output from the encoded representations.
\item \texttt{multiHeadAttention.py}: Provides the multi-head attention mechanism, a key component of transformers that allows the model to focus on different parts of the input sequence.
\item \texttt{positionwiseFeedForward.py}: Defines position-wise feedforward networks used within the transformer model.
\item \texttt{transformer.py}: This is the script where the entire transformer model is put together using the encoder, decoder, and other components.
\item \texttt{utils.py}: A utility script that contains helper functions used across various scripts in the \texttt{tst} directory.
\end{itemize}
\item \textbf{dataset.py}: This file includes code for handling the dataset class, which preprocesses and provides data to the LiteTransNet model during both training and test stages.
\item \textbf{utils.py}: This script offers general utility functions that assist in preprocessing data.
\end{itemize}
\newpage
The training.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm
from collections import OrderedDict
import itertools
from tst import Transformer
from src.dataset import LSDataset
from src.utils import fit, visualise_result
import csv
import matplotlib.pyplot as plt

# Dataset name
name = 'bsh'

# Fixed parameters
d_input = 3
d_output = 1
h = 1
N = 2
chunk_mode = None
NUM_WORKERS = 0
pe = None
BATCH_SIZE = 12
EPOCHS = 200
dropout = 0.2
LR = 0.002
opt = 'Adam'

# ===== user set params ====
param_grid = OrderedDict({
    "d_model": [16, 32, 48],
    "q": [1, 3, 5],
    "k": [1, 3, 5],
    "v": [1, 3, 5],
    "attention_size": [6, 9, 12]
})

# Generate all possible combinations of parameter values
param_combinations = list(itertools.product(*param_grid.values()))

for i, params in enumerate(param_combinations):
    print(f"Training model {i+1}/{len(param_combinations)} with params {params}")

    # Set the parameters (k is part of the grid but is not passed to Transformer below)
    d_model, q, k, v, attention_size = params

    # Config
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device {device}")

    # Load dataset
    lsDataset = LSDataset(name)

    # Split the dataset into train and test sets (the last 24 windows are held out)
    dataset_test = Subset(lsDataset, range(len(lsDataset) - 24, len(lsDataset)))
    dataset_train = Subset(lsDataset, range(len(lsDataset) - 24))

    dataloader_train = DataLoader(dataset_train,
                                  batch_size=BATCH_SIZE,
                                  shuffle=True,
                                  num_workers=NUM_WORKERS,
                                  pin_memory=False
                                  )
    # Note: the validation loader reuses the held-out test windows
    dataloader_val = DataLoader(dataset_test,
                                batch_size=BATCH_SIZE,
                                shuffle=True,
                                num_workers=NUM_WORKERS
                                )
    dataloader_test = DataLoader(dataset_test,
                                 batch_size=BATCH_SIZE,
                                 shuffle=False,
                                 num_workers=NUM_WORKERS
                                 )

    # Load transformer with Adam optimizer and MSE loss function
    if pe is None:
        net = Transformer(d_input, d_model, d_output, q, v, h, N, attention_size=attention_size,
                          dropout=dropout, chunk_mode=chunk_mode, pe=pe).to(device)
    else:
        pe_period = 12
        net = Transformer(d_input, d_model, d_output, q, v, h, N, attention_size=attention_size,
                          dropout=dropout, chunk_mode=chunk_mode, pe=pe, pe_period=pe_period).to(device)

    # Create the optimizer with the initial learning rate
    optimizer = optim.Adam(net.parameters(), lr=LR)
    loss_function = nn.MSELoss()

    # Fit model
    with tqdm(total=EPOCHS) as pbar:
        train_loss, test_loss = fit(net, optimizer, loss_function, dataloader_train,
                                    dataloader_val, epochs=EPOCHS, pbar=pbar, device=device)

    # loss visualisation
    plt.plot(train_loss, label='Training Loss')
    plt.plot(test_loss, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.savefig('loss.png')
    plt.close()

    # Switch to evaluation
    _ = net.eval()

    # Select target values in test split
    y_true = lsDataset._y[dataloader_test.dataset.indices]
    train_y = lsDataset._y[dataloader_train.dataset.indices]

    # Compute predictions (test)
    predictions = torch.empty(len(dataloader_test.dataset), 12, 1)
    idx_prediction = 0
    with torch.no_grad():
        for x, y in tqdm(dataloader_test, total=len(dataloader_test)):
            netout = net(x.to(device)).cpu()
            predictions[idx_prediction:idx_prediction+x.shape[0]] = netout
            idx_prediction += x.shape[0]

    # Save model
    torch.save(net.state_dict(), 'model.pth')
\end{minted}
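The grid search in \texttt{training.py} trains one model per element of the Cartesian product of the candidate lists. The following standalone sketch (not part of the original codebase) reuses the candidate values of \texttt{param\_grid} to count the runs before training is launched. Because \texttt{k} is unpacked but never passed to \texttt{Transformer} in the listing, the sketch also counts how many architecturally distinct configurations remain under that reading; the names \texttt{combos}, \texttt{n\_runs} and \texttt{n\_distinct} are introduced here for illustration only.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import itertools
from collections import OrderedDict

# Candidate values copied from param_grid in training.py
param_grid = OrderedDict({
    "d_model": [16, 32, 48],
    "q": [1, 3, 5],
    "k": [1, 3, 5],
    "v": [1, 3, 5],
    "attention_size": [6, 9, 12],
})

# One training run per element of the Cartesian product
combos = list(itertools.product(*param_grid.values()))
n_runs = len(combos)  # 3 ** 5 combinations

# Dropping k (unused by the Transformer constructor call in the listing)
# leaves the configurations that actually differ in architecture
distinct = {(d_model, q, v, size) for d_model, q, _k, v, size in combos}
n_distinct = len(distinct)  # 3 ** 4 combinations

print(n_runs, n_distinct)
\end{minted}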
\newpage
The dataset.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np
from torch.utils.data import Dataset
import torch
from src.utils.utils import normalizer, slicing_window, loading_data


class LSDataset(Dataset):
    def __init__(self,
                 name: str,
                 **kwargs):
        super().__init__(**kwargs)
        self.name = name
        self._load_csv()

    def _load_csv(self):
        # load data
        dataset = loading_data(self.name)  ###### loading_data_4v
        data = dataset.values
        n_months = 12
        # normalisation
        normed_data, min_val, max_val = normalizer(data)
        # split into 3d array, the label is the next row
        features, labels = slicing_window(normed_data, n_months)
        # Store feature and label
        self._x = features
        self._y = labels
        # Convert to float32 (output)
        self._x = torch.Tensor(self._x)
        self._y = torch.Tensor(self._y)
        self._maxvalue = max_val
        self._minvalue = min_val

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        return (self._x[idx], self._y[idx])

    def __len__(self):
        return self._x.shape[0]
\end{minted}
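\texttt{LSDataset} exposes one item per 12-month sliding window, and \texttt{training.py} holds out the last 24 of those windows for testing. The sketch below works through the index arithmetic in plain Python. The record count \texttt{n\_rows} is a hypothetical value chosen for illustration, and \texttt{rows\_of} is a helper introduced here, not a function of the codebase. It also shows that consecutive windows share rows, so the last training window and the first test window overlap.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
n_rows = 100    # hypothetical number of monthly records
n_months = 12   # window length used in LSDataset._load_csv
n_test = 24     # number of held-out windows in training.py

# slicing_window produces one window per admissible start row
n_windows = n_rows - n_months + 1
train_idx = range(n_windows - n_test)
test_idx = range(n_windows - n_test, n_windows)


def rows_of(window):
    """Row indices covered by the sliding window starting at `window`."""
    return set(range(window, window + n_months))


# Rows shared by the last training window and the first test window
shared = rows_of(train_idx[-1]) & rows_of(test_idx[0])

print(n_windows, len(train_idx), len(test_idx), len(shared))
\end{minted}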
\newpage
The utils.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import csv
import torch
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt


def compute_loss(net: torch.nn.Module,
                 dataloader: torch.utils.data.DataLoader,
                 loss_function: torch.nn.Module,
                 device: torch.device = 'cpu') -> torch.Tensor:
    """Compute the loss of a network on a given dataset.

    Does not compute gradient.

    Parameters
    ----------
    net:
        Network to evaluate.
    dataloader:
        Iterator on the dataset.
    loss_function:
        Loss function to compute.
    device:
        Torch device, or :py:class:`str`.

    Returns
    -------
    Loss as a tensor with no grad.
    """
    running_loss = 0
    with torch.no_grad():
        for x, y in dataloader:
            netout = net(x.to(device)).cpu()
            running_loss += loss_function(y, netout)
    return running_loss / len(dataloader)


def normalizer(data):
    numerator = data - np.min(data, 0)
    denominator = np.max(data, 0) - np.min(data, 0)
    # norm_data = numerator / (denominator + 1e-7)
    norm_data = numerator / denominator
    return norm_data, np.min(data, 0), np.max(data, 0)


def scaler(data):
    from sklearn.preprocessing import MinMaxScaler
    scaler = MinMaxScaler(feature_range=(-1, 1))
    data_scaled = scaler.fit_transform(data)
    min_value = scaler.data_min_
    max_value = scaler.data_max_
    return data_scaled, min_value, max_value


def rescaler(data, min_value, max_value):
    inv_y = (data - (-1)) * (max_value - min_value) / (1 - (-1)) + min_value
    return inv_y


def renormlizer(data, max_val, min_val):
    data = data * (max_val - min_val)
    data = data + min_val
    return data


def slicing_window(data, n_in):
    list_of_features = []
    list_of_labels = []
    for i in range(len(data)-n_in+1):
        arr_features = data[i:(i+n_in), :-1]
        arr_label = data[i:(i+n_in), -1]
        list_of_features.append(arr_features)
        list_of_labels.append(arr_label.reshape(-1, 1))
    features = np.array(list_of_features)
    labels = np.array(list_of_labels)
    return features, labels
\end{minted}
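To illustrate how the preprocessing helpers fit together, the sketch below applies condensed copies of \texttt{normalizer}, \texttt{renormlizer} and \texttt{slicing\_window} to a synthetic array. The array \texttt{raw} (40 rows, three feature columns and one target column) is invented for illustration and does not come from the landslide dataset. Note the argument order of \texttt{renormlizer}, which takes the maximum before the minimum.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np


def normalizer(data):
    """Column-wise min-max scaling to [0, 1], as in utils.py."""
    lo, hi = np.min(data, 0), np.max(data, 0)
    return (data - lo) / (hi - lo), lo, hi


def renormlizer(data, max_val, min_val):
    """Inverse of normalizer; note that max_val comes first."""
    return data * (max_val - min_val) + min_val


def slicing_window(data, n_in):
    """Cut overlapping windows; the last column is the label."""
    feats, labels = [], []
    for i in range(len(data) - n_in + 1):
        feats.append(data[i:i + n_in, :-1])
        labels.append(data[i:i + n_in, -1].reshape(-1, 1))
    return np.array(feats), np.array(labels)


# Synthetic stand-in data: 40 rows, 3 features + 1 target (assumption)
rng = np.random.default_rng(0)
raw = rng.uniform(0.0, 50.0, size=(40, 4))

normed, min_val, max_val = normalizer(raw)
features, labels = slicing_window(normed, n_in=12)
restored = renormlizer(normed, max_val, min_val)

print(features.shape, labels.shape)
\end{minted}

With 40 rows and a window of 12, the sketch yields $40 - 12 + 1 = 29$ windows, and the round trip through \texttt{renormlizer} recovers the synthetic input up to floating-point error.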
\newpage
The tst/encoder.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from tst.multiHeadAttention import MultiHeadAttention, MultiHeadAttentionChunk, MultiHeadAttentionWindow
from tst.positionwiseFeedForward import PositionwiseFeedForward


class Encoder(nn.Module):
    """Encoder block from Attention is All You Need.

    Apply Multi Head Attention block followed by a Point-wise Feed Forward block.
    Residual sum and normalization are applied at each step.

    Parameters
    ----------
    d_model:
        Dimension of the input vector.
    q:
        Dimension of all query matrix.
    v:
        Dimension of all value matrix.
    h:
        Number of heads.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    dropout:
        Dropout probability after each MHA or PFF block.
        Default is ``0.3``.
    chunk_mode:
        Switch between different MultiHeadAttention blocks.
        One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 attention_size: int = None,
                 dropout: float = 0.3,
                 chunk_mode: str = 'chunk'):
        """Initialize the Encoder block"""
        super().__init__()
        chunk_mode_modules = {
            'chunk': MultiHeadAttentionChunk,
            'window': MultiHeadAttentionWindow,
        }
        if chunk_mode in chunk_mode_modules.keys():
            MHA = chunk_mode_modules[chunk_mode]
        elif chunk_mode is None:
            MHA = MultiHeadAttention
        else:
            raise NameError(
                f'chunk_mode "{chunk_mode}" not understood. Must be one of {", ".join(chunk_mode_modules.keys())} or None.')
        self._selfAttention = MHA(d_model, q, v, h, attention_size=attention_size)
        self._feedForward = PositionwiseFeedForward(d_model)
        self._layerNorm1 = nn.LayerNorm(d_model)
        self._layerNorm2 = nn.LayerNorm(d_model)
        self._dopout = nn.Dropout(p=dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Propagate the input through the Encoder block.

        Apply the Multi Head Attention block, add residual and normalize.
        Apply the Point-wise Feed Forward block, add residual and normalize.

        Parameters
        ----------
        x:
            Input tensor with shape (batch_size, K, d_model).

        Returns
        -------
        Output tensor with shape (batch_size, K, d_model).
        """
        # Self attention
        residual = x
        x = self._selfAttention(query=x, key=x, value=x)
        x = self._dopout(x)
        x = self._layerNorm1(x + residual)
        # Feed forward
        residual = x
        x = self._feedForward(x)
        x = self._dopout(x)
        x = self._layerNorm2(x + residual)
        return x

    @property
    def attention_map(self) -> torch.Tensor:
        """Attention map after a forward propagation,
        variable `score` in the original paper.
        """
        return self._selfAttention.attention_map
\end{minted}
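Each sub-block of the encoder follows the same pattern: apply the sub-layer, add the residual, then normalize over the feature dimension. The NumPy sketch below isolates that pattern under simplifying assumptions: dropout is omitted, the layer normalization has no learnable scale or shift (matching a freshly initialized \texttt{nn.LayerNorm}), and a toy scaling function stands in for the attention or feed-forward sub-layer. The helper names \texttt{layer\_norm} and \texttt{post\_norm\_sublayer} are introduced here for illustration.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np


def layer_norm(x, eps=1e-5):
    """Normalize over the last axis using the biased variance."""
    mean = x.mean(axis=-1, keepdims=True)
    var = x.var(axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + eps)


def post_norm_sublayer(x, sublayer):
    """Residual connection followed by layer normalization."""
    return layer_norm(x + sublayer(x))


rng = np.random.default_rng(0)
x = rng.normal(size=(2, 12, 16))  # (batch_size, K, d_model)

# Toy sub-layer standing in for the attention or feed-forward block
y = post_norm_sublayer(x, lambda t: 0.5 * t)

print(y.shape)
\end{minted}

The output keeps the input shape, and every position has approximately zero mean and unit variance along the feature axis.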
\newpage
The tst/decoder.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from tst.multiHeadAttention import MultiHeadAttention, MultiHeadAttentionChunk, MultiHeadAttentionWindow
from tst.positionwiseFeedForward import PositionwiseFeedForward


class Decoder(nn.Module):
    """Decoder block from Attention is All You Need.

    Apply two Multi Head Attention blocks followed by a Point-wise Feed Forward block.
    Residual sum and normalization are applied at each step.

    Parameters
    ----------
    d_model:
        Dimension of the input vector.
    q:
        Dimension of all query matrix.
    v:
        Dimension of all value matrix.
    h:
        Number of heads.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    dropout:
        Dropout probability after each MHA or PFF block.
        Default is ``0.3``.
    chunk_mode:
        Switch between different MultiHeadAttention blocks.
        One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 attention_size: int = None,
                 dropout: float = 0.3,
                 chunk_mode: str = 'chunk'):
        """Initialize the Decoder block"""
        super().__init__()
        chunk_mode_modules = {
            'chunk': MultiHeadAttentionChunk,
            'window': MultiHeadAttentionWindow,
        }
        if chunk_mode in chunk_mode_modules.keys():
            MHA = chunk_mode_modules[chunk_mode]
        elif chunk_mode is None:
            MHA = MultiHeadAttention
        else:
            raise NameError(
                f'chunk_mode "{chunk_mode}" not understood. Must be one of {", ".join(chunk_mode_modules.keys())} or None.')
        self._selfAttention = MHA(d_model, q, v, h, attention_size=attention_size)
        self._encoderDecoderAttention = MHA(d_model, q, v, h, attention_size=attention_size)
        self._feedForward = PositionwiseFeedForward(d_model)
        self._layerNorm1 = nn.LayerNorm(d_model)
        self._layerNorm2 = nn.LayerNorm(d_model)
        self._layerNorm3 = nn.LayerNorm(d_model)
        self._dopout = nn.Dropout(p=dropout)

    def forward(self, x: torch.Tensor, memory: torch.Tensor) -> torch.Tensor:
        """Propagate the input through the Decoder block.

        Apply the self attention block, add residual and normalize.
        Apply the encoder-decoder attention block, add residual and normalize.
        Apply the feed forward network, add residual and normalize.

        Parameters
        ----------
        x:
            Input tensor with shape (batch_size, K, d_model).
        memory:
            Memory tensor with shape (batch_size, K, d_model)
            from encoder output.

        Returns
        -------
        x:
            Output tensor with shape (batch_size, K, d_model).
        """
        # Self attention
        residual = x
        x = self._selfAttention(query=x, key=x, value=x, mask="subsequent")
        x = self._dopout(x)
        x = self._layerNorm1(x + residual)
        # Encoder-decoder attention
        residual = x
        x = self._encoderDecoderAttention(query=x, key=memory, value=memory)
        x = self._dopout(x)
        x = self._layerNorm2(x + residual)
        # Feed forward
        residual = x
        x = self._feedForward(x)
        x = self._dopout(x)
        x = self._layerNorm3(x + residual)
        return x
\end{minted}
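The decoder's self-attention is called with \texttt{mask="subsequent"}, which prevents a time step from attending to later steps. The NumPy sketch below reproduces the effect of that mask on a small score matrix. It uses uniform raw scores and a sequence length of 4 purely for illustration; NumPy is used instead of PyTorch so that the example runs without the model code.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np

K = 4                                  # illustrative sequence length
scores = np.zeros((K, K))              # uniform raw scores for clarity

# True strictly above the diagonal, i.e. for future positions
future_mask = np.triu(np.ones((K, K)), k=1).astype(bool)
masked = np.where(future_mask, -np.inf, scores)

# Row-wise softmax; exp(-inf) is 0, so masked entries get no weight
weights = np.exp(masked - masked.max(axis=-1, keepdims=True))
weights /= weights.sum(axis=-1, keepdims=True)

print(weights)
\end{minted}

Row $i$ of \texttt{weights} spreads its mass uniformly over positions $0,\dots,i$ and assigns zero weight to every later position.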
\newpage
The tst/multiHeadAttention.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
from typing import Optional

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from tst.utils import generate_local_map_mask


class MultiHeadAttention(nn.Module):
    """Multi Head Attention block from Attention is All You Need.

    Given 3 inputs of shape (batch_size, K, d_model), that will be used
    to compute query, keys and values, we output a self attention
    tensor of shape (batch_size, K, d_model).

    Parameters
    ----------
    d_model:
        Dimension of the input vector.
    q:
        Dimension of all query matrix.
    v:
        Dimension of all value matrix.
    h:
        Number of heads.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 attention_size: int = None):
        """Initialize the Multi Head Block."""
        super().__init__()
        self._h = h
        self._attention_size = attention_size
        # Query, keys and value matrices
        self._W_q = nn.Linear(d_model, q*self._h)
        self._W_k = nn.Linear(d_model, q*self._h)
        self._W_v = nn.Linear(d_model, v*self._h)
        # Output linear function
        self._W_o = nn.Linear(self._h*v, d_model)
        # Score placeholder
        self._scores = None

    def forward(self,
                query: torch.Tensor,
                key: torch.Tensor,
                value: torch.Tensor,
                mask: Optional[str] = None) -> torch.Tensor:
        """Propagate forward the input through the MHB.

        We compute for each head the queries, keys and values matrices,
        followed by the Scaled Dot-Product. The result is concatenated
        and returned with shape (batch_size, K, d_model).

        Parameters
        ----------
        query:
            Input tensor with shape (batch_size, K, d_model) used to compute queries.
        key:
            Input tensor with shape (batch_size, K, d_model) used to compute keys.
        value:
            Input tensor with shape (batch_size, K, d_model) used to compute values.
        mask:
            Mask to apply on scores before computing attention.
            One of ``'subsequent'``, None. Default is None.

        Returns
        -------
        Self attention tensor with shape (batch_size, K, d_model).
        """
        K = query.shape[1]
        # Compute Q, K and V, concatenate heads on batch dimension
        queries = torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0)
        keys = torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0)
        values = torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0)
        # Scaled Dot Product (scores are scaled by the sequence length K)
        self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(K)
        # Compute local map mask
        if self._attention_size is not None:
            attention_mask = generate_local_map_mask(K, self._attention_size, mask_future=False, device=self._scores.device)
            self._scores = self._scores.masked_fill(attention_mask, float('-inf'))
        # Compute future mask
        if mask == "subsequent":
            future_mask = torch.triu(torch.ones((K, K)), diagonal=1).bool()
            future_mask = future_mask.to(self._scores.device)
            self._scores = self._scores.masked_fill(future_mask, float('-inf'))
        # Apply softmax
        self._scores = F.softmax(self._scores, dim=-1)
        attention = torch.bmm(self._scores, values)
        # Concatenate the heads
        attention_heads = torch.cat(attention.chunk(self._h, dim=0), dim=-1)
        # Apply linear transformation W^O
        self_attention = self._W_o(attention_heads)
        return self_attention

    @property
    def attention_map(self) -> torch.Tensor:
        """Attention map after a forward propagation,
        variable `score` in the original paper.
        """
        if self._scores is None:
            raise RuntimeError(
                "Evaluate the model once to generate attention map")
        return self._scores


class MultiHeadAttentionChunk(MultiHeadAttention):
    """Multi Head Attention block with chunk.

    Given 3 inputs of shape (batch_size, K, d_model), that will be used
    to compute query, keys and values, we output a self attention
    tensor of shape (batch_size, K, d_model).
    Queries, keys and values are divided in chunks of constant size.

    Parameters
    ----------
    d_model:
        Dimension of the input vector.
    q:
        Dimension of all query matrix.
    v:
        Dimension of all value matrix.
    h:
        Number of heads.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    chunk_size:
        Size of chunks to apply attention on. Last one may be smaller (see :class:`torch.Tensor.chunk`).
        Default is 168.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 attention_size: int = None,
                 chunk_size: Optional[int] = 168,
                 **kwargs):
        """Initialize the Multi Head Block."""
        super().__init__(d_model, q, v, h, attention_size, **kwargs)
        self._chunk_size = chunk_size
        # Score mask for decoder
        self._future_mask = nn.Parameter(torch.triu(torch.ones((self._chunk_size, self._chunk_size)), diagonal=1).bool(),
                                         requires_grad=False)
        if self._attention_size is not None:
            self._attention_mask = nn.Parameter(generate_local_map_mask(self._chunk_size, self._attention_size),
                                                requires_grad=False)

    def forward(self,
                query: torch.Tensor,
                key: torch.Tensor,
                value: torch.Tensor,
                mask: Optional[str] = None) -> torch.Tensor:
        """Propagate forward the input through the MHB.

        We compute for each head the queries, keys and values matrices,
        followed by the Scaled Dot-Product. The result is concatenated
        and returned with shape (batch_size, K, d_model).

        Parameters
        ----------
        query:
            Input tensor with shape (batch_size, K, d_model) used to compute queries.
        key:
            Input tensor with shape (batch_size, K, d_model) used to compute keys.
        value:
            Input tensor with shape (batch_size, K, d_model) used to compute values.
        mask:
            Mask to apply on scores before computing attention.
            One of ``'subsequent'``, None. Default is None.

        Returns
        -------
        Self attention tensor with shape (batch_size, K, d_model).
        """
        K = query.shape[1]
        n_chunk = K // self._chunk_size
        # Compute Q, K and V, concatenate heads on batch dimension
        queries = torch.cat(torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0)
        keys = torch.cat(torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0)
        values = torch.cat(torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0).chunk(n_chunk, dim=1), dim=0)
        # Scaled Dot Product
        self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(self._chunk_size)
        # Compute local map mask
        if self._attention_size is not None:
            self._scores = self._scores.masked_fill(self._attention_mask, float('-inf'))
        # Compute future mask
        if mask == "subsequent":
            self._scores = self._scores.masked_fill(self._future_mask, float('-inf'))
        # Apply softmax
        self._scores = F.softmax(self._scores, dim=-1)
        attention = torch.bmm(self._scores, values)
        # Concatenate the heads
        attention_heads = torch.cat(torch.cat(attention.chunk(
            n_chunk, dim=0), dim=1).chunk(self._h, dim=0), dim=-1)
        # Apply linear transformation W^O
        self_attention = self._W_o(attention_heads)
        return self_attention


class MultiHeadAttentionWindow(MultiHeadAttention):
    """Multi Head Attention block with moving window.

    Given 3 inputs of shape (batch_size, K, d_model), that will be used
    to compute query, keys and values, we output a self attention
    tensor of shape (batch_size, K, d_model).
    Queries, keys and values are divided in chunks using a moving window.

    Parameters
    ----------
    d_model:
        Dimension of the input vector.
    q:
        Dimension of all query matrix.
    v:
        Dimension of all value matrix.
    h:
        Number of heads.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    window_size:
        Size of the window used to extract chunks.
        Default is 168.
    padding:
        Padding around each window. Padding will be applied to input sequence.
        Default is 168 // 4 = 42.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 attention_size: int = None,
                 window_size: Optional[int] = 168,
                 padding: Optional[int] = 168 // 4,
                 **kwargs):
        """Initialize the Multi Head Block."""
        super().__init__(d_model, q, v, h, attention_size, **kwargs)
        self._window_size = window_size
        self._padding = padding
        self._q = q
        self._v = v
        # Step size for the moving window
        self._step = self._window_size - 2 * self._padding
        # Score mask for decoder
        self._future_mask = nn.Parameter(torch.triu(torch.ones((self._window_size, self._window_size)), diagonal=1).bool(),
                                         requires_grad=False)
        if self._attention_size is not None:
            self._attention_mask = nn.Parameter(generate_local_map_mask(self._window_size, self._attention_size),
                                                requires_grad=False)

    def forward(self,
                query: torch.Tensor,
                key: torch.Tensor,
                value: torch.Tensor,
                mask: Optional[str] = None) -> torch.Tensor:
        """Propagate forward the input through the MHB.

        We compute for each head the queries, keys and values matrices,
        followed by the Scaled Dot-Product. The result is concatenated
        and returned with shape (batch_size, K, d_model).

        Parameters
        ----------
        query:
            Input tensor with shape (batch_size, K, d_model) used to compute queries.
        key:
            Input tensor with shape (batch_size, K, d_model) used to compute keys.
        value:
            Input tensor with shape (batch_size, K, d_model) used to compute values.
        mask:
            Mask to apply on scores before computing attention.
            One of ``'subsequent'``, None. Default is None.

        Returns
        -------
        Self attention tensor with shape (batch_size, K, d_model).
        """
        batch_size = query.shape[0]
        # Apply padding to input sequence
        query = F.pad(query.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2)
        key = F.pad(key.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2)
        value = F.pad(value.transpose(1, 2), (self._padding, self._padding), 'replicate').transpose(1, 2)
        # Compute Q, K and V, concatenate heads on batch dimension
        queries = torch.cat(self._W_q(query).chunk(self._h, dim=-1), dim=0)
        keys = torch.cat(self._W_k(key).chunk(self._h, dim=-1), dim=0)
        values = torch.cat(self._W_v(value).chunk(self._h, dim=-1), dim=0)
        # Divide Q, K and V using a moving window
        queries = queries.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._q, self._window_size)).transpose(1, 2)
        keys = keys.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._q, self._window_size)).transpose(1, 2)
        values = values.unfold(dimension=1, size=self._window_size, step=self._step).reshape((-1, self._v, self._window_size)).transpose(1, 2)
        # Scaled Dot Product
        self._scores = torch.bmm(queries, keys.transpose(1, 2)) / np.sqrt(self._window_size)
        # Compute local map mask
        if self._attention_size is not None:
            self._scores = self._scores.masked_fill(self._attention_mask, float('-inf'))
        # Compute future mask
        if mask == "subsequent":
            self._scores = self._scores.masked_fill(self._future_mask, float('-inf'))
        # Apply softmax
        self._scores = F.softmax(self._scores, dim=-1)
        attention = torch.bmm(self._scores, values)
        # Fold chunks back
        attention = attention.reshape((batch_size*self._h, -1, self._window_size, self._v))
        attention = attention[:, :, self._padding:-self._padding, :]
        attention = attention.reshape((batch_size*self._h, -1, self._v))
        # Concatenate the heads
        attention_heads = torch.cat(attention.chunk(self._h, dim=0), dim=-1)
        # Apply linear transformation W^O
        self_attention = self._W_o(attention_heads)
        return self_attention
\end{minted}
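All three attention classes handle multiple heads in the same way: the projected tensor is split along the feature axis into \texttt{h} pieces that are stacked on the batch axis, and the inverse operation is applied after attention. The sketch below mirrors that bookkeeping with NumPy, using \texttt{np.split} and \texttt{np.concatenate} as stand-ins for \texttt{Tensor.chunk} and \texttt{torch.cat} (equivalent here because the feature size is divisible by \texttt{h}). The sizes are arbitrary illustrative values.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np

batch, K, h, v = 2, 5, 3, 4  # illustrative sizes
projected = np.arange(batch * K * h * v, dtype=float).reshape(batch, K, h * v)

# Split the feature axis into h heads and stack them on the batch axis,
# giving shape (h * batch, K, v), as done before torch.bmm in the listing
stacked = np.concatenate(np.split(projected, h, axis=-1), axis=0)

# Inverse step: split the batch axis and concatenate heads on the feature axis
merged = np.concatenate(np.split(stacked, h, axis=0), axis=-1)

print(stacked.shape, merged.shape)
\end{minted}

Stacking heads on the batch axis lets a single batched matrix product compute the scores of every head at once.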
\newpage
The tst/positionwiseFeedForward.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F


class PositionwiseFeedForward(nn.Module):
    """Position-wise Feed Forward Network block from Attention is All You Need.

    Apply two linear transformations to each input, separately but identically.
    They are implemented here as linear layers acting on the last dimension.
    Input and output have a shape (batch_size, K, d_model).

    Parameters
    ----------
    d_model:
        Dimension of input tensor.
    d_ff:
        Dimension of hidden layer, default is 2048.
    """

    def __init__(self,
                 d_model: int,
                 d_ff: Optional[int] = 2048):
        """Initialize the PFF block."""
        super().__init__()
        self._linear1 = nn.Linear(d_model, d_ff)
        self._linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Propagate forward the input through the PFF block.

        Apply the first linear transformation, then a relu activation,
        and the second linear transformation.

        Parameters
        ----------
        x:
            Input tensor with shape (batch_size, K, d_model).

        Returns
        -------
        Output tensor with shape (batch_size, K, d_model).
        """
        return self._linear2(F.relu(self._linear1(x)))
\end{minted}
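The term ``position-wise'' means that the same two-layer network is applied to every time step independently. The NumPy sketch below checks this property on random weights: permuting the time steps of the input permutes the output in the same way. The weights, the small sizes and the function name \texttt{pff} are illustrative assumptions, not values taken from a trained LiteTransNet model.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np

rng = np.random.default_rng(0)
d_model, d_ff, K = 4, 8, 6  # illustrative sizes

# Random weights standing in for the two nn.Linear layers
W1, b1 = rng.normal(size=(d_model, d_ff)), rng.normal(size=d_ff)
W2, b2 = rng.normal(size=(d_ff, d_model)), rng.normal(size=d_model)


def pff(x):
    """Linear, ReLU, linear, applied on the last axis only."""
    return np.maximum(x @ W1 + b1, 0.0) @ W2 + b2


x = rng.normal(size=(2, K, d_model))  # (batch_size, K, d_model)
perm = rng.permutation(K)

out = pff(x)
out_perm = pff(x[:, perm, :])

# Permuting time steps before or after the block gives the same result
print(np.allclose(out[:, perm, :], out_perm))
\end{minted}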
\newpage
The tst/transformer.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import torch
import torch.nn as nn
from tst.encoder import Encoder
from tst.decoder import Decoder
from tst.utils import generate_original_PE, generate_regular_PE


class Transformer(nn.Module):
    """Transformer model from Attention is All You Need.

    A classic transformer model adapted for sequential data.
    Embedding has been replaced with a fully connected layer,
    the last layer softmax is now a sigmoid.

    Attributes
    ----------
    layers_encoding: :py:class:`list` of :class:`Encoder.Encoder`
        stack of Encoder layers.
    layers_decoding: :py:class:`list` of :class:`Decoder.Decoder`
        stack of Decoder layers.

    Parameters
    ----------
    d_input:
        Model input dimension.
    d_model:
        Dimension of the input vector.
    d_output:
        Model output dimension.
    q:
        Dimension of queries and keys.
    v:
        Dimension of values.
    h:
        Number of heads.
    N:
        Number of encoder and decoder layers to stack.
    attention_size:
        Number of backward elements to apply attention.
        Deactivated if ``None``. Default is ``None``.
    dropout:
        Dropout probability after each MHA or PFF block.
        Default is ``0.3``.
    chunk_mode:
        Switch between different MultiHeadAttention blocks.
        One of ``'chunk'``, ``'window'`` or ``None``. Default is ``'chunk'``.
    pe:
        Type of positional encoding to add.
        Must be one of ``'original'``, ``'regular'`` or ``None``. Default is ``None``.
    pe_period:
        If using the ``'regular'`` pe, then we can define the period. Default is ``12``.
    """

    def __init__(self,
                 d_input: int,
                 d_model: int,
                 d_output: int,
                 q: int,
                 v: int,
                 h: int,
                 N: int,
                 attention_size: int = None,
                 dropout: float = 0.3,
                 chunk_mode: str = 'chunk',
                 pe: str = None,
                 pe_period: int = 12):
        """Create transformer structure from Encoder and Decoder blocks."""
        super().__init__()
        self._d_model = d_model
        self.layers_encoding = nn.ModuleList([Encoder(d_model,
                                                      q,
                                                      v,
                                                      h,
                                                      attention_size=attention_size,
                                                      dropout=dropout,
                                                      chunk_mode=chunk_mode) for _ in range(N)])
        self.layers_decoding = nn.ModuleList([Decoder(d_model,
                                                      q,
                                                      v,
                                                      h,
                                                      attention_size=attention_size,
                                                      dropout=dropout,
                                                      chunk_mode=chunk_mode) for _ in range(N)])
        self._embedding = nn.Linear(d_input, d_model)
        self._linear = nn.Linear(d_model, d_output)
        # positional encoding:
        pe_functions = {
            'original': generate_original_PE,
            'regular': generate_regular_PE,
        }
        if pe in pe_functions.keys():
            self._generate_PE = pe_functions[pe]
            self._pe_period = pe_period
        elif pe is None:
            self._generate_PE = None
        else:
            raise NameError(
                f'PE "{pe}" not understood. Must be one of {", ".join(pe_functions.keys())} or None.')
        self.name = 'transformer'

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Propagate input through transformer

        Forward input through an embedding module,
        the encoder then decoder stacks, and an output module.

        Parameters
        ----------
        x:
            :class:`torch.Tensor` of shape (batch_size, K, d_input).

        Returns
        -------
        Output tensor with shape (batch_size, K, d_output).
        """
        K = x.shape[1]
        # Embedding module
        encoding = self._embedding(x)
        # Add position encoding
        if self._generate_PE is not None:
            pe_params = {'period': self._pe_period} if self._pe_period else {}
            positional_encoding = self._generate_PE(K, self._d_model, **pe_params)
            positional_encoding = positional_encoding.to(encoding.device)
            encoding.add_(positional_encoding)
        # Encoding stack
        for layer in self.layers_encoding:
            encoding = layer(encoding)  # output size of the encoder: d_model
        # Decoding stack
        decoding = encoding
        # Add position encoding
        if self._generate_PE is not None:
            positional_encoding = self._generate_PE(K, self._d_model)
            positional_encoding = positional_encoding.to(decoding.device)
            decoding.add_(positional_encoding)
        for layer in self.layers_decoding:
            decoding = layer(decoding, encoding)
        # Output module
        output = self._linear(decoding)
        output = torch.sigmoid(output)
        return output
\end{minted}
\newpage
The tst/utils.py file:
\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
from typing import Optional, Union

import numpy as np
import torch


def generate_original_PE(length: int, d_model: int) -> torch.Tensor:
    """Generate positional encoding as described in original paper. :class:`torch.Tensor`

    Parameters
    ----------
    length:
        Time window length, i.e. K.
    d_model:
        Dimension of the model vector.

    Returns
    -------
    Tensor of shape (K, d_model).
    """
    PE = torch.zeros((length, d_model))
    pos = torch.arange(length).unsqueeze(1)
    PE[:, 0::2] = torch.sin(
        pos / torch.pow(1000, torch.arange(0, d_model, 2, dtype=torch.float32)/d_model))
    PE[:, 1::2] = torch.cos(
        pos / torch.pow(1000, torch.arange(1, d_model, 2, dtype=torch.float32)/d_model))
    return PE


def generate_regular_PE(length: int, d_model: int, period: Optional[int] = 24) -> torch.Tensor:
    """Generate positional encoding with a given period.

    Parameters
    ----------
    length:
        Time window length, i.e. K.
    d_model:
        Dimension of the model vector.
    period:
        Size of the pattern to repeat.
        Default is 24.

    Returns
    -------
    Tensor of shape (K, d_model).
    """
    PE = torch.zeros((length, d_model))
    pos = torch.arange(length, dtype=torch.float32).unsqueeze(1)
    PE = torch.sin(pos * 2 * np.pi / period)
    PE = PE.repeat((1, d_model))
    return PE


def generate_local_map_mask(chunk_size: int,
                            attention_size: int,
                            mask_future=False,
                            device: torch.device = 'cpu') -> torch.BoolTensor:
    """Compute attention mask as attention_size wide diagonal.

    Parameters
    ----------
    chunk_size:
        Time dimension size.
    attention_size:
        Number of backward elements to apply attention.
    mask_future:
        If ``True``, future positions are masked as well. Default is ``False``.
    device:
        torch device. Default is ``'cpu'``.

    Returns
    -------
    Mask as a boolean tensor.
    """
    local_map = np.empty((chunk_size, chunk_size))
    i, j = np.indices(local_map.shape)
    if mask_future:
        local_map[i, j] = (i - j > attention_size) ^ (j - i > 0)
    else:
        local_map[i, j] = np.abs(i - j) > attention_size
    return torch.BoolTensor(local_map).to(device)
\end{minted}
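With \texttt{mask\_future=False}, \texttt{generate\_local\_map\_mask} marks as masked every pair of positions that are more than \texttt{attention\_size} steps apart. The NumPy sketch below reproduces that branch for a small case so the resulting band structure can be inspected directly. The function name \texttt{local\_map\_mask} and the sizes (5 positions, attention size 1) are chosen here for illustration.

\begin{minted}[bgcolor=LightGray,breaklines=true,fontsize=\footnotesize]{python}
import numpy as np


def local_map_mask(chunk_size, attention_size):
    """True where |i - j| exceeds attention_size, i.e. where attention is blocked."""
    i, j = np.indices((chunk_size, chunk_size))
    return np.abs(i - j) > attention_size


mask = local_map_mask(5, 1)

# Number of positions each time step can still attend to
visible_per_row = (~mask).sum(axis=1)

print(mask.astype(int))
print(visible_per_row)
\end{minted}

Interior positions see themselves and one neighbour on each side, while the first and last positions see only two entries.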
\end{document}