import torch import torch.nn as nn import torch.nn.functional as F from typing import Optional, Tuple import math from transformers import PretrainedConfig, PreTrainedModel class MultiHeadAttention(nn.Module): def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1): super().__init__() assert d_model % num_heads == 0 self.d_model = d_model self.num_heads = num_heads self.head_dim = d_model // num_heads self.q_proj = nn.Linear(d_model, d_model) self.k_proj = nn.Linear(d_model, d_model) self.v_proj = nn.Linear(d_model, d_model) self.o_proj = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor: batch_size, seq_len, d_model = x.shape # Project and reshape for multi-head attention q = self.q_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) k = self.k_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) v = self.v_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) # Transpose for attention computation q = q.transpose(1, 2) # (batch_size, num_heads, seq_len, head_dim) k = k.transpose(1, 2) v = v.transpose(1, 2) # Compute attention scores scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) if mask is not None: scores = scores.masked_fill(mask == 0, float('-inf')) attn_weights = F.softmax(scores, dim=-1) attn_weights = self.dropout(attn_weights) # Apply attention to values out = torch.matmul(attn_weights, v) # (batch_size, num_heads, seq_len, head_dim) out = out.transpose(1, 2) # (batch_size, seq_len, num_heads, head_dim) out = out.reshape(batch_size, seq_len, d_model) return self.o_proj(out) class PreludeBlock(nn.Module): def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1): super().__init__() self.token_embedding = nn.Embedding(vocab_size, d_model) self.pos_encoding = nn.Parameter(torch.zeros(1, 1024, d_model)) # Max sequence length of 1024 self.attention = MultiHeadAttention(d_model, num_heads, dropout) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.feed_forward = nn.Sequential( nn.Linear(d_model, 4 * d_model), nn.GELU(), nn.Linear(4 * d_model, d_model), nn.Dropout(dropout) ) def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor: seq_len = x.size(1) # Embed tokens and add positional encoding x = self.token_embedding(x) + self.pos_encoding[:, :seq_len, :] # Self-attention block attended = self.attention(self.norm1(x), mask) x = x + attended # Feed-forward block x = x + self.feed_forward(self.norm2(x)) return x class RecurrentBlock(nn.Module): def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1): super().__init__() self.attention = MultiHeadAttention(d_model, num_heads, dropout) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.feed_forward = nn.Sequential( nn.Linear(d_model, 4 * d_model), nn.GELU(), nn.Linear(4 * d_model, d_model), nn.Dropout(dropout) ) # Recurrent state projection self.state_proj = nn.Linear(d_model, d_model) def forward(self, x: torch.Tensor, recurrent_state: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor]: # Update recurrent state recurrent_state = self.state_proj(recurrent_state) # Combine input with recurrent state x = x + recurrent_state # Self-attention block attended = self.attention(self.norm1(x), mask) x = x + attended # Feed-forward block x = x + self.feed_forward(self.norm2(x)) return x, x # Return both output and new recurrent state class CodaBlock(nn.Module): def __init__(self, d_model: int, vocab_size: int): super().__init__() self.norm = nn.LayerNorm(d_model) self.output_proj = nn.Linear(d_model, vocab_size) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.norm(x) return self.output_proj(x) class LatentRecurrentDepthLM(nn.Module): def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1): super().__init__() self.prelude = PreludeBlock(vocab_size, d_model, num_heads, dropout) self.recurrent = RecurrentBlock(d_model, num_heads, dropout) self.coda = CodaBlock(d_model, vocab_size) def forward(self, x: torch.Tensor, num_iterations: int, mask: Optional[torch.Tensor] = None) -> torch.Tensor: # Initial embedding and processing hidden = self.prelude(x, mask) # Initialize recurrent state recurrent_state = torch.zeros_like(hidden) # Apply recurrent block multiple times for _ in range(num_iterations): hidden, recurrent_state = self.recurrent(hidden, recurrent_state, mask) # Final output projection return self.coda(hidden) # Configuration for the Latent Recurrent Depth Model class LatentRecurrentDepthConfig(PretrainedConfig): model_type = "latent_recurrent_depth" def __init__(self, vocab_size=50257, d_model=768, num_heads=12, dropout=0.1, **kwargs): super().__init__(**kwargs) self.vocab_size = vocab_size self.d_model = d_model self.num_heads = num_heads self.dropout = dropout # Hugging Face-Compatible Model Wrapper class LatentRecurrentDepthModel(PreTrainedModel): config_class = LatentRecurrentDepthConfig base_model_prefix = "latent_recurrent_depth" def __init__(self, config: LatentRecurrentDepthConfig): super().__init__(config) self.latent_model = LatentRecurrentDepthLM(config.vocab_size, config.d_model, config.num_heads, config.dropout) self.init_weights() def forward(self, input_ids: torch.Tensor, num_iterations: int, mask: Optional[torch.Tensor] = None) -> torch.Tensor: return self.latent_model(input_ids, num_iterations, mask) def generate( self, input_ids: torch.Tensor, max_length: int = 20, num_iterations: int = 3, temperature: float = 1.0, top_k: Optional[int] = 50, ) -> torch.Tensor: """ Generate a sequence of tokens given input_ids. Args: input_ids: torch.Tensor of shape (batch, seq_length) containing the prompt. max_length: The number of tokens to generate. num_iterations: The number of recurrent iterations to use in each forward pass. temperature: Temperature for scaling logits. top_k: If set, only sample from the top k tokens. Returns: generated: torch.Tensor containing the generated sequence. """ generated = input_ids.clone() self.eval() with torch.no_grad(): for _ in range(max_length): # Get logits from the model for the current sequence. logits = self.forward(generated, num_iterations=num_iterations) # Use only the logits for the last token in the sequence. next_token_logits = logits[:, -1, :] / temperature if top_k is not None: # Top-k filtering top_k_logits, top_k_indices = torch.topk(next_token_logits, top_k) probabilities = F.softmax(top_k_logits, dim=-1) next_token = top_k_indices.gather(-1, torch.multinomial(probabilities, num_samples=1)) else: probabilities = F.softmax(next_token_logits, dim=-1) next_token = torch.multinomial(probabilities, num_samples=1) generated = torch.cat([generated, next_token], dim=1) # Optionally, break if the EOS token is generated. if next_token.item() == self.config.eos_token_id: break return generated