
First, we will build every component of the Transformer model block by block. Then we will assemble all of those blocks to build our model. After that, we will train and validate the model on a dataset obtained from Hugging Face. Finally, we will test the model by translating new text.
Important note: I will break down every component of the transformer architecture step by step and provide the necessary explanation of the concepts, the why, and the how. I will also comment on the code line by line wherever I feel an explanation is needed.
To enable the LLM to translate from English to Malay, we need a dataset that contains source (English) and target (Malay) language pairs. We will therefore use the Hugging Face dataset named "Helsinki-NLP/opus-100", which has 1 million English-Malay training pairs, enough to achieve good accuracy, plus 2,000 examples each in the validation and test splits. It is already pre-split, so we do not have to split the dataset ourselves.
# Import necessary libraries
# Install the datasets and tokenizers libraries if you haven't done so yet (!pip install datasets tokenizers).
import os
import math
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from datasets import load_dataset
from tqdm import tqdm
# Assign device value as "cuda" to train on GPU if GPU is available. Otherwise it will fall back to default as "cpu".
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Loading train, validation, test dataset from huggingface path below.
raw_train_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='train')
raw_validation_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='validation')
raw_test_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='test')
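# A quick sanity check (illustrative only; the actual sentences will differ): each record holds
# a "translation" dictionary with the English/Malay pair, for example:
# raw_train_dataset[0]
# => {'translation': {'en': 'Some English sentence', 'ms': 'Ayat dalam Bahasa Melayu'}}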
# Directory to store dataset files.
os.mkdir("./dataset-en")
os.mkdir("./dataset-my")
# Directory to save model during model training after each EPOCHS (in step 10).
os.mkdir("./malaygpt")
# Directories to store the source and target tokenizers.
os.mkdir("./tokenizer_en")
os.mkdir("./tokenizer_my")
dataset_en = []
dataset_my = []
file_count = 1
# In order to train the tokenizer (in step 2), we'll separate the training dataset into English and Malay.
# Create multiple small files of 50k sentences each and store them in the dataset-en and dataset-my directories.
for data in tqdm(raw_train_dataset["translation"]):
dataset_en.append(data["en"].replace('\n', " "))
dataset_my.append(data["ms"].replace('\n', " "))
if len(dataset_en) == 50000:
with open(f'./dataset-en/file{file_count}.txt', 'w', encoding='utf-8') as fp:
fp.write('\n'.join(dataset_en))
dataset_en = []
with open(f'./dataset-my/file{file_count}.txt', 'w', encoding='utf-8') as fp:
fp.write('\n'.join(dataset_my))
dataset_my = []
file_count += 1
A Transformer model cannot process raw text directly; it only works with numbers, so we have to convert the raw text into numbers first. For this we will use a popular tokenizer called the BPE tokenizer, the subword tokenizer used in models such as GPT-3. We will first train the BPE tokenizer on the corpus data (in this case the training dataset) that we prepared in step 1. The overall flow is described below.
Once training is complete, the tokenizer produces a vocabulary for English and one for Malay. A vocabulary is the collection of unique tokens in the corpus data. Since we are performing a translation task, we need a tokenizer for both languages. The BPE tokenizer takes raw text, maps it against the tokens in the vocabulary, and returns a token for each word in the input; a token can be a whole word or a subword. This is one of the advantages of subword tokenizers over other tokenizers, because it overcomes the OOV (out-of-vocabulary) problem. The tokenizer then returns the token's unique index or position ID in the vocabulary, which will later be used to create the embeddings.
# import tokenzier library classes and modules.
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
# path to the training dataset files which will be used to train tokenizer.
path_en = [str(file) for file in Path('./dataset-en').glob("**/*.txt")]
path_my = [str(file) for file in Path('./dataset-my').glob("**/*.txt")]
# [ Creating Source Language Tokenizer - English ].
# Additional special tokens are created such as [UNK] - to represent Unknown words, [PAD] - Padding token to maintain same sequence length across the model.
# [CLS] - token to denote start of sentence, [SEP] - token to denote end of sentence.
tokenizer_en = Tokenizer(BPE(unk_token="[UNK]"))
trainer_en = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])
# splitting tokens based on whitespace.
tokenizer_en.pre_tokenizer = Whitespace()
# Tokenizer trains the dataset files created in step 1
tokenizer_en.train(files=path_en, trainer=trainer_en)
# Save tokenizer for future use.
tokenizer_en.save("./tokenizer_en/tokenizer_en.json")
# [ Creating Target Language Tokenizer - Malay ].
tokenizer_my = Tokenizer(BPE(unk_token="[UNK]"))
trainer_my = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])
tokenizer_my.pre_tokenizer = Whitespace()
tokenizer_my.train(files=path_my, trainer=trainer_my)
tokenizer_my.save("./tokenizer_my/tokenizer_my.json")
tokenizer_en = Tokenizer.from_file("./tokenizer_en/tokenizer_en.json")
tokenizer_my = Tokenizer.from_file("./tokenizer_my/tokenizer_my.json")
# Getting size of both tokenizer.
source_vocab_size = tokenizer_en.get_vocab_size()
target_vocab_size = tokenizer_my.get_vocab_size()
# Define token-ids variables, we need this for training model.
CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]")], dtype=torch.int64).to(device)
SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]")], dtype=torch.int64).to(device)
PAD_ID = torch.tensor([tokenizer_my.token_to_id("[PAD]")], dtype=torch.int64).to(device)
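# A quick illustration of subword tokenization (hypothetical output; the exact splits and ids depend on the trained vocabulary):
# encoded = tokenizer_en.encode("Internationalization matters")
# print(encoded.tokens)   # e.g. ['Intern', 'ation', 'al', 'ization', 'matters']
# print(encoded.ids)      # e.g. [5123, 892, 131, 2044, 3310]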
In this step we prepare the dataset for the source and target languages, which will later be used to train and validate the model we are going to build. We create a class that takes the raw dataset and defines a function that encodes the source text with the source tokenizer (tokenizer_en) and the target text with the target tokenizer (tokenizer_my). Finally, we create DataLoaders for the training and validation datasets that iterate over the data in batches (in our example the batch size is set to 10). The batch size can be changed depending on the data size and the available compute.
# This class takes raw dataset and max_seq_len (maximum length of a sequence in the entire dataset).
class EncodeDataset(Dataset):
def __init__(self, raw_dataset, max_seq_len):
super().__init__()
self.raw_dataset = raw_dataset
self.max_seq_len = max_seq_len
def __len__(self):
return len(self.raw_dataset)
def __getitem__(self, index):
# Fetching raw text for the given index that consists of source and target pair.
raw_text = self.raw_dataset[index]
# Separating text to source and target text and will be later used for encoding.
source_text = raw_text["en"]
target_text = raw_text["ms"]
# Encoding source text with source tokenizer(tokenizer_en) and target text with target tokenizer(tokenizer_my).
source_text_encoded = torch.tensor(tokenizer_en.encode(source_text).ids, dtype = torch.int64).to(device)
target_text_encoded = torch.tensor(tokenizer_my.encode(target_text).ids, dtype = torch.int64).to(device)
        # To train the model, the sequence length of every input sequence must equal max_seq_len.
        # Hence additional padding tokens are added to the input sequence if its length is less than max_seq_len.
num_source_padding = self.max_seq_len - len(source_text_encoded) - 2
num_target_padding = self.max_seq_len - len(target_text_encoded) - 1
encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device)
decoder_padding = torch.tensor([PAD_ID] * num_target_padding, dtype = torch.int64).to(device)
        # encoder_input starts with the start-of-sentence token CLS_ID, followed by the source encoding and then the end-of-sentence token SEP_ID.
        # Additional PAD tokens are appended at the end to reach the required max_seq_len.
encoder_input = torch.cat([CLS_ID, source_text_encoded, SEP_ID, encoder_padding]).to(device)
        # decoder_input starts with the start-of-sentence token CLS_ID, followed by the target encoding.
        # Additional PAD tokens are appended at the end to reach the required max_seq_len. There is no end-of-sentence token SEP in decoder_input.
decoder_input = torch.cat([CLS_ID, target_text_encoded, decoder_padding ]).to(device)
        # target_label starts with the target encoding followed by the end-of-sentence token SEP_ID. There is no start-of-sentence token CLS in the target label.
        # Additional PAD tokens are appended at the end to reach the required max_seq_len.
target_label = torch.cat([target_text_encoded,SEP_ID,decoder_padding]).to(device)
        # Since extra padding tokens were added to the input encoding, we don't want the model to learn from them during training, as there is nothing to learn in a padding token.
        # So we use an encoder mask to nullify the padding-token values before computing the self-attention output in the encoder block.
encoder_mask = (encoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int().to(device)
# We also don't want any token to get influenced by the future token during the decoding stage. Hence, Causal mask is being implemented during masked multihead attention to handle this.
decoder_mask = (decoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int() & causal_mask(decoder_input.size(0)).to(device)
return {
'encoder_input': encoder_input,
'decoder_input': decoder_input,
'target_label': target_label,
'encoder_mask': encoder_mask,
'decoder_mask': decoder_mask,
'source_text': source_text,
'target_text': target_text
}
# The causal mask ensures that any token coming after the current token is masked, i.e. its attention score is replaced by negative infinity, which becomes zero (or close to zero) after the softmax function.
# Hence the model simply ignores those positions and cannot learn anything from them.
def causal_mask(size):
# dimension of causal mask (batch_size, seq_len, seq_len)
mask = torch.triu(torch.ones(1, size, size), diagonal = 1).type(torch.int)
return mask == 0
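# For illustration, causal_mask(3) returns a (1, 3, 3) boolean mask in which each position can only attend to itself and to earlier positions:
# tensor([[[ True, False, False],
#          [ True,  True, False],
#          [ True,  True,  True]]])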
# Calculate the max sequence length across the entire training dataset for the source and target languages.
max_seq_len_source = 0
max_seq_len_target = 0
for data in raw_train_dataset["translation"]:
enc_ids = tokenizer_en.encode(data["en"]).ids
dec_ids = tokenizer_my.encode(data["ms"]).ids
max_seq_len_source = max(max_seq_len_source, len(enc_ids))
max_seq_len_target = max(max_seq_len_target, len(dec_ids))
print(f'max_seqlen_source: {max_seq_len_source}') #530
print(f'max_seqlen_target: {max_seq_len_target}') #526
# To simplify the training process, we'll just take single max_seq_len and add 20 to cover the additional length of tokens such as PAD, CLS, SEP in the sequence.
max_seq_len = 550
# Instantiate the EncodeDataset class and create the encoded train and validation datasets.
train_dataset = EncodeDataset(raw_train_dataset["translation"], max_seq_len)
val_dataset = EncodeDataset(raw_validation_dataset["translation"], max_seq_len)
# Creating DataLoader wrapper for both training and validation dataset. This dataloader will be used later stage during training and validation of our LLM model.
train_dataloader = DataLoader(train_dataset, batch_size = 10, shuffle = True, generator=torch.Generator(device='cuda'))
val_dataloader = DataLoader(val_dataset, batch_size = 1, shuffle = True, generator=torch.Generator(device='cuda'))
Input Embedding: The sequence of token IDs produced by the tokenizer in step 2 is fed into an embedding layer. The embedding layer maps each token ID to an embedding vector of dimension 512 [512 is the value used in the Attention paper]. The embedding vector captures the semantic meaning of a token as learned from the training data; each dimension value represents some feature associated with the token. For example, if the token is "dog", some dimensions might represent eyes, mouth, legs, height, and so on. If we plot the vectors in n-dimensional space, similar objects (such as dog and cat) end up close to each other, while dissimilar objects (such as school or home) end up farther away.
Positional Encoding: One advantage of the Transformer architecture is that it can process all tokens of an input sequence in parallel, which greatly reduces training time and speeds up prediction. A drawback, however, is that when many tokens are processed in parallel, the position of a token in the sentence is no longer represented, and a sentence's meaning or context can depend on token positions. To solve this, the paper introduces positional encoding: two mathematical functions (one sine, one cosine) applied across the 512 embedding dimensions of each token. The sine and cosine functions are given below.
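For reference, these are the two positional-encoding functions from the Attention Is All You Need paper (the code below computes the same quantity through an equivalent exp/log form), where pos is the token position and i indexes the embedding dimensions:

PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)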
The sine function is applied to the even dimension values and the cosine function to the odd dimension values of the embedding vector. Finally, the resulting positional-encoding vector is added to the embedding vector. We now have an embedding that captures both the semantic meaning of a token and its position. Note that the positional-encoding values are the same for every sequence.
# Input embedding and positional encoding
class EmbeddingLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.d_model = d_model
        # Using the PyTorch embedding layer module to map token ids to the vocabulary and then convert them into embedding vectors.
# The vocab_size is the vocabulary size of the training dataset created by tokenizer during training of corpus dataset in step 2.
self.embedding = nn.Embedding(vocab_size, d_model)
def forward(self, input):
        # In addition to feeding the input sequence through the embedding layer, the output is multiplied by the square root of d_model to scale the embedding values, as done in the paper.
embedding_output = self.embedding(input) * math.sqrt(self.d_model)
return embedding_output
class PositionalEncoding(nn.Module):
def __init__(self, max_seq_len: int, d_model: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
# We're creating a matrix of the same shape as embedding vector.
pe = torch.zeros(max_seq_len, d_model)
# Calculate the position part of PE functions.
pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        # Calculate the division part of the PE functions. Note that this expression differs slightly from the paper's form; the exp/log version computes the same quantity and is numerically more stable.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Fill the even and odd matrix positions with the sine and cosine function results respectively.
        pe[:, 0::2] = torch.sin(pos * div_term)
        pe[:, 1::2] = torch.cos(pos * div_term)
        # Since we're expecting the input sequences in batches, an extra batch_size dimension is added at position 0.
        pe = pe.unsqueeze(0)
        # Register pe as a buffer so it is saved with the model state but not treated as a learnable parameter.
        self.register_buffer('pe', pe)
    def forward(self, input_embedding):
        # Add the positional encoding to the input embedding vector.
        input_embedding = input_embedding + (self.pe[:, :input_embedding.shape[1], :]).requires_grad_(False)
        # Perform dropout to prevent overfitting.
        return self.dropout(input_embedding)
Just as the Transformer is the heart of an LLM, the self-attention mechanism is the heart of the Transformer architecture.
So why do we need self-attention at all? Let's answer that with a simple example.
Take two sentences in which the word "bank" clearly has two different meanings (for example, a river bank versus a financial bank). Yet the embedding value of "bank" is identical in both sentences, which is not what we want: the embedding should change based on the context of the sentence. We therefore need a mechanism that lets the embedding values change dynamically to convey contextual meaning based on the overall sentence. The self-attention mechanism does exactly that: it dynamically updates the embedding values so that they represent the contextual meaning within the sentence.
If self-attention is already this good, why do we need multi-head self-attention? Let's look at another example to find out.
In this example, a single self-attention head might focus on only one aspect of the sentence, perhaps just the "what" aspect, because it can only capture "What did John do?". But other aspects, such as "when" or "where", are just as important for the model to learn. So we need a way for the self-attention mechanism to learn multiple relationships in a sentence at the same time. This is where Multi-Head Self-Attention (the term multi-head attention is used interchangeably) comes in. In multi-head self-attention, the single-head embedding is split into multiple heads so that each head looks at a different aspect of the sentence and learns accordingly, which is exactly what we want.
Now that we know why we need multi-head self-attention, let's see how it actually works.
If you are comfortable with matrix multiplication, understanding the mechanism is an easy task. Let's walk through the whole flow point by point, from input to output.
1. First, we make 3 copies of the encoder input (the combination of the input embedding and the positional encoding that we built in step 4) and name them Q, K and V. Each of them is simply a copy of the encoder input. The encoder input shape is (seq_len, d_model), where seq_len is the maximum sequence length and d_model is the embedding dimension, 512 in this case.
2. Next, we matrix-multiply Q with the weights W_q, K with the weights W_k and V with the weights W_v. Each weight matrix has shape (d_model, d_model), so the resulting query, key and value embedding vectors have shape (seq_len, d_model). The weight parameters are randomly initialized by the model and updated once training begins. Why do we need these weight multiplications in the first place? Because they are the learnable parameters that the query, key and value embeddings need in order to produce better representations.
3. Following the attention paper, the number of heads is 8. Each new query, key and value embedding vector is divided into 8 smaller query, key and value units. The new shape of each embedding vector is (seq_len, d_model/num_heads), i.e. (seq_len, d_k), where d_k = d_model/num_heads.
4. Each query embedding vector performs a dot product with the transposed key embedding vectors of itself and of every other token in the sequence. This dot product gives the attention score. The attention score shows how similar a given token is to all the other tokens in the input sequence; the higher the score, the higher the similarity.
5. The softmax function is then applied to the attention-score matrix, producing a weight matrix of shape (seq_len, seq_len). Steps 4 to 6 are summarized in the formula right after this list.
6. These weight matrices are then multiplied with the corresponding value embedding vectors, producing 8 attention heads of shape (seq_len, d_v), where d_v = d_model/num_heads.
7. Finally, all heads are concatenated back into a single head with the new shape (seq_len, d_model). This combined head is matrix-multiplied with the output weight matrix W_o of shape (d_model, d_model). The final output of multi-head attention represents the contextual meaning of each word as well as the ability to learn multiple aspects of the input sentence.
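Putting steps 4 to 6 together, each head computes the scaled dot-product attention from the paper:

\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^{T}}{\sqrt{d_k}}\right)V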
Now, let's start coding the Multi-Head Attention block.
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, num_heads: int, dropout_rate: float):
super().__init__()
# Define dropout to prevent overfitting.
self.dropout = nn.Dropout(dropout_rate)
# Weight matrix are introduced and are all learnable parameters.
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.num_heads = num_heads
assert d_model % num_heads == 0, "d_model must be divisible by number of heads"
# d_k is the new dimension of each splitted self attention heads
self.d_k = d_model // num_heads
def forward(self, q, k, v, encoder_mask=None):
# We'll be training our model with multiple batches of sequence at once in parallel, hence we'll need to include batch_size in the shape as well.
# query, key and value are calculated by matrix multiplication of corresponding weights with the input embeddings.
# Change of shape: q(batch_size, seq_len, d_model) @ W_q(d_model, d_model) => query(batch_size, seq_len, d_model) [same goes to key and value].
query = self.W_q(q)
key = self.W_k(k)
value = self.W_v(v)
# Splitting query, key and value into number of heads. d_model is splitted in d_k across 8 heads.
# Change of shape: query(batch_size, seq_len, d_model) => query(batch_size, seq_len, num_heads, d_k) -> query(batch_size,num_heads, seq_len,d_k) [same goes to key and value].
query = query.view(query.shape[0], query.shape[1], self.num_heads ,self.d_k).transpose(1,2)
key = key.view(key.shape[0], key.shape[1], self.num_heads ,self.d_k).transpose(1,2)
value = value.view(value.shape[0], value.shape[1], self.num_heads ,self.d_k).transpose(1,2)
# :: SELF ATTENTION BLOCK STARTS ::
# Attention score is calculated to find the similarity or relation between query with key of itself and all other embedding in the sequence.
# Change of shape: query(batch_size,num_heads, seq_len,d_k) @ key(batch_size,num_heads, seq_len,d_k) => attention_score(batch_size,num_heads, seq_len,seq_len).
attention_score = (query @ key.transpose(-2,-1))/math.sqrt(self.d_k)
# If mask is provided, the attention score needs to modify as per the mask value. Refer to the details in point no 4.
if encoder_mask is not None:
attention_score = attention_score.masked_fill(encoder_mask==0, -1e9)
        # The softmax function produces a probability distribution over the attention scores. Higher attention scores receive higher probability, meaning more similar tokens get more weight.
# Change of shape: same as attention_score
attention_weight = torch.softmax(attention_score, dim=-1)
if self.dropout is not None:
attention_weight = self.dropout(attention_weight)
        # The final step in the self-attention block is the matrix multiplication of attention_weight with the value embedding vector.
        # Change of shape: attention_weight(batch_size, num_heads, seq_len, seq_len) @ value(batch_size, num_heads, seq_len, d_k) => attention_output(batch_size, num_heads, seq_len, d_k)
        attention_output = attention_weight @ value
# :: SELF ATTENTION BLOCK ENDS ::
# Now, all the heads will be combined back to a single head
# Change of shape:attention_output(batch_size,num_heads, seq_len,d_k) => attention_output(batch_size,seq_len,num_heads,d_k) => attention_output(batch_size,seq_len,d_model)
attention_output = attention_output.transpose(1,2).contiguous().view(attention_output.shape[0], -1, self.num_heads * self.d_k)
# Finally attention_output is matrix multiplied with output weight matrix to give the final Multi-Head attention output.
# The shape of the multihead_output is same as the embedding input
# Change of shape: attention_output(batch_size,seq_len,d_model) @ W_o(d_model, d_model) => multihead_output(batch_size, seq_len, d_model)
multihead_output = self.W_o(attention_output)
return multihead_output
Feedforward Network: The feedforward network uses two linear layers (the first expands from d_model to d_ff and the second projects back from d_ff to d_model, with the values taken from the attention paper) to learn richer features. A ReLU activation is applied to the output of the first linear layer to add non-linearity to the embedding values, and dropout is applied to further reduce overfitting.
LayerNorm: We apply layer normalization to the embedding values so that the distribution of values across the embedding vectors stays consistent throughout the network, which keeps learning stable. Two extra learnable parameters, gamma and beta, are used to scale and shift the embedding values as the network needs.
Add & Norm: This combines a residual (skip) connection with layer normalization (described above). During the forward pass, the residual connection ensures that features from earlier layers are still remembered later on and contribute to the output as needed. Similarly, during backpropagation, the residual connection provides a shortcut for gradients at each stage and helps prevent vanishing gradients. AddAndNorm is used twice in the encoder block and three times in the decoder block. It takes the sub-layer's input, normalizes it, passes it through the sub-layer, and adds the result back to the original input.
# Feedfoward Network, Layer Normalization and AddAndNorm Block
class FeedForward(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout_rate: float):
super().__init__()
self.layer_1 = nn.Linear(d_model, d_ff)
self.activation_1 = nn.ReLU()
self.dropout = nn.Dropout(dropout_rate)
self.layer_2 = nn.Linear(d_ff, d_model)
def forward(self, input):
return self.layer_2(self.dropout(self.activation_1(self.layer_1(input))))
class LayerNorm(nn.Module):
def __init__(self, eps: float = 1e-5):
super().__init__()
        # Epsilon is a very small value that plays an important role in preventing a potential division-by-zero problem.
        self.eps = eps
        # Extra learnable parameters gamma and beta are introduced to scale and shift the embedding values as the network needs.
self.gamma = nn.Parameter(torch.ones(1))
self.beta = nn.Parameter(torch.zeros(1))
def forward(self, input):
mean = input.mean(dim=-1, keepdim=True)
std = input.std(dim=-1, keepdim=True)
return self.gamma * ((input - mean)/(std + self.eps)) + self.beta
class AddAndNorm(nn.Module):
def __init__(self, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.layer_norm = LayerNorm()
def forward(self, input, sub_layer):
return input + self.dropout(sub_layer(self.layer_norm(input)))
Encoder Block: The encoder block contains two main components, Multi-Head Attention and Feedforward, plus two Add & Norm units. We will first assemble all of these components into the EncoderBlock class, following the flow in the Attention paper. As per the paper, this encoder block is repeated 6 times.
Encoder: We then create an additional class called Encoder, which takes the list of EncoderBlocks, stacks them, and produces the final encoder output.
class EncoderBlock(nn.Module):
def __init__(self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(2)])
def forward(self, encoder_input, encoder_mask):
# First AddAndNorm unit taking encoder input from skip connection and adding it with the output of MultiHead attention block.
encoder_input = self.add_and_norm_list[0](encoder_input, lambda encoder_input: self.multihead_attention(encoder_input, encoder_input, encoder_input, encoder_mask))
# Second AddAndNorm unit taking output of MultiHead attention block from skip connection and adding it with the output of Feedforward layer.
encoder_input = self.add_and_norm_list[1](encoder_input, self.feed_forward)
return encoder_input
class Encoder(nn.Module):
def __init__(self, encoderblocklist: nn.ModuleList):
super().__init__()
# Encoder class is initialized by taking encoderblock list.
self.encoderblocklist = encoderblocklist
self.layer_norm = LayerNorm()
def forward(self, encoder_input, encoder_mask):
# Looping through all the encoder block - 6 times.
for encoderblock in self.encoderblocklist:
encoder_input = encoderblock(encoder_input, encoder_mask)
# Normalize the final encoder block output and return. This encoder output will be used later on as key and value for the cross attention in decoder block.
encoder_output = self.layer_norm(encoder_input)
return encoder_output
Decoder Block: The decoder block has three main components: Masked Multi-Head Attention, Multi-Head Attention and Feedforward. It also has three Add & Norm units. We assemble all of these components into the DecoderBlock class, following the flow in the Attention paper. As per the paper, this decoder block is repeated 6 times.
Decoder: We create an additional class called Decoder, which takes the list of DecoderBlocks, stacks them, and produces the final decoder output.
There are two kinds of multi-head attention in the decoder block. The first is Masked Multi-Head Attention, which takes the decoder input as query, key and value together with the decoder mask (also known as the causal mask). The causal mask prevents the model from looking at embeddings that appear later in the sequence order; steps 3 and 5 describe in detail how it works. The second is Cross-Attention, which takes the decoder input as the query, while the key and value come from the encoder output; it is computed in the same way as self-attention.
Projection Layer: The final decoder output is passed to the projection layer. In this layer, the decoder output is first fed into a linear layer, where the shape of the embedding changes as shown in the code section below. A softmax function then converts the decoder output into a probability distribution over the vocabulary, and the token with the highest probability is selected as the predicted output.
class DecoderBlock(nn.Module):
def __init__(self, masked_multihead_attention: MultiHeadAttention,multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.masked_multihead_attention = masked_multihead_attention
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(3)])
def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
# First AddAndNorm unit taking decoder input from skip connection and adding it with the output of Masked Multi-Head attention block.
decoder_input = self.add_and_norm_list[0](decoder_input, lambda decoder_input: self.masked_multihead_attention(decoder_input,decoder_input, decoder_input, decoder_mask))
# Second AddAndNorm unit taking output of Masked Multi-Head attention block from skip connection and adding it with the output of MultiHead attention block.
decoder_input = self.add_and_norm_list[1](decoder_input, lambda decoder_input: self.multihead_attention(decoder_input,encoder_output, encoder_output, encoder_mask)) # cross attention
# Third AddAndNorm unit taking output of MultiHead attention block from skip connection and adding it with the output of Feedforward layer.
decoder_input = self.add_and_norm_list[2](decoder_input, self.feed_forward)
return decoder_input
class Decoder(nn.Module):
def __init__(self,decoderblocklist: nn.ModuleList):
super().__init__()
self.decoderblocklist = decoderblocklist
self.layer_norm = LayerNorm()
def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
for decoderblock in self.decoderblocklist:
decoder_input = decoderblock(decoder_input, decoder_mask, encoder_output, encoder_mask)
decoder_output = self.layer_norm(decoder_input)
return decoder_output
class ProjectionLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.projection_layer = nn.Linear(d_model, vocab_size)
def forward(self, decoder_output):
        # The projection layer first takes the decoder output and passes it through a linear layer of shape (d_model, vocab_size).
        # Change in shape: decoder_output(batch_size, seq_len, d_model) @ linear_layer(d_model, vocab_size) => output(batch_size, seq_len, vocab_size)
output = self.projection_layer(decoder_output)
# softmax function to output the probability distribution over the vocabulary
return torch.log_softmax(output, dim=-1)
Finally, we have finished building all the component blocks of the transformer architecture. The only remaining task is to assemble them together.
First, we create a Transformer class that initializes instances of all the component classes. Inside the Transformer class, we first define the encode function, which performs all the tasks of the encoder part and produces the encoder output.
Second, we define the decode function, which performs all the tasks of the decoder part and produces the decoder output.
Third, we define the project function, which takes the decoder output and maps it to the vocabulary for prediction.
The transformer architecture is now ready. We can build our translation LLM by defining a function that takes all the necessary parameters, as given in the code below.
class Transformer(nn.Module):
def __init__(self, source_embed: EmbeddingLayer, target_embed: EmbeddingLayer, positional_encoding: PositionalEncoding, multihead_attention: MultiHeadAttention, masked_multihead_attention: MultiHeadAttention, feed_forward: FeedForward, encoder: Encoder, decoder: Decoder, projection_layer: ProjectionLayer, dropout_rate: float):
super().__init__()
# Initialize instances of all the component class of transformer architecture.
self.source_embed = source_embed
self.target_embed = target_embed
self.positional_encoding = positional_encoding
self.multihead_attention = multihead_attention
self.masked_multihead_attention = masked_multihead_attention
self.feed_forward = feed_forward
self.encoder = encoder
self.decoder = decoder
self.projection_layer = projection_layer
self.dropout = nn.Dropout(dropout_rate)
# Encode function takes in encoder input, does necessary processing inside all encoder blocks and gives encoder output.
def encode(self, encoder_input, encoder_mask):
encoder_input = self.source_embed(encoder_input)
encoder_input = self.positional_encoding(encoder_input)
encoder_output = self.encoder(encoder_input, encoder_mask)
return encoder_output
# Decode function takes in decoder input, does necessary processing inside all decoder blocks and gives decoder output.
def decode(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
decoder_input = self.target_embed(decoder_input)
decoder_input = self.positional_encoding(decoder_input)
decoder_output = self.decoder(decoder_input, decoder_mask, encoder_output, encoder_mask)
return decoder_output
    # The project function takes the decoder output, passes it through the projection layer, and maps the output to the vocabulary for prediction.
def project(self, decoder_output):
return self.projection_layer(decoder_output)
def build_model(source_vocab_size, target_vocab_size, max_seq_len=1135, d_model=512, d_ff=2048, num_heads=8, num_blocks=6, dropout_rate=0.1):
# Define and assign all the parameters value needed for the transformer architecture
source_embed = EmbeddingLayer(source_vocab_size, d_model)
target_embed = EmbeddingLayer(target_vocab_size, d_model)
positional_encoding = PositionalEncoding(max_seq_len, d_model, dropout_rate)
multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
feed_forward = FeedForward(d_model, d_ff, dropout_rate)
projection_layer = ProjectionLayer(target_vocab_size, d_model)
    # As per the paper, the encoder and decoder blocks are stacked 6 times. Each stacked block needs its own
    # set of weights, so a fresh instance of every sub-module is created per block instead of reusing one shared block.
    encoderblocklist = []
    decoderblocklist = []
    for _ in range(num_blocks):
        self_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
        block_feed_forward = FeedForward(d_model, d_ff, dropout_rate)
        encoderblocklist.append(EncoderBlock(self_attention, block_feed_forward, dropout_rate))
    for _ in range(num_blocks):
        masked_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
        cross_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
        block_feed_forward = FeedForward(d_model, d_ff, dropout_rate)
        decoderblocklist.append(DecoderBlock(masked_attention, cross_attention, block_feed_forward, dropout_rate))
    encoderblocklist = nn.ModuleList(encoderblocklist)
    decoderblocklist = nn.ModuleList(decoderblocklist)
encoder = Encoder(encoderblocklist)
decoder = Decoder(decoderblocklist)
# Instantiate the transformer class by providing all the parameters values
model = Transformer(source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention,feed_forward, encoder, decoder, projection_layer, dropout_rate)
for param in model.parameters():
if param.dim() > 1:
nn.init.xavier_uniform_(param)
return model
# Finally, call build model and assign it to model variable.
# This model is now fully ready to train and validate our dataset.
# After training and validation, we can perform new translation task using this very model
model = build_model(source_vocab_size, target_vocab_size).to(device)
Now it is time to train our model. Training is straightforward: we use the training DataLoader created in step 3. Since the full training set has 1 million pairs, I strongly recommend training on a GPU; it took me about 5 hours to complete 20 epochs. After every epoch we save the model weights together with the optimizer state so that training can resume from where it stopped rather than starting from scratch.
After each epoch we run validation with the validation DataLoader. The validation set has 2,000 examples, which is quite reasonable. During validation we only need to compute the encoder output once and reuse it until the decoder produces the end-of-sentence token [SEP]; recomputing it at every decoding step would be pointless, since the source sequence does not change.
The decoder input starts with the start-of-sentence token [CLS]. After each prediction, the newly generated token is appended to the decoder input, until the end-of-sentence token [SEP] is produced. Finally, the projection layer maps the output back to its text representation.
def training_model(preload_epoch=None):
# The entire training, validation cycle will run for 20 times.
EPOCHS = 20
initial_epoch = 0
global_step = 0
# Adam is one of the most commonly used optimization algorithms that hold the current state and will update the parameters based on the computed gradients.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    # If preload_epoch is not None, training resumes from the model weights and optimizer state that were last saved. The new starting epoch will be preload_epoch + 1.
    if preload_epoch is not None:
        model_filename = f"./malaygpt/model_{preload_epoch}.pt"
        state = torch.load(model_filename)
        # Restore the saved model weights as well, otherwise resuming would continue from randomly initialized parameters.
        model.load_state_dict(state['model_state_dict'])
        initial_epoch = state['epoch'] + 1
        optimizer.load_state_dict(state['optimizer_state_dict'])
        global_step = state['global_step']
    # The CrossEntropyLoss function computes the difference between the projection output and the target label.
    # Padding positions are ignored; since the target labels come from the Malay tokenizer, its [PAD] id is used here.
    loss_fn = nn.CrossEntropyLoss(ignore_index = tokenizer_my.token_to_id("[PAD]"), label_smoothing=0.1).to(device)
for epoch in range(initial_epoch, EPOCHS):
# ::: Start of Training block :::
model.train()
        # training with the training dataloader prepared in step 3.
for batch in tqdm(train_dataloader):
encoder_input = batch['encoder_input'].to(device) # (batch_size, seq_len)
decoder_input = batch['decoder_input'].to(device) # (batch_size, seq_len)
target_label = batch['target_label'].to(device) # (batch_size, seq_len)
encoder_mask = batch['encoder_mask'].to(device)
decoder_mask = batch['decoder_mask'].to(device)
encoder_output = model.encode(encoder_input, encoder_mask)
decoder_output = model.decode(decoder_input, decoder_mask, encoder_output, encoder_mask)
projection_output = model.project(decoder_output)
# projection_output(batch_size, seq_len, vocab_size)
loss = loss_fn(projection_output.view(-1, projection_output.shape[-1]), target_label.view(-1))
# backward pass
optimizer.zero_grad()
loss.backward()
# update weights
optimizer.step()
global_step += 1
print(f'Epoch [{epoch+1}/{EPOCHS}]: Train Loss: {loss.item():.2f}')
# save the state of the model after every epoch
model_filename = f"./malaygpt/model_{epoch}.pt"
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'global_step': global_step
}, model_filename)
# ::: End of Training block :::
# ::: Start of Validation block :::
model.eval()
with torch.inference_mode():
for batch in tqdm(val_dataloader):
encoder_input = batch['encoder_input'].to(device) # (batch_size, seq_len)
encoder_mask = batch['encoder_mask'].to(device)
source_text = batch['source_text']
target_text = batch['target_text']
# Computing the output of the encoder for the source sequence.
encoder_output = model.encode(encoder_input, encoder_mask)
# for prediction task, the first token that goes in decoder input is the [CLS] token
decoder_input = torch.empty(1,1).fill_(tokenizer_my.token_to_id('[CLS]')).type_as(encoder_input).to(device)
# since we need to keep adding the output back to the input until the [SEP] - end token is received.
while True:
# check if the max length is received, if it is, then we stop.
if decoder_input.size(1) == max_seq_len:
break
# Recreate mask each time the new output is added the decoder input for next token prediction
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask)
# Apply projection only to the next token.
projection = model.project(decoder_output[:, -1])
                    # Select the token with the highest probability, which is known as greedy-search decoding.
                    _, new_token = torch.max(projection, dim=1)
                    new_token = torch.empty(1, 1).type_as(encoder_input).fill_(new_token.item()).to(device)
# Add the new token back to the decoder input.
decoder_input = torch.cat([decoder_input, new_token], dim=1)
# Check if the new token is the end of token, then we stop if received [SEP].
if new_token == tokenizer_my.token_to_id('[SEP]'):
break
                # The final decoder output is the fully appended decoder input.
                decoder_output = decoder_input.squeeze(0)
                model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())
print(f'SOURCE TEXT": {source_text}')
print(f'TARGET TEXT": {target_text}')
print(f'PREDICTED TEXT": {model_predicted_text}')
# ::: End of Validation block :::
# This function runs the training and validation for 20 epochs
training_model(preload_epoch=None)
We will give our translation function a general-purpose name, malaygpt. It takes raw English text entered by the user and returns the translated text in Malay. Let's run the function and try it out.
def malaygpt(user_input_text):
model.eval()
with torch.inference_mode():
user_input_text = user_input_text.strip()
user_input_text_encoded = torch.tensor(tokenizer_en.encode(user_input_text).ids, dtype = torch.int64).to(device)
num_source_padding = max_seq_len - len(user_input_text_encoded) - 2
encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device)
encoder_input = torch.cat([CLS_ID, user_input_text_encoded, SEP_ID, encoder_padding]).to(device)
encoder_mask = (encoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int().to(device)
# Computing the output of the encoder for the source sequence
encoder_output = model.encode(encoder_input, encoder_mask)
# for prediction task, the first token that goes in decoder input is the [CLS] token
decoder_input = torch.empty(1,1).fill_(tokenizer_my.token_to_id('[CLS]')).type_as(encoder_input).to(device)
# since we need to keep adding the output back to the input until the [SEP] - end token is received.
while True:
# check if the max length is received
if decoder_input.size(1) == max_seq_len:
break
# recreate mask each time the new output is added the decoder input for next token prediction
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask)
# apply projection only to the next token
projection = model.project(decoder_output[:, -1])
            # select the token with the highest probability (greedy-search decoding)
            _, new_token = torch.max(projection, dim=1)
            new_token = torch.empty(1, 1).type_as(encoder_input).fill_(new_token.item()).to(device)
# add the new token back to the decoder input
decoder_input = torch.cat([decoder_input, new_token], dim=1)
# check if the new token is the end of token
if new_token == tokenizer_my.token_to_id('[SEP]'):
break
        # the final decoder output is the concatenated decoder input up to the end token
        decoder_output = decoder_input.squeeze(0)
        model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())
return model_predicted_text
Testing time! Let's run a few translation tests.
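Below is a minimal usage sketch. The English inputs are arbitrary examples made up for illustration, and the Malay outputs are not shown here because they depend entirely on how well the model was trained.

# Try the trained model on a few example English inputs.
test_sentences = [
    "How are you today?",
    "I would like to order a cup of coffee.",
    "The weather is very hot this afternoon."
]
for sentence in test_sentences:
    translation = malaygpt(sentence)
    print(f"ENGLISH : {sentence}")
    print(f"MALAY   : {translation}")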
This article was reposted from the WeChat official account @ArronAI.