烏夜啼 李煜昨夜風兼雨,簾幃颯颯秋聲。

詞是由一堆字組成的,那么一個簡單的想法,我們可以通過計算每個字后面出現各個字的概率。

然后根據這些概率,不斷的遞歸生成“下一個字”,生成的字多了,截斷一部分,就是一首詞了。

具體思路為:

完整的代碼如下(帶注釋版的見simplemodel_with_comments.py):

simplemodel.py

import random
random.seed(42) # 去掉此行,獲得隨機結果
prompt = "春江"max_new_token = 100
with open('ci.txt', 'r', encoding='utf-8') as f: text = f.read()
chars = sorted(list(set(text)))vocab_size = len(chars)stoi = { ch:i for i,ch in enumerate(chars) }itos = { i:ch for i,ch in enumerate(chars) }encode = lambda s: [stoi[c] for c in s]decode = lambda l: ''.join([itos[i] for i in l])
transition = [[0 for _ in range(vocab_size)] for _ in range(vocab_size)]
for i in range(len(text) - 1): current_token_id = encode(text[i])[0] next_token_id = encode(text[i + 1])[0] transition[current_token_id][next_token_id] += 1
generated_token = encode(prompt)
for i in range(max_new_token - 1): current_token_id = generated_token[-1] logits = transition[current_token_id] total = sum(logits) logits = [logit / total for logit in logits] next_token_id = random.choices(range(vocab_size), weights=logits, k=1)[0] generated_token.append(next_token_id) current_token_id = next_token_id
print(decode(generated_token))

直接通過python simplemodel.py 即可運行,去掉random.seed(42) 可以看到不同的輸出結果。

在我的mac電腦上耗時2秒,效果如下:

$ python simplemodel.py春江月 張先生疑被。
倦旅。清歌聲月邊、莼鱸清唱,盡一卮酒紅蕖花月,彩籠里繁蕊珠璣。只今古。浣溪月上賓鴻相照。乞團,煙渚瀾翻覆古1半吐,還在蓬瀛煙沼。木蘭花露弓刀,更任東南樓縹緲。黃柳,

這像是一首名為“春江月”、作者為“張先生疑被?!钡脑~,但其實我們只是實現了一個“下一個詞預測器”。

在代碼的眼里,只不過“春”字后面大概率是“江”,而“江”字后面大概率是“月”而已,它不知道什么是詞,甚至不知道什么是一首詞的開頭、結尾。

這個字符序列層面的“意義”,實際上是由讀者賦予的。

詞匯表 – tokenizer

我們的“詞匯表”,相當于LLM里的tokenizer,只不過我們直接使用ci.txt 里出現過的所有字符當做詞匯表用。我們的詞匯表只有6418個詞匯,而真正的LLM有更大的vocab_size,以及更高效的編碼,一些常用詞組直接對應1個token,比如下面是qwen2.5的tokenizer。

>>> from transformers import AutoTokenizer
>>> tokenizer = AutoTokenizer.from_pretained('Qwen/Qwen2.5-0.5B')
>>> tokenizer.vocab_size
151643
>>> tokenizer.encode("春江花月夜")
[99528, 69177, 99232, 9754, 99530]
>>> tokenizer.encode("阿里巴巴")
[107076]
>>> tokenizer.encode("阿里媽媽")
[102661, 101935]
>>> tokenizer.encode("人工智能")
[104455]
>>> tokenizer.decode([102661, 104455, 101935])
'阿里人工智能媽媽'

qwen2.5使用了一個大小為151643的詞匯表,其中常見的詞匯“阿里巴巴”、“人工智能”都只對應1個token,而在我們的詞匯表里,1個字符永遠對應1個token,編碼效率較低。

模型、訓練、推理

