We reviewed the famous Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., … & Polosukhin, I. (2017). Attention is all you need. In Advances in neural information processing systems (pp. 5998-6008) last time.
Now this is the actual implementation of a Transformer model in Pytorch for Neural Machine Translation(NMT) task.
If you didn’t get the overall concepts and understandings of Transformer, I recommend you to visit the previous post and read it.
https://songstudio.info/tech/tech-24/
Then let us begin to look at the details more specifically.
First, I used English-French parallel corpus provided by “European Parliament Proceedings Parallel Corpus 1996-2011”, cited in publication Koehn, P. (2005, September). Europarl: A parallel corpus for statistical machine translation. In MT summit (Vol. 5, pp. 79-86).
Originally, there are $2,007,723$ sentence pairs, but due to the lack of resources I extracted $150,000$ pairs.
Data Preprocessing
I used Google’s sentencepiece tokenizer to preprocess the dataset.
The reason I chose this is that the sentencepiece is based on BPE(Byte Pair Encoding), which can alleviate OOV(Out Of Vocabulary) problem and can be implemented into different languages without any additional tokenization or preprocessing.
For more information on sentencepiece tokenizer, you can check https://github.com/google/sentencepiece.
Since this is a translation task from English to French, English becomes source language and French is target language.
By training the tokenizer with English, I got src_sp.model
and src_sp.vocab
each of which is trained tokenzer model and vocabulary.
Likewise by training with French, I got trg_sp.model
and trg_sp.vocab
.
Next, I preprocessed tokenized sentences to make source data into encoder inputs and target data into decoder inputs.
After tokenizing, we should convert each word into corresponding index, which is automatically supported by sentencepiece’s EncodeAsIds
function, and add paddings to fit all sentences into same length.
When processing target data, there is a little difference with source data process.
Because target data is actual sentences which we are going to generate, this should be annotated with $[sos]$ or $[eos]$ token which represents start token and end token each.
So I converted each data into below form.
The input_trg_data
is put into the decoder as a target input when training and output_trg_data
is the ground truth sequence when training and also the optimal answer for translation.
So the decoder is trained to make the sequence $(wa \to wb \to wc \to wd \to end)$ given the encoder’s output and the target input sequence $(start \to wa \to wb \to wc \to wd)$ for teaching forcing.
This is code for preprocessing.
def add_padding(tokenized_text):
if len(tokenized_text) < seq_len:
left = seq_len - len(tokenized_text)
padding = [pad_id] * left
tokenized_text += padding
return tokenized_text
def process_src(text_list):
tokenized_list = []
for text in tqdm(text_list):
tokenized = src_sp.EncodeAsIds(text.strip())
tokenized_list.append(add_padding(tokenized + [eos_id]))
return tokenized_list
def process_trg(text_list):
input_tokenized_list = []
output_tokenized_list = []
for text in tqdm(text_list):
tokenized = trg_sp.EncodeAsIds(text.strip())
trg_input = [sos_id] + tokenized
trg_output = tokenized + [eos_id]
input_tokenized_list.append(add_padding(trg_input))
output_tokenized_list.append(add_padding(trg_output))
return input_tokenized_list, output_tokenized_list
Now we have to make masks for encoder and decoder.
The encoder mask should have False
value only at the padded spot, but as we saw in “Attention is all you need” paper, the decoder mask is a little bit different.
First, I made e_mask
and d_mask
in same way.
e_mask = (self.src_data != pad_id).unsqueeze(1) # (num_samples, 1, L)
d_mask = (self.input_trg_data != pad_id).unsqueeze(1) # (num_samples, 1, L)
Then additionally I made np_mask
which annotates the spot to ignore based on the actual sequence length of input_trg_data
.
With torch.tril
function, I could do this without difficulty.
Then by conducting &
operation between d_mask
and np_mask
, the decoder
mask can have False
at paddings and word spots which exceed the current attending word position.
np_mask = torch.ones([1, seq_len, seq_len], dtype=torch.bool) # (1, L, L)
np_mask = torch.tril(np_mask) # (1, L, L) to triangular shape
d_mask = d_mask & np_mask # (num_samples, L, L) padding false
Finally, I wrapped src_data
, input_trg_data
, output_trg_data
, encoder_mask
, decoder_mask
and put into a dataloader object to batchify entire samples.
Embedding
I implemented simple nn.Embedding
layer to train the model a optimal look up table for word embedding.
self.src_embedding = nn.Embedding(self.src_vocab_size, d_model)
self.trg_embedding = nn.Embedding(self.trg_vocab_size, d_model)
Positional Encoding
To put positional information to our embedded input, I made a class PositionalEncoder
.
The calculation is based on equations provided in the paper.
\[PE_{(pos,2i)}=sin(pos/10000^{2i/d_{model}})\] \[PE_{(pos,2i+1)}=cos(pos/10000^{2i/d_{model}})\]As we reviewed in previous post, $pos$ represents the position of each word in a sequence and $i$ is each dimension in an embedded vector.
The actual code is as follows.
class PositionalEncoder(nn.Module):
def __init__(self):
super().__init__()
# Make initial positional encoding matrix with 0
pe_matrix= torch.zeros(seq_len, d_model) # (L, d_model)
# Calculating position encoding values
for pos in range(seq_len):
for i in range(d_model):
if i % 2 == 0:
pe_matrix[pos, i] = math.sin(pos / (10000 ** (2 * i / d_model)))
elif i % 2 == 1:
pe_matrix[pos, i] = math.cos(pos / (10000 ** (2 * i / d_model)))
pe_matrix = pe_matrix.unsqueeze(0) # (1, L, d_model)
self.positional_encoding = pe_matrix.to(device=device).requires_grad_(False)
def forward(self, x):
x = x * math.sqrt(d_model) # (B, L, d_model)
x = x + self.positional_encoding # (B, L, d_model)
return x
I made positional_encoding
matrix which should be added to the input tensor.
Remember that this matrix should not be tuned because as the paper says, this values are constant.
So I set this matrix frozen so that the back-propagation process must not change it.
And the reason why there is an additional multiplication, which is at line $19$, is that to make the positional encoding not affect the original embedding value too much.
We can think this as a kind of scaling.
Although this is not exactly mentioned in the paper, but in several other posts and implementations, this scaling step is usually included.
Multi-head Attention
Now this is the main part.
Multi-head attention first makes $Q$, $K$, $V$ matrix with linear transformation, splits each matrices into $num\_heads$, conducts self attention mechanisms and finally concatenates all results from each head.
class MultiheadAttention(nn.Module):
def __init__(self):
super().__init__()
self.inf = 1e9
# W^Q, W^K, W^V in the paper
self.w_q = nn.Linear(d_model, d_model)
self.w_k = nn.Linear(d_model, d_model)
self.w_v = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(drop_out_rate)
self.attn_softmax = nn.Softmax(dim=-1)
# Final output linear transformation
self.w_0 = nn.Linear(d_model, d_model)
def forward(self, q, k, v, mask=None):
input_shape = q.shape
# Linear calculation + split into num_heads
q = self.w_q(q).view(input_shape[0], -1, num_heads, d_k) # (B, L, num_heads, d_k)
k = self.w_k(k).view(input_shape[0], -1, num_heads, d_k) # (B, L, num_heads, d_k)
v = self.w_v(v).view(input_shape[0], -1, num_heads, d_k) # (B, L, num_heads, d_k)
# For convenience, convert all tensors in size (B, num_heads, L, d_k)
q = q.transpose(1, 2)
k = k.transpose(1, 2)
v = v.transpose(1, 2)
# Conduct self-attention
attn_values = self.self_attention(q, k, v, mask=mask) # (B, num_heads, L, d_k)
concat_output = attn_values.transpose(1, 2)\
.contiguous().view(input_shape[0], -1, d_model) # (B, L, d_model)
return self.w_0(concat_output)
Note that we can split each matrix for each head by just converting the shape of entire matrix with torch.view
.
And we should transpose the dimension $1$ and dimension $2$ since self attention should be conducted by each head, so $num\_heads$ should be put outside of $seq\_len$.
And after conducting the self attention, we can get final attention values.
To concatenate them, I reverted dimension $1$ and $2$ by transposing and make the matrix shape into $(batch\_size, seq\_len, d\_model)$, which is the original shape.
After the final linear transformation, the output is passed to next layers.
Self Attention (Scaled dot-product attention)
Self attention step gets matrices q
, k
, v
and conducts scaled dot-product attention calculation as we saw in the paper.
def self_attention(self, q, k, v, mask=None):
# Calculate attention scores with scaled dot-product attention
attn_scores = torch.matmul(q, k.transpose(-2, -1)) # (B, num_heads, L, L)
attn_scores = attn_scores / math.sqrt(d_k)
# If there is a mask, make masked spots -INF
if mask is not None:
mask = mask.unsqueeze(1) # (B, 1, L) => (B, 1, 1, L) or (B, L, L) => (B, 1, L, L)
attn_scores = attn_scores.masked_fill_(mask == 0, -1 * self.inf)
# Softmax and multiplying K to calculate attention value
attn_distribs = self.attn_softmax(attn_scores)
attn_distribs = self.dropout(attn_distribs)
attn_values = torch.matmul(attn_distribs, v) # (B, num_heads, L, d_k)
return attn_values
The noticeable point here is that we use encoder/decoder mask in this function.
The masks make the attention score have $-\infty$ at certain spots which should not be attended.
This spots eventually become values converging on nearly $0$ since the softmax function puts these values as exponents of $e$.
Therefore the masked spots are properly ignored.
I added an additional dropout layer to prevent overfitting.
The rest of the codes are not that difficult to understand I think.
Encoder and Decoder
This is a Feed-Forward layer which makes the sequence in different shape and revert it again.
class FeedFowardLayer(nn.Module):
def __init__(self):
super().__init__()
self.linear_1 = nn.Linear(d_model, d_ff, bias=True)
self.relu = nn.ReLU()
self.linear_2 = nn.Linear(d_ff, d_model, bias=True)
self.dropout = nn.Dropout(drop_out_rate)
def forward(self, x):
x = self.relu(self.linear_1(x)) # (B, L, d_ff)
x = self.dropout(x)
x = self.linear_2(x) # (B, L, d_model)
return x
And this is a layer normalization layer, which prevents the values in input tensors from exploding.
class LayerNormalization(nn.Module):
def __init__(self, eps=1e-6):
super().__init__()
self.eps = eps
self.layer = nn.LayerNorm([d_model], elementwise_affine=True, eps=self.eps)
def forward(self, x):
x = self.layer(x)
return x
Of course, the calculation itself is not complicated so we can manually implement layer normalization formula.
But there is a module that Pytorch supports, so I thought it would be better to use predefined library.
Then I constructed EncoderLayer
and DecoderLayer
classes with several layers we have seen so far.
Note that each layer looks like below image, as we’ve seen before.
class EncoderLayer(nn.Module):
def __init__(self):
super().__init__()
self.layer_norm_1 = LayerNormalization()
self.multihead_attention = MultiheadAttention()
self.drop_out_1 = nn.Dropout(drop_out_rate)
self.layer_norm_2 = LayerNormalization()
self.feed_forward = FeedFowardLayer()
self.drop_out_2 = nn.Dropout(drop_out_rate)
def forward(self, x, e_mask):
x_1 = self.layer_norm_1(x) # (B, L, d_model)
x = x + self.drop_out_1(
self.multihead_attention(x_1, x_1, x_1, mask=e_mask)
) # (B, L, d_model)
x_2 = self.layer_norm_2(x) # (B, L, d_model)
x = x + self.drop_out_2(self.feed_forward(x_2)) # (B, L, d_model)
return x # (B, L, d_model)
class DecoderLayer(nn.Module):
def __init__(self):
super().__init__()
self.layer_norm_1 = LayerNormalization()
self.masked_multihead_attention = MultiheadAttention()
self.drop_out_1 = nn.Dropout(drop_out_rate)
self.layer_norm_2 = LayerNormalization()
self.multihead_attention = MultiheadAttention()
self.drop_out_2 = nn.Dropout(drop_out_rate)
self.layer_norm_3 = LayerNormalization()
self.feed_forward = FeedFowardLayer()
self.drop_out_3 = nn.Dropout(drop_out_rate)
def forward(self, x, e_output, e_mask, d_mask):
x_1 = self.layer_norm_1(x) # (B, L, d_model)
x = x + self.drop_out_1(
self.masked_multihead_attention(x_1, x_1, x_1, mask=d_mask)
) # (B, L, d_model)
x_2 = self.layer_norm_2(x) # (B, L, d_model)
x = x + self.drop_out_2(
self.multihead_attention(x_2, e_output, e_output, mask=e_mask)
) # (B, L, d_model)
x_3 = self.layer_norm_3(x) # (B, L, d_model)
x = x + self.drop_out_3(self.feed_forward(x_3)) # (B, L, d_model)
return x # (B, L, d_model)
Although above picture describes overall architectures quite simply, I added additional layer normalization layers and dropouts, which are not mentioned in the paper.
Since a transformer itself has a large number of parameters and calculations, these help parameters not to be adjusted too much and to normalize values inside it.
Also, note that in the decoder two multi-head attentions get parameters differently.
We put q
, k
, v
as inputs and the decoder mask as a mask for the first attention.
On the other hands, in the second attention, the encoder output becomes k
and v
, the output from the first decoder attention is used as q
, and the encoder mask is put as a mask.
This is same with the relation between encoder/decoder hidden states in the attention mechanism in a seq2seq model as we’ve checked before.
And finally, these are the completed encoder and decoder.
class Encoder(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.ModuleList([EncoderLayer() for i in range(num_layers)])
self.layer_norm = LayerNormalization()
def forward(self, x, e_mask):
for i in range(num_layers):
x = self.layers[i](x, e_mask)
return self.layer_norm(x)
class Decoder(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.ModuleList([DecoderLayer() for i in range(num_layers)])
self.layer_norm = LayerNormalization()
def forward(self, x, e_output, e_mask, d_mask):
for i in range(num_layers):
x = self.layers[i](x, e_output, e_mask, d_mask)
return self.layer_norm(x)
I used predefined nn.ModuleList
s to stack several encoder/decoder layers.
With this module, we can access each layer by iterating with a for loop.
Transformer
By wrapping positional encoders, embedding layers, encoders and decoders, we can finally build the entireTransformer
module.
class Transformer(nn.Module):
def __init__(self, src_vocab_size, trg_vocab_size):
super().__init__()
self.src_vocab_size = src_vocab_size
self.trg_vocab_size = trg_vocab_size
self.src_embedding = nn.Embedding(self.src_vocab_size, d_model)
self.trg_embedding = nn.Embedding(self.trg_vocab_size, d_model)
self.positional_encoder = PositionalEncoder()
self.encoder = Encoder()
self.decoder = Decoder()
self.output_linear = nn.Linear(d_model, self.trg_vocab_size)
self.softmax = nn.LogSoftmax(dim=-1)
def forward(self, src_input, trg_input, e_mask=None, d_mask=None):
src_input = self.src_embedding(src_input) # (B, L) => (B, L, d_model)
trg_input = self.trg_embedding(trg_input) # (B, L) => (B, L, d_model)
src_input = self.positional_encoder(src_input) # (B, L, d_model) => (B, L, d_model)
trg_input = self.positional_encoder(trg_input) # (B, L, d_model) => (B, L, d_model)
e_output = self.encoder(src_input, e_mask) # (B, L, d_model)
d_output = self.decoder(trg_input, e_output, e_mask, d_mask) # (B, L, d_model)
# (B, L, d_model) => # (B, L, trg_vocab_size)
output = self.softmax(self.output_linear(d_output))
return output
Since we got a completed Transformer model, now this is the time to set a training/testing procedure for English-French NMT task.
Training
I made a Manager
object which has our model, dataloader, optimizer and loss function.
Since a Transformer
object needs the size of source vocabulary and target vocabulary, it can be created as follows.
self.model = Transformer(len(self.src_i2w), len(self.trg_i2w)).to(device)
And I used “Adam” for an optimizer and “Negative Log-Likelihood” for a loss function.
Since we have LogSoftmax
module in our model, we don’t have to use “Cross Entropy” loss function.
self.optim = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
self.criterion = nn.NLLLoss(ignore_index=pad_id)
At last, this is our training code.
def train(self):
print("Training starts.")
self.model.train()
best_loss = sys.float_info.max
total_training_time = datetime.timedelta()
for epoch in range(1, num_epochs+1):
train_losses = []
for i, batch in tqdm(enumerate(self.train_loader)):
src_input, trg_input, trg_output, e_mask, d_mask = batch
src_input, trg_input, trg_output, e_mask, d_mask = \
src_input.to(device), trg_input.to(device), trg_output.to(device),\
e_mask.to(device), d_mask.to(device)
# (B, L, vocab_size)
output = self.model(src_input, trg_input, e_mask, d_mask)
trg_output_shape = trg_output.shape
self.optim.zero_grad()
loss = self.criterion(
output.view(-1, sp_vocab_size),
trg_output.view(trg_output_shape[0] * trg_output_shape[1])
)
loss.backward()
self.optim.step()
train_losses.append(loss.item())
mean_train_loss = np.mean(train_losses)
print(f"#################### Epoch: {epoch} ####################")
print(f"Train loss: {mean_train_loss}")
if mean_train_loss < best_loss:
if not os.path.exists(ckpt_dir):
os.mkdir(ckpt_dir)
torch.save(self.model.state_dict(), f"{ckpt_dir}/best_model.pth")
print(f"Current best model is saved.")
best_loss = mean_train_loss
I made training logs printed in every epoch and saved trained models only when the training loss decreases compared to the last smallest training loss value.
And this is the captured training logs and the training loss plots which are noted while training.
Testing
Now we have a trained model, this is time for the actual translation task.
def test(self, model_name, input_sentence):
if not os.path.exists(f"{ckpt_dir}/{model_name}"):
print(f"There is no model named {model_name}. Test aborted.")
return
print("Testing starts.")
self.model.load_state_dict(torch.load(f"{ckpt_dir}/{model_name}"))
self.model.eval()
print("Loading sentencepiece tokenizer...")
src_sp = spm.SentencePieceProcessor()
trg_sp = spm.SentencePieceProcessor()
src_sp.Load(f"{SP_DIR}/{src_model_prefix}.model")
trg_sp.Load(f"{SP_DIR}/{trg_model_prefix}.model")
print("Preprocessing input sentence...")
tokenized = src_sp.EncodeAsIds(input_sentence)
src_data = torch.LongTensor(add_padding(tokenized)).unsqueeze(0).to(device) # (1, L)
e_mask = (src_data != pad_id).unsqueeze(1).to(device) # (1, 1, L)
print("Encoding input sentence...")
src_data = self.model.src_embedding(src_data)
src_data = self.model.positional_encoder(src_data)
e_output = self.model.encoder(src_data, e_mask) # (1, L, d_model)
outputs = torch.zeros(seq_len).long().to(device) # (L)
outputs[0] = sos_id # (L)
output_len = 0
for i in range(1, seq_len):
d_mask = (outputs.unsqueeze(0) != pad_id).unsqueeze(1).to(device) # (1, 1, L)
np_mask = torch.ones([1, seq_len, seq_len], dtype=torch.bool).to(device) # (1, L, L)
np_mask = torch.tril(np_mask) # (1, L, L) to triangular shape
d_mask = d_mask & np_mask # (1, L, L) padding false
trg_embedded = self.model.trg_embedding(outputs.unsqueeze(0))
trg_positional_encoded = self.model.positional_encoder(trg_embedded)
decoder_output = self.model.decoder(
trg_positional_encoded,
e_output,
e_mask,
d_mask
) # (1, L, d_model)
output = self.model.softmax(
self.model.output_linear(decoder_output)
) # (1, L, trg_vocab_size)
output = torch.argmax(output, dim=-1) # (1, L)
last_word_id = output[0][i-1].item()
if last_word_id == eos_id:
break
outputs[i] = last_word_id
output_len = i
decoded_output = outputs[1:][:output_len].tolist()
decoded_output = trg_sp.decode_ids(decoded_output)
print(f"Result: {decoded_output}")
print(f"Testing finished!")
The source sentence is embedded, positional encoded and processed by the encoder same with the training process.
But when testing, we should set the trained model to generate each target word sequentially since it is trained based on next word prediction.
So I first made outputs
tensor only initialized with sos_id
at its first space, which is the starting token.
The rest of it is filled with paddings.
This outputs
tensor becomes the decoder input from now on.
So the decoder makes new output with a generated next word, then we should take and add it to our outputs
tensor.
After the new word is inserted, the updated outputs
tensor goes into the decoder again.
This iterations go on and on until the model generates the end token.
In training, transformers are trained relatively faster than other traditional seq2seq architectures because they get input sequences without RNN.
But as we can see, actual generation after the training is not the case since the model should conducts decoder operations sequentially.
Therefore, although the time complexity of training a transformer is not $O(N)$($N$ is the length of a sequence), that of testing becomes $O(N)$ eventually.
Anyway, this is the test result from the input “I love you.”
Considering that the translated output has the same meaning as input’s, this result is quite acceptable.
But this is the case that the input sentence is quite short.
A longer sentence like “If you’re good at something, never do it for free.” was translated into “Si vous foutez à quelque chose, jamais cela ne veut être gratuit”, which means that “If you trust something, it never wants to be free.” according to Google translate.
So there are some differences in meaning and this becomes worse as the length of an input gets longer.
For example, the model converted “Sometimes it is the people no one imagines anything of who do the things that no one can imagine.” into “Parfois, c’est le peuple qui n’est plus l’un des imaginations que personne ne peut imaginer”, which is “Sometimes it is the people who are no longer one of the imaginations that no one can imagine.” in English.
As we can see, little of the original context remains.
In my opinion, the model was not trained enough since the training loss value did not converge to certain point.
At the end of the training, the loss was still decreasing.
I think the training data and time were not enough, since generation tasks like NMT need huge amounts of data and longer training procedure to get decent performances.
With more resources and more improved environment, this limitations can be overcome, I think.
So this is the end of posts about Transformer.
We have reviewed the paper and seen how this model can be coded in Pytorch.
Of course the model itself was not perfect, but I will try to improve it once the conditions permit.
I think with more data and more sophisticated experiments, the performance can be improved.
The entire codes can be accessed here.