import inspect

import torch
import torch.nn as nn
from huggingface_hub import PyTorchModelHubMixin

# Training / evaluation hyperparameters (consumed by the training script).
BATCH_SIZE = 16
BLOCK_SIZE = 1024
MAX_ITERS = 5
EVAL_INTERVAL = 500
LEARNING_RATE = 6e-4
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EVAL_ITERS = 200

# Model architecture hyperparameters.
N_EMBD = 768
N_HEAD = 12
N_LAYER = 12
DROPOUT = 0.2  # defined for configuration; not currently applied by any module below

# Raw string so the backslash in the Windows-style path is not treated as an escape.
MODEL_PATH = r"Naive_gpt\model_weights_llama"

class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with a fused QKV projection."""

    def __init__(self):
        super().__init__()
        assert N_EMBD % N_HEAD == 0
        # Single linear layer producing queries, keys and values for all heads at once.
        self.c_attn = nn.Linear(N_EMBD, 3 * N_EMBD)
        # Output projection back to the embedding dimension.
        self.c_proj = nn.Linear(N_EMBD, N_EMBD)
        self.c_proj.NANOGPT_SCALE_INIT = 1  # flag read by _init_weights to scale this residual projection
        self.n_head = N_HEAD
        self.n_embd = N_EMBD

    def forward(self, x):
        B, T, C = x.size()  # batch size, sequence length, embedding dimension
        # Project to q, k, v and split the heads: (B, T, C) -> (B, n_head, T, head_size).
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        # Fused (flash) attention with a causal mask.
        y = nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True)
        # Re-assemble the heads: (B, n_head, T, head_size) -> (B, T, C).
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.c_proj(y)
        return y
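

# Illustrative sketch (not used by the model): what the fused
# nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True) call above
# computes, written out by hand. The helper name and the shape comments are
# assumptions for demonstration only; the module itself always uses the fused kernel.
def _manual_causal_attention(q, k, v):
    # q, k, v: (B, n_head, T, head_size)
    import math
    T = q.size(-2)
    att = (q @ k.transpose(-2, -1)) / math.sqrt(k.size(-1))  # (B, n_head, T, T) scaled scores
    mask = torch.tril(torch.ones(T, T, dtype=torch.bool, device=q.device))
    att = att.masked_fill(~mask, float('-inf'))              # causal mask: no attention to future positions
    att = torch.softmax(att, dim=-1)
    return att @ v                                            # (B, n_head, T, head_size)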


class FeedForward(nn.Module):
    """MLP block: expand to 4x the embedding size, apply GELU, project back."""

    def __init__(self):
        super().__init__()
        self.c_fc = nn.Linear(N_EMBD, 4 * N_EMBD)
        self.gelu = nn.GELU(approximate='tanh')
        self.c_proj = nn.Linear(4 * N_EMBD, N_EMBD)
        self.c_proj.NANOGPT_SCALE_INIT = 1  # residual projection, scaled down at init

    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        return x


class Block(nn.Module):
    """Transformer block: communication (attention) followed by computation (MLP)."""

    def __init__(self, n_embd, n_head):
        super().__init__()
        # Note: the sub-modules read the module-level N_EMBD / N_HEAD constants,
        # so the constructor arguments must agree with them.
        self.sa = CausalSelfAttention()
        self.ffwd = FeedForward()
        self.ln1 = nn.LayerNorm(N_EMBD)
        self.ln2 = nn.LayerNorm(N_EMBD)

    def forward(self, x):
        # Pre-norm residual connections around attention and the MLP.
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class GPTLanguageModel(nn.Module, PyTorchModelHubMixin):
    """Decoder-only GPT-style language model with learned positional embeddings."""

    def __init__(self, vocab_size=20000, block_size=1024, n_embd=768, n_head=12, n_layer=12):
        super().__init__()
        print("This is vocab size:", vocab_size)
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        )
        self.ln_f = nn.LayerNorm(n_embd)  # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # Weight tying: the token embedding and the output projection share one matrix.
        self.token_embedding_table.weight = self.lm_head.weight

        self.apply(self._init_weights)
        self.config = {"BLOCK_SIZE": block_size, "N_EMBD": n_embd, "N_HEAD": n_head, "N_LAYER": n_layer}

    def _init_weights(self, module):
        # GPT-2 style initialization: normal(0, 0.02) everywhere; residual output
        # projections (flagged with NANOGPT_SCALE_INIT) are scaled by 1/sqrt(2 * N_LAYER)
        # so the variance of the residual stream does not grow with depth.
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANOGPT_SCALE_INIT'):
                std *= (2 * N_LAYER) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        block_size = self.config["BLOCK_SIZE"]
        assert T <= block_size, f"Cannot forward sequence of length {T}, block size is only {block_size}"

        # Token and position embeddings are summed to form the input to the blocks.
        tok_emb = self.token_embedding_table(idx)  # (B, T, n_embd)
        pos_emb = self.position_embedding_table(torch.arange(0, T, dtype=torch.long, device=idx.device))  # (T, n_embd)
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)  # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            loss = nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))

        return logits, loss

    def generate(self, idx, max_new_tokens, temperature=1.0):
        """
        Generate tokens autoregressively with the language model.
        Args:
            idx: Input token indices of shape (B, T)
            max_new_tokens: Number of tokens to generate
            temperature: Controls randomness in generation
                - temperature > 1.0 increases randomness
                - temperature < 1.0 decreases randomness
                - temperature = 0 makes it deterministic (always picks the highest-probability token)
        """
        block_size = self.config["BLOCK_SIZE"]
        for _ in range(max_new_tokens):
            # Crop the context to the last block_size tokens.
            idx_cond = idx[:, -block_size:]
            # Forward pass; only the logits at the last position are needed.
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]

            if temperature == 0.0:
                # Greedy decoding.
                idx_next = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                # Sample from the temperature-scaled distribution.
                logits = logits / temperature
                probs = torch.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)

            # Append the sampled token and continue.
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

    def save(self, path=MODEL_PATH):
        torch.save(self.state_dict(), path)

    def load(self, path=MODEL_PATH):
        checkpoint = torch.load(path, map_location=DEVICE)
        # Training checkpoints wrap the weights under a "model" key; save() above
        # writes the raw state dict, so accept both layouts.
        state_dict = checkpoint["model"] if isinstance(checkpoint, dict) and "model" in checkpoint else checkpoint

        # Strip the '_orig_mod.' prefix that torch.compile adds to parameter names.
        new_state_dict = {}
        for key, value in state_dict.items():
            new_key = key.replace('_orig_mod.', '')
            new_state_dict[new_key] = value

        self.load_state_dict(new_state_dict)

    def configure_optimizers(self, weight_decay=0.1, learning_rate=LEARNING_RATE, device=DEVICE):
        # Collect all trainable parameters.
        param_dict = {pn: p for pn, p in self.named_parameters()}
        param_dict = {pn: p for pn, p in param_dict.items() if p.requires_grad}

        # Apply weight decay only to 2D+ tensors (matmul weights, embeddings);
        # biases and LayerNorm parameters are left undecayed.
        decay_parameters = [p for n, p in param_dict.items() if p.dim() >= 2]
        nodecay_parameters = [p for n, p in param_dict.items() if p.dim() < 2]
        optim_groups = [
            {"params": decay_parameters, "weight_decay": weight_decay},
            {"params": nodecay_parameters, "weight_decay": 0.0},
        ]
        # Use the fused AdamW kernel when this PyTorch build supports it and we run on CUDA.
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = fused_available and device == "cuda"
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=(0.9, 0.95), eps=1e-8, fused=use_fused)
        return optimizer
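

# Minimal smoke-test sketch. Assumptions: a tiny vocabulary and random token ids,
# purely to illustrate the API; real usage would load trained weights with
# GPTLanguageModel.load() or from_pretrained() and use a proper tokenizer.
# Note that n_embd / n_head must match the module-level N_EMBD / N_HEAD constants.
if __name__ == "__main__":
    model = GPTLanguageModel(vocab_size=100, block_size=64, n_embd=N_EMBD, n_head=N_HEAD, n_layer=2).to(DEVICE)
    idx = torch.randint(0, 100, (1, 8), device=DEVICE)   # (B=1, T=8) random prompt
    logits, loss = model(idx, targets=idx)                # forward pass with dummy targets
    out = model.generate(idx, max_new_tokens=5, temperature=0.8)
    print("logits:", tuple(logits.shape), "loss:", float(loss), "generated:", tuple(out.shape))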