我們剛剛實現的“模型”,實際是就是自然語言N-gram模型中的“Bigram模型”。這是一種基于統計的語言模型,用于預測一個詞出現的概率,在這個模型中,假設句子中的每個字只依賴于其前面的一個字。具體的實現就是一個詞頻字典transition,而所謂的“訓練”過程就是遍歷所有數據,統計“下一個詞”出現的頻率。但我們的“推理”過程還是非常像真正的LLM的,步驟如下:

1、我們從transition 中獲取下一個token的logits(logits是機器學習中常用的術語,表示最后一層的原始輸出值),我們可以把logits[i]簡單理解為“下一個token_id是i的得分”,因此logits肯定是長度為vocab_size的字典;

2、獲得“得分字典”后,使用[logit / total for logit in logits] 做歸一化處理,這是為了下一步更好的做隨機采樣。在這里我們使用最簡單的線性歸一,不考慮total為0的情況;

3、根據歸一后的“得分字典”,使用random.choices 隨機獲取一個token id并返回;

4、循環反復,直到獲得足夠多的token。

二、進行重構,更加有“機器學習風格”

接下來我們把Bigram模型的實現變得更加“機器學習風格”,以便幫助我們理解后面真實的pytorch代碼,有pytorch背景的同學可以直接跳過本節。

完整的代碼碼如下(帶注釋版的見simplebigrammodel_with_comments.py):

simplebigrammodel.py

import random
from typing import List

random.seed(42) # 去掉此行,獲得隨機結果
prompts = ["春江", "往事"]
max_new_token = 100
max_iters = 8000
batch_size = 32
block_size = 8
with open('ci.txt', 'r', encoding='utf-8') as f:
text = f.read()
class Tokenizer:
def __init__(self, text: str):
self.chars = sorted(list(set(text)))
self.vocab_size = len(self.chars)
self.stoi = {ch: i for i, ch in enumerate(self.chars)}
self.itos = {i: ch for i, ch in enumerate(self.chars)}

def encode(self, s: str) -> List[int]:
return [self.stoi[c] for c in s]

def decode(self, l: List[int]) -> str:
return''.join([self.itos[i] for i in l])
class BigramLanguageModel():
def __init__(self, vocab_size: int):
self.vocab_size = vocab_size
self.transition = [[0 for _ in range(vocab_size)]
for _ in range(vocab_size)]

def __call__(self, x):
# 方便直接調用model(x)
return self.forward(x)

def forward(self, idx: List[List[int]]) -> List[List[List[float]]]:
'''
輸入idx,是一個二維數組,如[[1, 2, 3],
[4, 5, 6]]
表示同時希望推理的多個序列
輸出是一個三維數組,如[[[0.1, 0.2, 0.3, .. (vocab_size)],
[0.4, 0.5, 0.6, .. (vocab_size)],
[0.7, 0.8, 0.9, .. (vocab_size)]],
[[0.2, 0.3, 0.4, .. (vocab_size)],
[0.5, 0.6, 0.7, .. (vocab_size)],
[0.8, 0.9, 1.0, .. (vocab_size)]]]

'''
B = len(idx) # 批次大小
T = len(idx[0]) # 每一批的序列長度

logits = [
[[0.0 for _ in range(self.vocab_size)]
for _ in range(T)]
for _ in range(B)
]

for b in range(B):
for t in range(T):
current_token = idx[b][t]
# 計算了每一個token的下一個token的概率
logits[b][t] = self.transition[current_token]

return logits
def generate(self, idx: List[List[int]], max_new_tokens: int) -> List[int]:
for _ in range(max_new_tokens):
logits_batch = self(idx)
for batch_idx, logits in enumerate(logits_batch):
# 我們計算了每一個token的下一個token的概率
# 但實際上我們只需要最后一個token的“下一個token的概率”
logits = logits[-1]
total = max(sum(logits),1)
# 歸一化
logits = [logit / total for logit in logits]
# 根據概率隨機采樣
next_token = random.choices(
range(self.vocab_size),
weights=logits,
k=1
)[0]
idx[batch_idx].append(next_token)
return idx

