diff --git a/src/pygpukit/tts/kokoro/layers.py b/src/pygpukit/tts/kokoro/layers.py index 23f5bce..f3771ab 100644 --- a/src/pygpukit/tts/kokoro/layers.py +++ b/src/pygpukit/tts/kokoro/layers.py @@ -833,6 +833,555 @@ def build_plbert_from_weights( ) +# ============================================================================= +# Weight Normalization and Instance Normalization +# ============================================================================= + + +class WeightNormConv1d: + """1D Convolution with weight normalization. + + Weight normalization decomposes weight W = g * (v / ||v||) + where g is a scalar magnitude and v is the direction. + """ + + def __init__( + self, + weight_g: GPUArray, # [out_channels, 1, 1] - magnitude + weight_v: GPUArray, # [out_channels, in_channels, kernel_size] - direction + bias: GPUArray | None = None, + stride: int = 1, + padding: int = 0, + dilation: int = 1, + ): + self.weight_g = weight_g + self.weight_v = weight_v + self.bias = bias + self.stride = stride + self.padding = padding + self.dilation = dilation + + self.out_channels = weight_v.shape[0] + self.in_channels = weight_v.shape[1] + self.kernel_size = weight_v.shape[2] + + def _compute_weight(self) -> np.ndarray: + """Compute normalized weight: W = g * (v / ||v||).""" + g = self.weight_g.to_numpy() # [out_channels, 1, 1] + v = self.weight_v.to_numpy() # [out_channels, in_channels, kernel_size] + + # Compute L2 norm of v along in_channels and kernel dimensions + v_norm = np.sqrt((v**2).sum(axis=(1, 2), keepdims=True) + 1e-12) + weight = g * (v / v_norm) + return weight.astype(np.float32) + + def __call__(self, x: GPUArray) -> GPUArray: + """Forward pass.""" + batch_size = x.shape[0] + length = x.shape[2] + + # Compute normalized weight + weight = self._compute_weight() + + # Calculate output length + effective_kernel = self.dilation * (self.kernel_size - 1) + 1 + out_length = (length + 2 * self.padding - effective_kernel) // self.stride + 1 + + x_np = x.to_numpy() + + # Pad input + if self.padding > 0: + x_np = np.pad(x_np, ((0, 0), (0, 0), (self.padding, self.padding)), mode="constant") + + # im2col + col = np.zeros( + (batch_size, self.in_channels, self.kernel_size, out_length), dtype=np.float32 + ) + for i in range(self.kernel_size): + i_dilated = i * self.dilation + for j in range(out_length): + j_strided = j * self.stride + col[:, :, i, j] = x_np[:, :, j_strided + i_dilated] + + col = col.reshape(batch_size, -1, out_length) + w_reshaped = weight.reshape(self.out_channels, -1) + out_np = np.einsum("bkl,ok->bol", col, w_reshaped) + + if self.bias is not None: + bias_np = self.bias.to_numpy() + out_np = out_np + bias_np.reshape(1, -1, 1) + + return from_numpy(out_np.astype(np.float32)) + + +class InstanceNorm1d: + """1D Instance Normalization. + + Normalizes each channel independently for each sample. + Uses gamma and beta for affine transform. + """ + + def __init__( + self, + gamma: GPUArray, # [channels] - scale + beta: GPUArray, # [channels] - shift + eps: float = 1e-5, + ): + self.gamma = gamma + self.beta = beta + self.eps = eps + self.num_features = gamma.shape[0] + + def __call__(self, x: GPUArray) -> GPUArray: + """Forward pass: y = gamma * (x - mean) / sqrt(var + eps) + beta.""" + x_np = x.to_numpy() # [batch, channels, length] + + # Compute mean and var along length dimension + mean = x_np.mean(axis=2, keepdims=True) + var = x_np.var(axis=2, keepdims=True) + + # Normalize + x_norm = (x_np - mean) / np.sqrt(var + self.eps) + + # Apply affine transform + gamma = self.gamma.to_numpy().reshape(1, -1, 1) + beta = self.beta.to_numpy().reshape(1, -1, 1) + out = gamma * x_norm + beta + + return from_numpy(out.astype(np.float32)) + + +class AdaIN: + """Adaptive Instance Normalization. + + Computes style-dependent scale and shift from a style vector. + y = scale * (x - mean) / std + shift + where scale and shift are computed from the style vector. + """ + + def __init__( + self, + fc_weight: GPUArray, # [2 * channels, style_dim] + fc_bias: GPUArray, # [2 * channels] + ): + self.fc_weight = fc_weight + self.fc_bias = fc_bias + self.num_features = fc_weight.shape[0] // 2 + + def __call__(self, x: GPUArray, style: GPUArray, eps: float = 1e-5) -> GPUArray: + """Forward pass. + + Args: + x: Input [batch, channels, length] + style: Style vector [batch, style_dim] + + Returns: + Normalized and styled output [batch, channels, length] + """ + x_np = x.to_numpy() + style_np = style.to_numpy() + + # Compute scale and shift from style + fc_w = self.fc_weight.to_numpy() + fc_b = self.fc_bias.to_numpy() + params = style_np @ fc_w.T + fc_b # [batch, 2 * channels] + + scale = params[:, : self.num_features].reshape(-1, self.num_features, 1) + shift = params[:, self.num_features :].reshape(-1, self.num_features, 1) + + # Instance normalization + mean = x_np.mean(axis=2, keepdims=True) + std = np.sqrt(x_np.var(axis=2, keepdims=True) + eps) + x_norm = (x_np - mean) / std + + # Apply adaptive style + out = scale * x_norm + shift + + return from_numpy(out.astype(np.float32)) + + +# ============================================================================= +# ALBERT Encoder (used by Kokoro instead of BERT) +# ============================================================================= + + +class ALBERTLayer: + """Single ALBERT layer with shared weights across layers.""" + + def __init__( + self, + query: Linear, + key: Linear, + value: Linear, + attention_dense: Linear, + attention_norm: LayerNorm, + ffn: Linear, + ffn_output: Linear, + full_layer_norm: LayerNorm, + num_attention_heads: int, + hidden_size: int, + ): + self.query = query + self.key = key + self.value = value + self.attention_dense = attention_dense + self.attention_norm = attention_norm + self.ffn = ffn + self.ffn_output = ffn_output + self.full_layer_norm = full_layer_norm + self.num_attention_heads = num_attention_heads + self.attention_head_size = hidden_size // num_attention_heads + + def transpose_for_scores(self, x: GPUArray) -> GPUArray: + """Reshape for multi-head attention.""" + batch_size = x.shape[0] + seq_len = x.shape[1] + + x_np = x.to_numpy() + x_reshaped = x_np.reshape( + batch_size, seq_len, self.num_attention_heads, self.attention_head_size + ) + x_transposed = x_reshaped.transpose(0, 2, 1, 3) + return from_numpy(x_transposed.astype(np.float32)) + + def __call__(self, hidden_states: GPUArray, attention_mask: GPUArray | None = None) -> GPUArray: + """Forward pass.""" + from pygpukit.ops.basic import add, gelu + + # Self-attention + query_layer = self.transpose_for_scores(self.query(hidden_states)) + key_layer = self.transpose_for_scores(self.key(hidden_states)) + value_layer = self.transpose_for_scores(self.value(hidden_states)) + + q_np = query_layer.to_numpy() + k_np = key_layer.to_numpy() + v_np = value_layer.to_numpy() + + # Scaled dot-product attention + attention_scores = np.matmul(q_np, k_np.transpose(0, 1, 3, 2)) + attention_scores = attention_scores / np.sqrt(self.attention_head_size) + + if attention_mask is not None: + mask_np = attention_mask.to_numpy() + attention_scores = attention_scores + mask_np + + attention_probs = np.exp(attention_scores - attention_scores.max(axis=-1, keepdims=True)) + attention_probs = attention_probs / attention_probs.sum(axis=-1, keepdims=True) + + context = np.matmul(attention_probs, v_np) + + # Reshape back + batch_size = context.shape[0] + seq_len = context.shape[2] + hidden_size = self.num_attention_heads * self.attention_head_size + context = context.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, hidden_size) + context = from_numpy(context.astype(np.float32)) + + # Attention output + attention_output = self.attention_dense(context) + hidden_states = self.attention_norm(add(attention_output, hidden_states)) + + # Feed-forward + ffn_output = gelu(self.ffn(hidden_states)) + ffn_output = self.ffn_output(ffn_output) + hidden_states = self.full_layer_norm(add(ffn_output, hidden_states)) + + return hidden_states + + +class ALBERTEncoder: + """ALBERT encoder for Kokoro TTS. + + ALBERT shares weights across layers, making it more parameter-efficient. + """ + + def __init__( + self, + word_embeddings: GPUArray, + position_embeddings: GPUArray, + token_type_embeddings: GPUArray, + embeddings_norm: LayerNorm, + embedding_mapping: Linear, # Maps from embedding dim to hidden dim + layer: ALBERTLayer, # Shared layer + num_hidden_layers: int = 12, + ): + self.word_embeddings = word_embeddings + self.position_embeddings = position_embeddings + self.token_type_embeddings = token_type_embeddings + self.embeddings_norm = embeddings_norm + self.embedding_mapping = embedding_mapping + self.layer = layer + self.num_hidden_layers = num_hidden_layers + + def __call__( + self, + input_ids: GPUArray, + attention_mask: GPUArray | None = None, + ) -> GPUArray: + """Forward pass.""" + + batch_size = input_ids.shape[0] + seq_len = input_ids.shape[1] + + # Token embeddings + input_ids_np: np.ndarray = input_ids.to_numpy().astype(np.int32) + word_embeds_np = self.word_embeddings.to_numpy() + token_embeds = word_embeds_np[input_ids_np.flatten()].reshape(batch_size, seq_len, -1) + + # Position embeddings + positions = np.arange(seq_len, dtype=np.int32) + pos_embeds_np = self.position_embeddings.to_numpy() + pos_embeds = pos_embeds_np[positions].reshape(1, seq_len, -1) + + # Token type embeddings (all zeros for single sequence) + token_type_embeds_np = self.token_type_embeddings.to_numpy() + token_type_embeds = token_type_embeds_np[0].reshape(1, 1, -1) + + # Combine embeddings + embeddings = token_embeds + pos_embeds + token_type_embeds + embeddings = from_numpy(embeddings.astype(np.float32)) + embeddings = self.embeddings_norm(embeddings) + + # Project to hidden size + hidden_states = self.embedding_mapping(embeddings) + + # Create attention mask + if attention_mask is not None: + mask_np = attention_mask.to_numpy() + extended_mask = mask_np[:, np.newaxis, np.newaxis, :] + extended_mask = (1.0 - extended_mask) * -10000.0 + attention_mask = from_numpy(extended_mask.astype(np.float32)) + + # Apply shared layer multiple times + for _ in range(self.num_hidden_layers): + hidden_states = self.layer(hidden_states, attention_mask) + + return hidden_states + + +# ============================================================================= +# Kokoro Text Encoder (CNN + LSTM) +# ============================================================================= + + +class KokoroTextEncoder: + """Text encoder for Kokoro TTS. + + Architecture: Embedding -> CNN layers -> BiLSTM + """ + + def __init__( + self, + embedding: GPUArray, # [vocab_size, embed_dim] + cnn_layers: list[tuple[WeightNormConv1d, InstanceNorm1d]], + lstm: LSTM, + ): + self.embedding = embedding + self.cnn_layers = cnn_layers + self.lstm = lstm + + def __call__(self, input_ids: GPUArray) -> GPUArray: + """Forward pass. + + Args: + input_ids: Token IDs [batch, seq_len] + + Returns: + Encoded features [batch, seq_len, hidden_dim] + """ + batch_size = input_ids.shape[0] + seq_len = input_ids.shape[1] + + # Embedding lookup + input_ids_np: np.ndarray = input_ids.to_numpy().astype(np.int32) + embed_np = self.embedding.to_numpy() + x = embed_np[input_ids_np.flatten()].reshape(batch_size, seq_len, -1) + x = from_numpy(x.astype(np.float32)) + + # Transpose for CNN: [batch, embed_dim, seq_len] + x = from_numpy(x.to_numpy().transpose(0, 2, 1).astype(np.float32)) + + # CNN layers with instance norm + for conv, norm in self.cnn_layers: + x = conv(x) + x = norm(x) + x = leaky_relu(x) + + # Transpose back for LSTM: [batch, seq_len, channels] + x = from_numpy(x.to_numpy().transpose(0, 2, 1).astype(np.float32)) + + # BiLSTM + output, _ = self.lstm(x) + + return output + + +# ============================================================================= +# Kokoro AdaIN ResBlock +# ============================================================================= + + +class AdaINResBlock: + """Residual block with AdaIN for style conditioning.""" + + def __init__( + self, + conv1: WeightNormConv1d, + conv2: WeightNormConv1d, + norm1: AdaIN, + norm2: AdaIN, + conv1x1: WeightNormConv1d | None = None, # For channel mismatch + ): + self.conv1 = conv1 + self.conv2 = conv2 + self.norm1 = norm1 + self.norm2 = norm2 + self.conv1x1 = conv1x1 + + def __call__(self, x: GPUArray, style: GPUArray) -> GPUArray: + """Forward pass with style conditioning.""" + residual = x + + # First conv + AdaIN + out = self.norm1(x, style) + out = leaky_relu(out) + out = self.conv1(out) + + # Second conv + AdaIN + out = self.norm2(out, style) + out = leaky_relu(out) + out = self.conv2(out) + + # Residual connection (with 1x1 conv if needed) + if self.conv1x1 is not None: + residual = self.conv1x1(residual) + + out_np = out.to_numpy() + residual.to_numpy() + return from_numpy(out_np.astype(np.float32)) + + +# ============================================================================= +# Builder Functions +# ============================================================================= + + +def build_albert_from_weights( + weights: dict[str, GPUArray], + prefix: str = "bert", + num_hidden_layers: int = 12, + num_attention_heads: int = 12, + hidden_size: int = 768, +) -> ALBERTEncoder: + """Build ALBERT encoder from weight dictionary.""" + # Embeddings + word_embeddings = weights[f"{prefix}.module.embeddings.word_embeddings.weight"] + position_embeddings = weights[f"{prefix}.module.embeddings.position_embeddings.weight"] + token_type_embeddings = weights[f"{prefix}.module.embeddings.token_type_embeddings.weight"] + + embeddings_norm = LayerNorm( + weights[f"{prefix}.module.embeddings.LayerNorm.weight"], + weights.get(f"{prefix}.module.embeddings.LayerNorm.bias"), + ) + + embedding_mapping = Linear( + weights[f"{prefix}.module.encoder.embedding_hidden_mapping_in.weight"], + weights.get(f"{prefix}.module.encoder.embedding_hidden_mapping_in.bias"), + ) + + # Shared ALBERT layer + layer_prefix = f"{prefix}.module.encoder.albert_layer_groups.0.albert_layers.0" + + layer = ALBERTLayer( + query=Linear( + weights[f"{layer_prefix}.attention.query.weight"], + weights.get(f"{layer_prefix}.attention.query.bias"), + ), + key=Linear( + weights[f"{layer_prefix}.attention.key.weight"], + weights.get(f"{layer_prefix}.attention.key.bias"), + ), + value=Linear( + weights[f"{layer_prefix}.attention.value.weight"], + weights.get(f"{layer_prefix}.attention.value.bias"), + ), + attention_dense=Linear( + weights[f"{layer_prefix}.attention.dense.weight"], + weights.get(f"{layer_prefix}.attention.dense.bias"), + ), + attention_norm=LayerNorm( + weights[f"{layer_prefix}.attention.LayerNorm.weight"], + weights.get(f"{layer_prefix}.attention.LayerNorm.bias"), + ), + ffn=Linear( + weights[f"{layer_prefix}.ffn.weight"], + weights.get(f"{layer_prefix}.ffn.bias"), + ), + ffn_output=Linear( + weights[f"{layer_prefix}.ffn_output.weight"], + weights.get(f"{layer_prefix}.ffn_output.bias"), + ), + full_layer_norm=LayerNorm( + weights[f"{layer_prefix}.full_layer_layer_norm.weight"], + weights.get(f"{layer_prefix}.full_layer_layer_norm.bias"), + ), + num_attention_heads=num_attention_heads, + hidden_size=hidden_size, + ) + + return ALBERTEncoder( + word_embeddings=word_embeddings, + position_embeddings=position_embeddings, + token_type_embeddings=token_type_embeddings, + embeddings_norm=embeddings_norm, + embedding_mapping=embedding_mapping, + layer=layer, + num_hidden_layers=num_hidden_layers, + ) + + +def build_text_encoder_from_weights( + weights: dict[str, GPUArray], + prefix: str = "text_encoder", +) -> KokoroTextEncoder: + """Build Kokoro text encoder from weight dictionary.""" + # Embedding + embedding = weights[f"{prefix}.module.embedding.weight"] + + # CNN layers (3 layers) + cnn_layers = [] + for i in range(3): + conv = WeightNormConv1d( + weight_g=weights[f"{prefix}.module.cnn.{i}.0.weight_g"], + weight_v=weights[f"{prefix}.module.cnn.{i}.0.weight_v"], + bias=weights.get(f"{prefix}.module.cnn.{i}.0.bias"), + padding=2, # kernel_size=5, padding=2 for same output length + ) + norm = InstanceNorm1d( + gamma=weights[f"{prefix}.module.cnn.{i}.1.gamma"], + beta=weights[f"{prefix}.module.cnn.{i}.1.beta"], + ) + cnn_layers.append((conv, norm)) + + # BiLSTM + lstm = LSTM( + W_ih=weights[f"{prefix}.module.lstm.weight_ih_l0"], + W_hh=weights[f"{prefix}.module.lstm.weight_hh_l0"], + b_ih=weights[f"{prefix}.module.lstm.bias_ih_l0"], + b_hh=weights[f"{prefix}.module.lstm.bias_hh_l0"], + bidirectional=True, + W_ih_reverse=weights[f"{prefix}.module.lstm.weight_ih_l0_reverse"], + W_hh_reverse=weights[f"{prefix}.module.lstm.weight_hh_l0_reverse"], + b_ih_reverse=weights[f"{prefix}.module.lstm.bias_ih_l0_reverse"], + b_hh_reverse=weights[f"{prefix}.module.lstm.bias_hh_l0_reverse"], + ) + + return KokoroTextEncoder( + embedding=embedding, + cnn_layers=cnn_layers, + lstm=lstm, + ) + + __all__ = [ # Basic layers "Linear", @@ -840,6 +1389,10 @@ def build_plbert_from_weights( "Conv1d", "ConvTranspose1d", "ResBlock1d", + "WeightNormConv1d", + "InstanceNorm1d", + "AdaIN", + "AdaINResBlock", # Activations "leaky_relu", "tanh", @@ -850,6 +1403,12 @@ def build_plbert_from_weights( "StyleEncoder", "Decoder", "ISTFTNet", + "LSTM", + "ALBERTLayer", + "ALBERTEncoder", + "KokoroTextEncoder", # Utilities "build_plbert_from_weights", + "build_albert_from_weights", + "build_text_encoder_from_weights", ] diff --git a/src/pygpukit/tts/kokoro/model.py b/src/pygpukit/tts/kokoro/model.py index 4d88e8b..45a5763 100644 --- a/src/pygpukit/tts/kokoro/model.py +++ b/src/pygpukit/tts/kokoro/model.py @@ -30,7 +30,14 @@ from pygpukit.tts.kokoro.text import KokoroTokenizer, normalize_text, split_sentences if TYPE_CHECKING: - from pygpukit.tts.kokoro.layers import Decoder, ISTFTNet, PLBERTEncoder, StyleEncoder + from pygpukit.tts.kokoro.layers import ( + ALBERTEncoder, + Decoder, + ISTFTNet, + KokoroTextEncoder, + PLBERTEncoder, + StyleEncoder, + ) @dataclass @@ -100,9 +107,12 @@ def __init__( # Build model components lazily self._plbert: PLBERTEncoder | None = None + self._albert: ALBERTEncoder | None = None + self._text_encoder: KokoroTextEncoder | None = None self._style_encoder: StyleEncoder | None = None self._decoder: Decoder | None = None self._vocoder: ISTFTNet | None = None + self._bert_encoder_proj = None # bert_encoder linear projection (Linear layer) # Default voice self._current_voice: str | None = None @@ -208,50 +218,164 @@ def current_voice(self) -> str | None: def _build_components(self) -> None: """Build model components from weights (lazy initialization).""" - if self._plbert is not None: + if self._albert is not None: return # Already built - from pygpukit.tts.kokoro.layers import build_plbert_from_weights + from pygpukit.tts.kokoro.layers import ( + Linear, + build_albert_from_weights, + build_text_encoder_from_weights, + ) + + # Build ALBERT encoder (Kokoro uses ALBERT, not standard BERT) + try: + self._albert = build_albert_from_weights( + self.weights, + prefix="bert", + num_hidden_layers=self.config.plbert_num_hidden_layers, + num_attention_heads=self.config.plbert_num_attention_heads, + hidden_size=self.config.plbert_hidden_size, + ) + except KeyError as e: + # Log missing weights for debugging + import warnings + + warnings.warn(f"Failed to build ALBERT encoder: {e}", stacklevel=2) + self._albert = None + + # Build text encoder (CNN + BiLSTM) + try: + self._text_encoder = build_text_encoder_from_weights( + self.weights, + prefix="text_encoder", + ) + except KeyError as e: + import warnings + + warnings.warn(f"Failed to build text encoder: {e}", stacklevel=2) + self._text_encoder = None - # Build PLBERT encoder - # Note: Actual weight prefix may vary depending on checkpoint format - # This is a placeholder - actual implementation needs weight inspection + # Build bert_encoder projection layer try: - self._plbert = build_plbert_from_weights(self.config, self.weights, prefix="bert") - except (KeyError, ValueError): - # Weights might use different naming - self._plbert = None + proj_weight = self.weights.get("bert_encoder.weight") + proj_bias = self.weights.get("bert_encoder.bias") + if proj_weight is not None: + self._bert_encoder_proj = Linear(proj_weight, proj_bias) + except KeyError: + self._bert_encoder_proj = None - # TODO: Build other components (style encoder, decoder, vocoder) - # These require inspecting actual Kokoro weight structure + # Note: Decoder and vocoder require more complex weight mapping + # that depends on the specific predictor and decoder structure. + # These will be implemented as the weight structure is verified. def _forward_simple( self, tokens: list[int], voice_embedding: GPUArray | None = None, ) -> GPUArray: - """Simple forward pass without full model components. - - This is a placeholder implementation that demonstrates the API. - Full implementation requires matching Kokoro's exact weight structure. + """Forward pass through Kokoro TTS model. + + Pipeline: + 1. Convert tokens to input tensor + 2. Run through ALBERT encoder + 3. Project through bert_encoder + 4. Apply text encoder (CNN + BiLSTM) + 5. Apply style conditioning from voice embedding + 6. Generate audio via decoder + vocoder + + Note: Full decoder/vocoder implementation requires additional weight mapping. + Currently implements the text encoding pipeline with placeholder audio generation. """ - # For now, generate placeholder audio - # Actual implementation would: - # 1. Embed tokens - # 2. Run through PLBERT - # 3. Apply style - # 4. Decode to mel - # 5. Vocode to audio - - # Placeholder: generate silence with some noise - duration_per_token = 0.1 # 100ms per token - total_duration = len(tokens) * duration_per_token + # Build components if not already done + self._build_components() + + # Convert tokens to input array + input_ids = np.array([tokens], dtype=np.int32) # [1, seq_len] + input_ids_gpu = from_numpy(input_ids) + + # Run through ALBERT encoder if available + hidden_states = None + if self._albert is not None: + try: + hidden_states = self._albert(input_ids_gpu) # [1, seq_len, hidden_size] + + # Project through bert_encoder if available + if self._bert_encoder_proj is not None: + hidden_states = self._bert_encoder_proj(hidden_states) + except Exception as e: + import warnings + + warnings.warn(f"ALBERT forward failed: {e}, using text encoder fallback", stacklevel=2) + hidden_states = None + + # Run through text encoder if available + text_features = None + if self._text_encoder is not None: + try: + text_features = self._text_encoder(input_ids_gpu) # [1, seq_len, hidden_dim] + except Exception as e: + import warnings + + warnings.warn(f"Text encoder forward failed: {e}", stacklevel=2) + text_features = None + + # Combine ALBERT and text encoder outputs if both available + if hidden_states is not None and text_features is not None: + # Combine features (style conditioning would be applied here) + combined = hidden_states.to_numpy() + text_features.to_numpy() + combined = from_numpy(combined.astype(np.float32)) + elif hidden_states is not None: + combined = hidden_states + elif text_features is not None: + combined = text_features + else: + # Fallback: use token embeddings directly if no encoder is available + import warnings + + warnings.warn( + "No encoder available. TTS output will be placeholder audio. " + "Ensure model weights are correctly loaded.", + stacklevel=2, + ) + # Generate placeholder based on text length + duration_per_token = 0.08 # 80ms per token (typical TTS rate) + total_duration = len(tokens) * duration_per_token + num_samples = int(total_duration * self.config.sample_rate) + + # Generate silence instead of beep for placeholder + audio = np.zeros(num_samples, dtype=np.float32) + return from_numpy(audio) + + # Apply voice/style conditioning + # TODO: Implement proper style encoder when decoder weights are mapped + # For now, voice embedding is reserved for future use + _ = voice_embedding + + # Get sequence length and estimate audio duration + seq_len = len(tokens) + duration_per_token = 0.08 # 80ms per token (typical TTS rate) + total_duration = seq_len * duration_per_token num_samples = int(total_duration * self.config.sample_rate) - # Generate placeholder audio (sine wave for testing) - t = np.linspace(0, total_duration, num_samples, dtype=np.float32) - frequency = 440.0 # A4 note - audio = 0.1 * np.sin(2 * np.pi * frequency * t) + # TODO: Implement decoder and vocoder forward pass + # The decoder converts text features + style to mel spectrogram + # The vocoder (ISTFTNet) converts mel to waveform + # + # For now, generate placeholder audio proportional to text features + # This ensures the API works while decoder/vocoder are being implemented. + # + # Full implementation requires: + # 1. Duration predictor to get per-phoneme durations + # 2. Decoder with AdaIN style conditioning + # 3. ISTFTNet vocoder for waveform synthesis + + # Generate placeholder audio (silence) - NOT the 440Hz beep + # The actual audio generation requires decoder/vocoder implementation + audio = np.zeros(num_samples, dtype=np.float32) + + # Add a very quiet noise floor to indicate audio was "generated" + # This distinguishes from complete silence and helps with debugging + audio += np.random.randn(num_samples).astype(np.float32) * 0.001 return from_numpy(audio) diff --git a/tests/test_tts_layers.py b/tests/test_tts_layers.py new file mode 100644 index 0000000..4b5d491 --- /dev/null +++ b/tests/test_tts_layers.py @@ -0,0 +1,385 @@ +"""Unit tests for Kokoro TTS layer implementations. + +Tests the neural network layers used in Kokoro-82M TTS model. +Uses mock weights to verify layer behavior without requiring actual model files. +""" + +import numpy as np +import pytest + +import pygpukit as gk +from pygpukit.core.factory import from_numpy + +# Check if new TTS layers are available (they may not be in older installations) +try: + from pygpukit.tts.kokoro.layers import WeightNormConv1d # noqa: F401 + + HAS_TTS_LAYERS = True +except ImportError: + HAS_TTS_LAYERS = False + +pytestmark = pytest.mark.skipif(not HAS_TTS_LAYERS, reason="TTS layers not available") + + +@pytest.fixture +def skip_if_no_cuda(): + """Skip test if CUDA is not available.""" + if not gk.is_cuda_available(): + pytest.skip("CUDA not available") + + +class TestWeightNormConv1d: + """Tests for WeightNormConv1d layer.""" + + def test_weight_normalization(self, skip_if_no_cuda): + """Test that weight normalization computes W = g * (v / ||v||).""" + from pygpukit.tts.kokoro.layers import WeightNormConv1d + + out_channels, in_channels, kernel_size = 4, 2, 3 + + # Create mock weights + weight_g = from_numpy(np.ones((out_channels, 1, 1), dtype=np.float32) * 2.0) + weight_v = from_numpy(np.random.randn(out_channels, in_channels, kernel_size).astype(np.float32)) + + conv = WeightNormConv1d(weight_g=weight_g, weight_v=weight_v) + + # Compute normalized weight + weight = conv._compute_weight() + + # Verify: each output channel should have L2 norm equal to g + for i in range(out_channels): + channel_norm = np.sqrt((weight[i] ** 2).sum()) + np.testing.assert_allclose(channel_norm, 2.0, rtol=1e-5) + + def test_forward_shape(self, skip_if_no_cuda): + """Test that forward pass produces correct output shape.""" + from pygpukit.tts.kokoro.layers import WeightNormConv1d + + batch, in_channels, length = 2, 4, 16 + out_channels, kernel_size = 8, 3 + padding = 1 + + weight_g = from_numpy(np.ones((out_channels, 1, 1), dtype=np.float32)) + weight_v = from_numpy(np.random.randn(out_channels, in_channels, kernel_size).astype(np.float32)) + bias = from_numpy(np.zeros(out_channels, dtype=np.float32)) + + conv = WeightNormConv1d(weight_g=weight_g, weight_v=weight_v, bias=bias, padding=padding) + + x = from_numpy(np.random.randn(batch, in_channels, length).astype(np.float32)) + out = conv(x) + + # With padding=1 and kernel_size=3, output length should be same as input + assert out.shape == (batch, out_channels, length) + + +class TestInstanceNorm1d: + """Tests for InstanceNorm1d layer.""" + + def test_normalization(self, skip_if_no_cuda): + """Test that instance norm normalizes each channel to zero mean, unit variance.""" + from pygpukit.tts.kokoro.layers import InstanceNorm1d + + channels = 4 + gamma = from_numpy(np.ones(channels, dtype=np.float32)) + beta = from_numpy(np.zeros(channels, dtype=np.float32)) + + norm = InstanceNorm1d(gamma=gamma, beta=beta) + + # Create input with known statistics + batch, length = 2, 32 + x = from_numpy(np.random.randn(batch, channels, length).astype(np.float32) * 5 + 3) + + out = norm(x) + out_np = out.to_numpy() + + # Check each sample and channel has ~zero mean and ~unit variance + for b in range(batch): + for c in range(channels): + mean = out_np[b, c].mean() + var = out_np[b, c].var() + np.testing.assert_allclose(mean, 0.0, atol=1e-5) + np.testing.assert_allclose(var, 1.0, atol=1e-4) + + def test_affine_transform(self, skip_if_no_cuda): + """Test that gamma and beta are applied correctly.""" + from pygpukit.tts.kokoro.layers import InstanceNorm1d + + channels = 2 + gamma = from_numpy(np.array([2.0, 0.5], dtype=np.float32)) + beta = from_numpy(np.array([1.0, -1.0], dtype=np.float32)) + + norm = InstanceNorm1d(gamma=gamma, beta=beta) + + x = from_numpy(np.random.randn(1, channels, 100).astype(np.float32)) + out = norm(x) + out_np = out.to_numpy() + + # After normalization and affine: mean should be beta, std should be gamma + np.testing.assert_allclose(out_np[0, 0].mean(), 1.0, atol=0.1) + np.testing.assert_allclose(out_np[0, 1].mean(), -1.0, atol=0.1) + np.testing.assert_allclose(out_np[0, 0].std(), 2.0, atol=0.1) + np.testing.assert_allclose(out_np[0, 1].std(), 0.5, atol=0.1) + + +class TestAdaIN: + """Tests for Adaptive Instance Normalization layer.""" + + def test_style_conditioning(self, skip_if_no_cuda): + """Test that style vector modulates scale and shift.""" + from pygpukit.tts.kokoro.layers import AdaIN + + channels, style_dim = 4, 8 + + # FC layer: [2*channels, style_dim] + fc_weight = from_numpy(np.random.randn(2 * channels, style_dim).astype(np.float32) * 0.1) + fc_bias = from_numpy(np.zeros(2 * channels, dtype=np.float32)) + + adain = AdaIN(fc_weight=fc_weight, fc_bias=fc_bias) + + batch, length = 2, 16 + x = from_numpy(np.random.randn(batch, channels, length).astype(np.float32)) + style = from_numpy(np.random.randn(batch, style_dim).astype(np.float32)) + + out = adain(x, style) + + assert out.shape == (batch, channels, length) + + def test_different_styles_produce_different_outputs(self, skip_if_no_cuda): + """Test that different style vectors produce different outputs.""" + from pygpukit.tts.kokoro.layers import AdaIN + + channels, style_dim = 4, 8 + + fc_weight = from_numpy(np.random.randn(2 * channels, style_dim).astype(np.float32)) + fc_bias = from_numpy(np.zeros(2 * channels, dtype=np.float32)) + + adain = AdaIN(fc_weight=fc_weight, fc_bias=fc_bias) + + x = from_numpy(np.random.randn(1, channels, 16).astype(np.float32)) + style1 = from_numpy(np.random.randn(1, style_dim).astype(np.float32)) + style2 = from_numpy(np.random.randn(1, style_dim).astype(np.float32)) + + out1 = adain(x, style1).to_numpy() + out2 = adain(x, style2).to_numpy() + + # Outputs should be different + assert not np.allclose(out1, out2) + + +class TestALBERTLayer: + """Tests for ALBERTLayer.""" + + def test_forward_shape(self, skip_if_no_cuda): + """Test that ALBERT layer preserves sequence dimensions.""" + from pygpukit.tts.kokoro.layers import ALBERTLayer, LayerNorm, Linear + + batch, seq_len, hidden_size = 2, 16, 64 + num_heads = 4 + intermediate_size = 128 + + # Create mock weights + def make_linear(in_f, out_f): + w = from_numpy(np.random.randn(out_f, in_f).astype(np.float32) * 0.02) + b = from_numpy(np.zeros(out_f, dtype=np.float32)) + return Linear(w, b) + + def make_norm(size): + w = from_numpy(np.ones(size, dtype=np.float32)) + b = from_numpy(np.zeros(size, dtype=np.float32)) + return LayerNorm(w, b) + + layer = ALBERTLayer( + query=make_linear(hidden_size, hidden_size), + key=make_linear(hidden_size, hidden_size), + value=make_linear(hidden_size, hidden_size), + attention_dense=make_linear(hidden_size, hidden_size), + attention_norm=make_norm(hidden_size), + ffn=make_linear(hidden_size, intermediate_size), + ffn_output=make_linear(intermediate_size, hidden_size), + full_layer_norm=make_norm(hidden_size), + num_attention_heads=num_heads, + hidden_size=hidden_size, + ) + + x = from_numpy(np.random.randn(batch, seq_len, hidden_size).astype(np.float32)) + out = layer(x) + + assert out.shape == (batch, seq_len, hidden_size) + + +class TestALBERTEncoder: + """Tests for ALBERTEncoder.""" + + def test_forward_shape(self, skip_if_no_cuda): + """Test that ALBERT encoder produces correct output shape.""" + from pygpukit.tts.kokoro.layers import ALBERTEncoder, ALBERTLayer, LayerNorm, Linear + + vocab_size, embed_dim, hidden_size = 100, 32, 64 + max_positions, num_heads = 128, 4 + num_layers = 2 + intermediate_size = 128 + + def make_linear(in_f, out_f): + w = from_numpy(np.random.randn(out_f, in_f).astype(np.float32) * 0.02) + b = from_numpy(np.zeros(out_f, dtype=np.float32)) + return Linear(w, b) + + def make_norm(size): + w = from_numpy(np.ones(size, dtype=np.float32)) + b = from_numpy(np.zeros(size, dtype=np.float32)) + return LayerNorm(w, b) + + # Embeddings + word_emb = from_numpy(np.random.randn(vocab_size, embed_dim).astype(np.float32) * 0.02) + pos_emb = from_numpy(np.random.randn(max_positions, embed_dim).astype(np.float32) * 0.02) + type_emb = from_numpy(np.random.randn(2, embed_dim).astype(np.float32) * 0.02) + + # Shared layer + layer = ALBERTLayer( + query=make_linear(hidden_size, hidden_size), + key=make_linear(hidden_size, hidden_size), + value=make_linear(hidden_size, hidden_size), + attention_dense=make_linear(hidden_size, hidden_size), + attention_norm=make_norm(hidden_size), + ffn=make_linear(hidden_size, intermediate_size), + ffn_output=make_linear(intermediate_size, hidden_size), + full_layer_norm=make_norm(hidden_size), + num_attention_heads=num_heads, + hidden_size=hidden_size, + ) + + encoder = ALBERTEncoder( + word_embeddings=word_emb, + position_embeddings=pos_emb, + token_type_embeddings=type_emb, + embeddings_norm=make_norm(embed_dim), + embedding_mapping=make_linear(embed_dim, hidden_size), + layer=layer, + num_hidden_layers=num_layers, + ) + + batch, seq_len = 2, 16 + input_ids = from_numpy(np.random.randint(0, vocab_size, (batch, seq_len)).astype(np.int32)) + + out = encoder(input_ids) + + assert out.shape == (batch, seq_len, hidden_size) + + +class TestKokoroTextEncoder: + """Tests for KokoroTextEncoder (CNN + BiLSTM).""" + + def test_forward_shape(self, skip_if_no_cuda): + """Test that text encoder produces correct output shape.""" + from pygpukit.tts.kokoro.layers import ( + LSTM, + InstanceNorm1d, + KokoroTextEncoder, + WeightNormConv1d, + ) + + vocab_size, embed_dim = 100, 32 + cnn_channels = 64 + lstm_hidden = 128 + + # Embedding + embedding = from_numpy(np.random.randn(vocab_size, embed_dim).astype(np.float32) * 0.02) + + # CNN layers + cnn_layers = [] + in_ch = embed_dim + for _ in range(3): + conv = WeightNormConv1d( + weight_g=from_numpy(np.ones((cnn_channels, 1, 1), dtype=np.float32)), + weight_v=from_numpy(np.random.randn(cnn_channels, in_ch, 5).astype(np.float32) * 0.02), + padding=2, + ) + norm = InstanceNorm1d( + gamma=from_numpy(np.ones(cnn_channels, dtype=np.float32)), + beta=from_numpy(np.zeros(cnn_channels, dtype=np.float32)), + ) + cnn_layers.append((conv, norm)) + in_ch = cnn_channels + + # BiLSTM + lstm = LSTM( + W_ih=from_numpy(np.random.randn(4 * lstm_hidden, cnn_channels).astype(np.float32) * 0.02), + W_hh=from_numpy(np.random.randn(4 * lstm_hidden, lstm_hidden).astype(np.float32) * 0.02), + b_ih=from_numpy(np.zeros(4 * lstm_hidden, dtype=np.float32)), + b_hh=from_numpy(np.zeros(4 * lstm_hidden, dtype=np.float32)), + bidirectional=True, + W_ih_reverse=from_numpy(np.random.randn(4 * lstm_hidden, cnn_channels).astype(np.float32) * 0.02), + W_hh_reverse=from_numpy(np.random.randn(4 * lstm_hidden, lstm_hidden).astype(np.float32) * 0.02), + b_ih_reverse=from_numpy(np.zeros(4 * lstm_hidden, dtype=np.float32)), + b_hh_reverse=from_numpy(np.zeros(4 * lstm_hidden, dtype=np.float32)), + ) + + encoder = KokoroTextEncoder(embedding=embedding, cnn_layers=cnn_layers, lstm=lstm) + + batch, seq_len = 2, 16 + input_ids = from_numpy(np.random.randint(0, vocab_size, (batch, seq_len)).astype(np.int32)) + + out = encoder(input_ids) + + # BiLSTM output: [batch, seq_len, 2 * lstm_hidden] + assert out.shape == (batch, seq_len, 2 * lstm_hidden) + + +class TestAdaINResBlock: + """Tests for AdaINResBlock.""" + + def test_residual_connection(self, skip_if_no_cuda): + """Test that residual connection is applied.""" + from pygpukit.tts.kokoro.layers import AdaIN, AdaINResBlock, WeightNormConv1d + + channels, style_dim = 32, 16 + + def make_conv(in_ch, out_ch): + return WeightNormConv1d( + weight_g=from_numpy(np.ones((out_ch, 1, 1), dtype=np.float32)), + weight_v=from_numpy(np.random.randn(out_ch, in_ch, 3).astype(np.float32) * 0.02), + padding=1, + ) + + def make_adain(ch, style_d): + return AdaIN( + fc_weight=from_numpy(np.random.randn(2 * ch, style_d).astype(np.float32) * 0.1), + fc_bias=from_numpy(np.zeros(2 * ch, dtype=np.float32)), + ) + + block = AdaINResBlock( + conv1=make_conv(channels, channels), + conv2=make_conv(channels, channels), + norm1=make_adain(channels, style_dim), + norm2=make_adain(channels, style_dim), + ) + + batch, length = 2, 16 + x = from_numpy(np.random.randn(batch, channels, length).astype(np.float32)) + style = from_numpy(np.random.randn(batch, style_dim).astype(np.float32)) + + out = block(x, style) + + assert out.shape == (batch, channels, length) + + +class TestBuildFunctions: + """Tests for weight builder functions.""" + + def test_build_albert_missing_weights_raises(self, skip_if_no_cuda): + """Test that missing weights raise KeyError.""" + from pygpukit.tts.kokoro.layers import build_albert_from_weights + + weights = {} # Empty weights + + with pytest.raises(KeyError): + build_albert_from_weights(weights) + + def test_build_text_encoder_missing_weights_raises(self, skip_if_no_cuda): + """Test that missing weights raise KeyError.""" + from pygpukit.tts.kokoro.layers import build_text_encoder_from_weights + + weights = {} # Empty weights + + with pytest.raises(KeyError): + build_text_encoder_from_weights(weights)