def get_batch(tokens, batch_size, block_size):
'''
隨機獲取一批數據x和y用于訓練
x和y都是二維數組,可以用于并行訓練
其中y數組內的每一個值,都是x數組內對應位置的值的下一個值
格式如下:
x = [[1, 2, 3],
[9, 10, 11]]
y = [[2, 3, 4],
[10, 11, 12]]
'''
ix = random.choices(range(len(tokens) - block_size), k=batch_size)
x, y = [], []
for i in ix:
x.append(tokens[i:i+block_size])
y.append(tokens[i+1:i+block_size+1])
return x, y
tokenizer = Tokenizer(text)
vocab_size = tokenizer.vocab_size
tokens = tokenizer.encode(text)
model = BigramLanguageModel(vocab_size)
# 訓練
for iter in range(max_iters):
x_batch, y_batch = get_batch(tokens, batch_size, block_size)
for i in range(len(x_batch)):
for j in range(len(x_batch[i])):
x = x_batch[i][j]
y = y_batch[i][j]
model.transition[x][y] += 1
prompt_tokens = [tokenizer.encode(prompt) for prompt in prompts]
# 推理
result = model.generate(prompt_tokens, max_new_token)
# decode
for tokens in result:
print(tokenizer.decode(tokens))
print('-'*10)

雖然有100多行代碼,但實際上功能和上一個50行代碼幾乎是一樣的,稍微運行、調試一下就能明白。

直接通過python simplebigrammodel.py 即可運行,這一次會生成2個字符序列:

$ python simplebigrammodel.py
春江紅紫霄效顰。

怎。
蘭修月。
兩個事對西風酒伴寄我登臨,看雪驚起步,總不與淚滿南園春來。
最關上閱。
信斷,名姝,夜正坐認舊武仙 朱弦。

歲,回。

看一絲竹。
愿皇受風,當。

妝一笑時,不堪
----------
往事多閑田舍、十三楚珪
酒困不須紫芝蘭花痕皺,青步虹。
暗殿人物華高層軒者,臨江淥池塘。
三峽。
天、彩霞冠
燕翻云垂楊、一聲羌笛罷瑤觥船窗幽園春生陣。
長橋。
無恙,中有心期。

開處。
燕姹綠遍,爛□
----------

解釋一下這100多行代碼的實現:

機器學習風格的一些約定

我們用Tokenizer 類封裝了詞匯表,以便它能像qwen的詞匯表一樣被使用。

同時,我們實現了一個BigramLanguageModel 類,這模仿pytorch里的nn.Module 寫法,即:

1.參數在__init__ 中初始化;

2.推理在forward 函數中實現,并通過__call__ 允許對象被直接調用;

3.序列生成在generate 函數中實現;

最后,我們修改了數據加載的機制,如下:

def get_batch(tokens, batch_size, block_size):
ix = random.choices(range(len(tokens) - block_size), k=batch_size)
x, y = [], []
for i in ix:
x.append(tokens[i:i+block_size])
y.append(tokens[i+1:i+block_size+1])
return x, y

每次調用get_batch 的時候,會隨機返回兩份數據,其中y 數組中的每一個token,都是x 數組內對應位置的token的下一個token。采用這樣的寫法,是為了方便后續操作。

批處理in,批處理out

這一個版本最難懂的地方,是數據都以多維數組的方式呈現,連推理結果返回的都是2個。

實際上,我們這里的“多維數組”,就是機器學習中的“張量”(Tensor),是為了最終方便GPU處理而準備的。

張量(Tensor)是數學和物理學中用于表示多維數據的對象,廣泛應用于機器學習、深度學習和計算機視覺等領域。在深度學習框架(如 TensorFlow 和 PyTorch)中,張量是數據的基本結構。

而我們代碼中低效的for循環,未來在GPU中都會被高效的并行計算。

我們先以傳統思維來仔細看一下forward 函數的實現,以進一步理解“張量”和“批處理”。

def forward(self, idx: List[List[int]]) -> List[List[List[float]]]:
B = len(idx) # 批次大小
T = len(idx[0]) # 每一批的序列長度

logits = [
[[0.0for _ in range(self.vocab_size)]
for _ in range(T)]
for _ in range(B)
]

for b in range(B):
for t in range(T):
current_token = idx[b][t]
# 計算了每一個token的下一個token的概率
logits[b][t] = self.transition[current_token]

return logits

forward 函數的入參是一個大小為B * T的二維數組,按照機器學習領域的說法,就是一個形狀為(B, T)的“張量”,表示輸入了“B”批次的數據,每個批次包含“T”個token。

這里B、T、C都是機器學習里的常用變量名,B(Batch Size)代表批量大小、T(Time Steps or Sequence Length)對于序列數據來說代表序列的長度、C(Channels)在圖像處理中代表通道數,在語言模型中可以表示特征維度。

返回值logits 是一個形狀為(B, T, C)的張量(C等于vocab_size),它表示了“每個批次”的序列中,“每個token”的下一個token的頻率。這么說起來很繞,其實只要想象成:“所有B*T個數的token,都有一張獨立的表,表中記錄了下一個出現的token是X的頻率”。

logits 的大小為B * T * C,由于我們是Bigram模型,每個token的概率只和它上一個token有關,所以實際上我們只需要計算批次中最后一個token的logit就可以了,但為了和以后的模型統一,依舊保留了這些冗余計算。

好消息,我們現在已經有了一個能跑的玩具“模型”,它能根據概率預測下一個詞,但卻缺乏了真正的訓練過程。

壞消息,在實現真正的機器學習之前,我們還是繞不開pytorch。不過幸運的是,我們只需要做到“知其然”即可。

三、5分鐘簡明pytorch教程

PyTorch 是一個開源的深度學習庫,提供一系列非常方便的基礎數據結構和函數,簡化我們的操作。

下面是一個使用pytorch實現線性回歸的簡單例子:

pytorch_5min.py

import torch
from torch import nn
from torch.nn import functional as F

torch.manual_seed(42) # 隨機數種子,方便復現

# 判斷環境中是否有GPU
device = 'cuda'if torch.cuda.is_available() else'mps'if torch.mps.is_available() else'cpu'
print(f"Using {device} device")

# 1. 創建tensor演示
x = torch.tensor([1.0, 2.0, 3.0])
y = torch.tensor([2.0, 4.0, 6.0])

# 2. 基本運算演示
print(x + y) # 加法: tensor([3., 6., 9.])
print(x * y) # 點乘: tensor([2., 8., 18.])
print(torch.matmul(x, y)) # 矩陣乘法: tensor(28.)
print(x @ y) # 另一種矩陣乘寫法: tensor(28.)
print(x.shape) # tensor的形狀: torch.Size([3])

# 3. 定義模型
class SimpleNet(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(1, 1) # 輸入維度=1,輸出維度=1

def forward(self, x):
return self.linear(x)

# 4. 生成訓練數據
# 真實關系: y = 2x + 1
x_train = torch.rand(100, 1) * 10 # 生成 0-10 之間的隨機數
y_train = 2 * x_train + 1 + torch.randn(100, 1) * 0.1 # 真實函數:y = 2x + 1 加上一些噪聲
# 將數據移動到指定設備
x_train = x_train.to(device)
y_train = y_train.to(device)

# 5. 創建模型和優化器
model = SimpleNet().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

# 6. 訓練循環
epochs = 5000

print("\n訓練開始...")
for epoch in range(epochs):
# 前向傳播,預測結果
y_pred = model(x_train)

# 計算預測值和真實值之間的損失
loss = criterion(y_pred, y_train)

# 反向傳播,修改模型參數
optimizer.zero_grad() # 清除舊的梯度
loss.backward() # 計算新的梯度
optimizer.step() # 更新參數:參數 -= 學習率 * 梯度

if (epoch + 1) % 100 == 0:
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}, w: {w:.2f}, b: {b:.2f}')

# 7. 打印結果
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f'\n訓練完成!')
print(f'學習到的函數: y = {w:.2f}x + {b:.2f}')
print(f'實際函數: y = 2.00x + 1.00')

# 8. 測試模型
test_x = torch.tensor([[0.0], [5.0], [10.0]]).to(device)
with torch.no_grad():
test_y = model(test_x)
print("\n預測結果:")
for x, y in zip(test_x, test_y):
print(f'x = {x.item():.1f}, y = {y.item():.2f}')

通過python pytorch_5min.py 即可運行:

$ python pytorch_5min.py 
Using mps device
tensor([3., 6., 9.])
tensor([ 2., 8., 18.])
tensor(28.)
tensor(28.)
torch.Size([3])

訓練開始...
Epoch [100/5000], Loss: 0.0988, w: 2.09, b: 0.41
Epoch [200/5000], Loss: 0.0420, w: 2.05, b: 0.64
...
Epoch [5000/5000], Loss: 0.0066, w: 2.00, b: 1.02

訓練完成!
學習到的函數: y = 2.00x + 1.02
實際函數: y = 2.00x + 1.00

預測結果:
x = 0.0, y = 1.02
x = 5.0, y = 11.00
x = 10.0, y = 20.98

這個例子中,最特別的是有真正的“訓練”過程,“訓練”究竟是什么?我們經常聽到的“反向傳播”、“梯度下降”、“學習率”又是什么?

鑒于這只是5分鐘教程,我們只要記住后面我們所有的機器學習代碼都是這樣的結構即可。

tensor操作

這一部分詳見代碼,看完代碼后才發現,大學時候的《線性代數》課程是多么重要。

這里最值得注意的是“矩陣相乘”,即“點積”、matmul操作,簡寫為“@”符號,是后面self-attention機制的核心。

矩陣乘還經常用作張量形狀的變換,如形狀為(B, T, embd)的張量和形狀為(embd, C)的張量相乘,結果為(B, T, C)的張量 —— 這一點也經常被用到。

此外,tensor.to(device) 可以把tensor數據移動到指定的設備,如GPU。

模型、神經網絡的layer

我們的模型內部只有一個簡單的線性層nn.Linear(1, 1) ,它輸入輸出都是一維張量。(1,1)的線性層實際上內部就是一個線性方程,對于輸入任何數字x,它會輸出x * w + b,實際上神經網絡中的“layer”就是內含了一系列參數、可被訓練的單元。通過輸出nn.Linear 可以更清晰的看出實現。

>>> layer = nn.Linear(1, 1)
>>> layer.weight.item(), layer.bias.item()
(0.8262009620666504, 0.9049515724182129)
>>> torch.tensor([[1.0],[2.]])
tensor([[1.],
[2.]])
>>> layer(_)
tensor([[1.7312],
[2.5574]], grad_fn=<AddmmBackward0>)

手動計算一下就能發現,實際上layer的輸出值,就是輸入x * weight + bias的結果。

其中grad_fn 是pytorch用來反向傳播的關鍵,pytorch記住了這個tensor是怎么計算出來的,在后面的反向傳播中被使用,對pytorch用戶不可見。

反向傳播和梯度下降

5分鐘的教程只需要我們先硬記住一點,機器學習的“訓練”就是這樣一個過程:

1.先“前向傳播”,計算出輸出(如Linear層輸出結果)。

2.再“反向傳播”。

a.通過“損失函數”計算出模型的輸出和真實數據之間的“損失值”loss(如例子中的MSELoss損失函數);

b.計算“梯度”,利用損失函數對輸出層的梯度進行計算,接著向前傳播(反向傳播)計算前一層的梯度,直到輸入層(這一步pytorch能自動處理,不需要我們關心??梢院唵卫斫鉃椋疤荻取本褪菗p失函數對各個參數的導數。核心目的就是為了計算出“如何調整w和b的值來減少損失”);

c.更新參數,“梯度”是一個向量,把“梯度”乘上我們的“學習率”再加上原來的參數,就是我們新的參數了。如果學習率大,那么每次更新的多,學習率小,每次更新的就少?!疤荻认陆怠保褪俏覀兺ㄟ^迭代更新參數,以尋找到損失函數最小的過程;

這中間最復雜的求導、算梯度、更新每一層參數的操作,pytorch都自動完成了(前面看到的grad_fn 就是用于這個過程),我們只需要知道在這個結構下,選擇不同的優化器算法、損失函數實現、模型結構即可,剩下的交給pytorch。

而“推理”,就只有“前向傳播”,計算出輸出即可。

四、實現一個真正的Bigram模型

5分鐘“精通”完pytorch,接下來我們來實現真正的pytorch版Bigram模型。

首先,我們把前面的simplebigrammodel.py ,用pytorch的tensor數據結構改造成一個新版本,代碼見simplebigrammodel_torch.py ,這里不再展開。通過這份代碼,能在熟悉算法的基礎上,進一步深刻理解tensor。

然后,我們基于它進一步實現Bigram模型,后續我們的代碼都將基于這個為基礎,逐漸改出完整的gpt。

完整代碼如下,也可以看babygpt_v1.py。

babygpt_v1.py

import torch
import torch.nn as nn
from torch.nn import functional as F
from typing import List
import time
torch.manual_seed(42)
prompts = ["春江", "往事"] # 推理的輸入prompts
max_new_token = 100 # 推理生成的最大tokens數量
max_iters = 5000 # 訓練的最大迭代次數
eval_iters = 100 # 評估的迭代次數
eval_interval = 200 # 評估的間隔
batch_size = 32 # 每個批次的大小
block_size = 8 # 每個序列的最大長度
learning_rate = 1e-2 # 學習率
n_embed = 32 # 嵌入層的維度
tain_data_ratio = 0.9 # 訓練數據占數據集的比例,剩下的是驗證數據
device = 'cuda'if torch.cuda.is_available() else'mps'if torch.mps.is_available() else'cpu'
with open('ci.txt', 'r', encoding='utf-8') as f:
text = f.read()
class Tokenizer:
def __init__(self, text: str):
self.chars = sorted(list(set(text)))
self.vocab_size = len(self.chars)
self.stoi = {ch: i for i, ch in enumerate(self.chars)}
self.itos = {i: ch for i, ch in enumerate(self.chars)}

def encode(self, s: str) -> List[int]:
return [self.stoi[c] for c in s]

def decode(self, l: List[int]) -> str:
return''.join([self.itos[i] for i in l])

class BabyGPT(nn.Module):
def __init__(self, vocab_size: int, n_embd: int):
super().__init__()
self.token_embedding_table = nn.Embedding(vocab_size, n_embd) # 嵌入層,把token映射到n_embd維空間
self.lm_head = nn.Linear(n_embd, vocab_size) # 線性層,把n_embd維空間映射到vocab_size維空間,
def forward(self, idx, targets=None):
tok_emb = self.token_embedding_table(idx) # 獲得token的嵌入表示 (B,T,n_embd)
logits = self.lm_head(tok_emb) # 通過線性層,把embedding結果重新映射回vocab_size維空間 (B,T,vocab_size)
if targets is None: # 推理場景,不需要計算損失值
loss = None
else:
B, T, C = logits.shape
logits = logits.view(B*T, C) # 把(B,T,C)的形狀轉換為(B*T,C),因為交叉熵損失函數第一個參數只接受二維輸入。這個操作并沒有丟失信息
targets = targets.view(B*T) # 把(B,T)的形狀轉換為(B*T),因為交叉熵損失函數第二個參數只接受一維輸入。這個操作并沒有丟失信息
loss = F.cross_entropy(logits, targets) # 計算交叉熵損失
return logits, loss
def generate(self, idx, max_new_tokens):
for _ in range(max_new_tokens):
logits, _ = self(idx) # logits的形狀是(B,T,vocab_size),每一個token都計算了下一個token的概率
logits = logits[:, -1, :] # 實際上我們只需要最后一個token算出來的值
probs = F.softmax(logits, dim=-1) # 使用softmax函數算概率分布,這里dim=-1表示對最后一個維度進行softmax
idx_next = torch.multinomial(probs, num_samples=1) # 根據概率分布隨機采樣,這里num_samples=1表示采樣一個token
idx = torch.cat((idx, idx_next), dim=1) # 把采樣的token拼接到序列后面
return idx
tokenizer = Tokenizer(text)
vocab_size = tokenizer.vocab_size
raw_data = torch.tensor(tokenizer.encode(text), dtype=torch.long).to(device)
n = int(tain_data_ratio*len(raw_data))
data = {'train': raw_data[:n], 'val': raw_data[n:]}
def get_batch(data, batch_size, block_size):
ix = torch.randint(len(data) - block_size, (batch_size,))
x = torch.stack([data[i:i+block_size] for i in ix])
y = torch.stack([data[i+1:i+block_size+1] for i in ix])
x, y = x.to(device), y.to(device)
return x, y
@torch.no_grad()
def estimate_loss(model, data, batch_size, block_size, eval_iters):
'''
計算模型在訓練集和驗證集上的損失
'''
out = {}
model.eval() # 切換到評估模式
for split in ['train', 'val']:
losses = torch.zeros(eval_iters)
for k in range(eval_iters):
x, y = get_batch(data[split], batch_size, block_size)
_, loss = model(x, y)
losses[k] = loss.item()
out[split] = losses.mean()
model.train() # 切換回訓練模式
return out
model = BabyGPT(vocab_size, n_embed).to(device)
# 訓練
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
start_time = time.time()
tokens_processed = 0
for iter in range(max_iters):
x, y = get_batch(data['train'], batch_size, block_size)
logits, loss = model(x, y)
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
tokens_processed += batch_size * block_size
if iter % eval_interval == 0:
elapsed = time.time() - start_time
tokens_per_sec = tokens_processed / elapsed if elapsed > 0else0
losses = estimate_loss(model, data, batch_size, block_size, eval_iters)
print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, speed: {tokens_per_sec:.2f} tokens/sec")
# 推理
prompt_tokens = torch.stack([torch.tensor(tokenizer.encode(p)).to(device) for p in prompts])
# 生成
result = model.generate(prompt_tokens, max_new_token)
# 解碼并打印結果
for tokens in result:
print(tokenizer.decode(tokens.tolist()))
print('-'*10)

在我的mac上通過 python babygpt_v1.py 運行,大概60k t/s的訓練速度,而在4090上這個速度可以達到180k t/s。

$ python babygpt_v1.py 
step 0: train loss 8.9236, val loss 8.9194, speed: 1118.03 tokens/sec
step 200: train loss 5.8334, val loss 5.9927, speed: 50238.47 tokens/sec
step 400: train loss 5.5678, val loss 5.7631, speed: 56604.35 tokens/sec
step 600: train loss 5.4697, val loss 5.7274, speed: 59267.69 tokens/sec
step 800: train loss 5.3885, val loss 5.6038, speed: 60842.13 tokens/sec
step 1000: train loss 5.3467, val loss 5.5955, speed: 61404.86 tokens/sec
...

這份代碼也沒有難點,實際上就是前面pytorch實現的線性回歸模型和我們自己土方法實現的bigram模型的結合體,尤其是訓練部分,基本上和前面線性回歸是一樣的,差別主要在模型上。

模型

Embedding層

這次我們的模型由一個nn.Embedding(vocab_size, n_embd) 層和一個nn.Linear(n_embd, vocab_size) 層組成。

nn.Embedding(vocab_size, n_embd) 可以簡單理解成一個映射表,只不過它的key取值為0 ~ vocab_size-1,而它的value是一個n_embd維的參數。簡單的理解為,通過embedding操作(嵌入操作),我們把一個離散的token,映射為了一個密集的向量。

實際上Embedding的實現真的就是一個lookup-table,如下所示:

>>> layer = nn.Embedding(10, 3)
>>> layer.weight.shape
torch.Size([10, 3])
>>> layer(torch.tensor(1))
tensor([0.4534, 1.1587, 1.6280], grad_fn=<EmbeddingBackward0>)
>>> layer.weight[torch.tensor(1)]
tensor([0.4534, 1.1587, 1.6280], grad_fn=<SelectBackward0>)

Embedding內部就是保存了一個(vocab_size, n_embd)的張量,“對tensor X執行嵌入操作”和“在weight中取key為X的值”效果是一樣的。

Embedding通常作為各種模型的第一層,因為我們要把離散的“token”,映射為一些連續的“數值”,才可以繼續后續的操作。兩個token id之間是沒有關系的,但兩個Embedding的向量可以有距離、關聯度等關系。

由于我們只實現了一個Bigram模型,下一個詞只和上一個詞有關,而Embedding內部恰好能表示一種A到B的映射關系,所以這里我們的模型主體就是Embedding本身,我們訓練的直接就是Embedding內的參數。

lm_head層

lm_head(Language Model Head)是我們的輸出層,幾乎所有模型最后一層都是這么一個Linear 層,它的用途是把我們中間各種layer算出來的結果,最終映射到vocab_size 維的向量里去。因為我們最終要算的,就是vocab_size 個詞里,每個詞出現的概率。

語言模型的常見流程如下示意圖,模型間主要的差異都在中間層上,LLM也不例外:

損失函數、歸一函數和采樣

forward 實現中,我們使用交叉熵函數作為損失函數,且為了滿足交叉熵函數對于參數的要求,我們把(B, T, C)的張量,變形為(B * T, C),不需要理解交叉熵函數計算方式,只需知道它得出了兩個tensor的差值即可。

我們使用softmax 代替前面的線性歸一函數做歸一化,也省去了考慮total 值為0的情況,并且用torch.multinomial 代替random.choices 作為采樣函數。

訓練

訓練部分代碼和5分鐘pytorch教程中的沒太多差別,我們用AdamW 優化器替換了SGD 優化器,具體原因這里不展開解釋,只要知道這就是不一樣的調整參數的算法即可。

并且我們每處理一些數據,就嘗試輸出當前模型,在訓練數據和校驗數據上的損失值。以便我們觀察模型是否過擬合了訓練數據。

如果數據足夠多、耗時足夠久的話,我們在這里可以用torch.save 方法把參數保存下來,也就是checkpoint。

五、回顧和Next

令人興奮,目前為止,我們用131行python代碼,實現了一個語言模型,居然能生成看起來像是詞的東西,It just works。

這個模型目前參數量為 Embedding層:6148 (vocab_size) * 32 (n_embd) + Linear層6148 * 32 + 6148 = 399620 ,消耗399620 * 4字節 = 1.52MB 空間,即一個0.0004B的參數,而qwen2.5最小的也是0.5B。

我們親眼看到了模型的參數、layer、學習率、正向傳播、反向傳播、梯度等一堆概念。

如果對于模型流程和結構沒太理解,可以問AI實現各種簡單的demo,會發現結構大差不大;如果對于中間各種變量轉換沒太理解,強烈建議在調試中通過.shape 觀察各種tensor的形狀變化、通過.weight 觀察各個layer的參數變量,來體會其中的細節。

文章轉載自:從零開始200行python代碼實現LLM

上一篇:

大模型微調方法與實踐經驗
最后一篇
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費