diff --git a/libf0/pyin.py b/libf0/pyin.py index 8045599..a072831 100644 --- a/libf0/pyin.py +++ b/libf0/pyin.py @@ -11,9 +11,12 @@ from numba import njit +VITERBI_IMPLS = ("legacy", "fast") + + # pYIN estimate computation def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=np.arange(0.01, 1, 0.01), - beta_params=[1, 18], absolute_min_prob=0.01, voicing_prob=0.5): + beta_params=[1, 18], absolute_min_prob=0.01, voicing_prob=0.5, viterbi_impl="legacy"): """ Implementation of the pYIN F0-estimation algorithm. @@ -45,6 +48,8 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds= Prior for voice activity voicing_prob: float Prior for transition probability? + viterbi_impl : str + Viterbi backend. One of ``"legacy"`` or ``"fast"``. Returns ------- f0 : ndarray @@ -58,6 +63,9 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds= if F_min > F_max: raise Exception("F_min must be smaller than F_max!") + if viterbi_impl not in VITERBI_IMPLS: + raise ValueError(f"Unknown viterbi_impl '{viterbi_impl}'. Expected one of {VITERBI_IMPLS}.") + if F_min < Fs/N: raise Exception(f"The condition (F_min >= Fs/N) was not met. With Fs = {Fs}, N = {N} and F_min = {F_min} you have the following options: \n1) Set F_min >= {np.ceil(Fs/N)} Hz. \n2) Set N >= {np.ceil(Fs/F_min).astype(int)}. \n3) Set Fs <= {np.floor(F_min * N)} Hz.") @@ -80,11 +88,14 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds= max_step_cents = 50 # Pitch jump can be at most 50 cents from frame to frame max_step = int(max_step_cents / R) triang_distr = triang.pdf(np.arange(-max_step, max_step+1), 0.5, scale=2*max_step, loc=-max_step) - A = compute_transition_matrix(B, triang_distr) - # HMM smoothing C = np.ones((2*B, 1)) / (2*B) # uniform initialization - f0_idxs = viterbi_log_likelihood(A, C.flatten(), O) # libfmp Viterbi implementation + if viterbi_impl == "legacy": + A = compute_transition_matrix(B, triang_distr) + f0_idxs = viterbi_log_likelihood(A, C.flatten(), O) + else: + source_idx, log_trans, counts = compute_transition_structure_pyin_block(B, triang_distr) + f0_idxs = viterbi_log_likelihood_fast(source_idx, log_trans, counts, C.flatten(), O) # Obtain F0-trajectory F_axis_extended = np.concatenate((F_axis, np.zeros(len(F_axis)))) @@ -471,6 +482,212 @@ def viterbi_log_likelihood(A, C, B_O): return S_opt +@njit +def compute_transition_structure_from_matrix(A): + """Build a sparse predecessor structure from a dense transition matrix.""" + I = A.shape[0] + tiny = np.finfo(0.).tiny + + max_width = 0 + for target in range(I): + count = 0 + for source in range(I): + if A[source, target] > 0: + count += 1 + if count > max_width: + max_width = count + + source_idx = np.zeros((I, max_width), dtype=np.int32) + log_trans = np.full((I, max_width), -np.inf) + counts = np.zeros(I, dtype=np.int32) + + for target in range(I): + count = 0 + for source in range(I): + value = A[source, target] + if value > 0: + source_idx[target, count] = source + log_trans[target, count] = np.log(value + tiny) + count += 1 + counts[target] = count + + return source_idx, log_trans, counts + + +@njit +def compute_transition_structure_pyin_block(M, triang_distr, prob_self=0.99): + """Build sparse predecessor structure from pYIN block transitions directly. + + The pYIN transition used here has two voicing blocks: + - same-voicing local triangular transitions (scaled by `prob_self`) + - cross-voicing same-pitch transitions (scaled by `1 - prob_self`) + """ + max_step = len(triang_distr) // 2 + states = 2 * M + tiny = np.finfo(0.).tiny + + same_counts = np.zeros(M, dtype=np.int32) + + for source in range(M): + if source < max_step: + start = 0 + end = source + max_step + weights = triang_distr[max_step - source:-1] + denom = np.sum(weights) + if denom <= 0: + continue + for k, target in enumerate(range(start, end)): + prob = prob_self * weights[k] / denom + if prob > 0: + same_counts[target] += 1 + elif source < M - max_step: + start = source - max_step + end = source + max_step + 1 + for k, target in enumerate(range(start, end)): + prob = prob_self * triang_distr[k] + if prob > 0: + same_counts[target] += 1 + else: + start = source - max_step + end = M + weights = triang_distr[0:max_step - (source - M)] + denom = np.sum(weights) + if denom <= 0: + continue + for target in range(start, end): + k = target - start + prob = prob_self * weights[k] / denom + if prob > 0: + same_counts[target] += 1 + + max_width = int(np.max(same_counts)) + 1 # +1 for cross-voicing same-pitch edge + source_idx = np.zeros((states, max_width), dtype=np.int32) + log_trans = np.full((states, max_width), -np.inf) + counts = np.zeros(states, dtype=np.int32) + log_cross = np.log((1.0 - prob_self) + tiny) + + # Match dense-structure source ordering for unvoiced targets: + # source indices are visited in ascending order, so cross-voicing edges + # (from voiced sources 0..M-1) appear before same-voicing edges (M..2M-1). + for pitch in range(M): + pitch_u = pitch + M + idx_u = counts[pitch_u] + source_idx[pitch_u, idx_u] = pitch + log_trans[pitch_u, idx_u] = log_cross + counts[pitch_u] += 1 + + for source in range(M): + if source < max_step: + start = 0 + end = source + max_step + weights = triang_distr[max_step - source:-1] + denom = np.sum(weights) + for k, target in enumerate(range(start, end)): + prob = prob_self * weights[k] / denom + if prob <= 0: + continue + log_prob = np.log(prob + tiny) + + idx_v = counts[target] + source_idx[target, idx_v] = source + log_trans[target, idx_v] = log_prob + counts[target] += 1 + + target_u = target + M + source_u = source + M + idx_u = counts[target_u] + source_idx[target_u, idx_u] = source_u + log_trans[target_u, idx_u] = log_prob + counts[target_u] += 1 + elif source < M - max_step: + start = source - max_step + end = source + max_step + 1 + for k, target in enumerate(range(start, end)): + prob = prob_self * triang_distr[k] + if prob <= 0: + continue + log_prob = np.log(prob + tiny) + + idx_v = counts[target] + source_idx[target, idx_v] = source + log_trans[target, idx_v] = log_prob + counts[target] += 1 + + target_u = target + M + source_u = source + M + idx_u = counts[target_u] + source_idx[target_u, idx_u] = source_u + log_trans[target_u, idx_u] = log_prob + counts[target_u] += 1 + else: + start = source - max_step + end = M + weights = triang_distr[0:max_step - (source - M)] + denom = np.sum(weights) + for k, target in enumerate(range(start, end)): + prob = prob_self * weights[k] / denom + if prob <= 0: + continue + log_prob = np.log(prob + tiny) + + idx_v = counts[target] + source_idx[target, idx_v] = source + log_trans[target, idx_v] = log_prob + counts[target] += 1 + + target_u = target + M + source_u = source + M + idx_u = counts[target_u] + source_idx[target_u, idx_u] = source_u + log_trans[target_u, idx_u] = log_prob + counts[target_u] += 1 + + # Match dense-structure ordering for voiced targets: + # cross-voicing edges come from sources M..2M-1, after same-voicing sources. + for pitch in range(M): + idx_v = counts[pitch] + source_idx[pitch, idx_v] = pitch + M + log_trans[pitch, idx_v] = log_cross + counts[pitch] += 1 + + return source_idx, log_trans, counts + + +@njit +def viterbi_log_likelihood_fast(source_idx, log_trans, counts, C, B_O): + """Sparse structured log-likelihood Viterbi for pYIN transition graphs.""" + I = source_idx.shape[0] + N = B_O.shape[1] + tiny = np.finfo(0.).tiny + C_log = np.log(C + tiny) + B_O_log = np.log(B_O + tiny) + + D_log = np.zeros((I, N)) + E = np.zeros((I, N - 1), dtype=np.int32) + D_log[:, 0] = C_log + B_O_log[:, 0] + + for n in range(1, N): + for target in range(I): + best_source = 0 + best_val = -np.inf + count = counts[target] + for k in range(count): + source = source_idx[target, k] + value = log_trans[target, k] + D_log[source, n - 1] + if value > best_val: + best_val = value + best_source = source + D_log[target, n] = best_val + B_O_log[target, n] + E[target, n - 1] = best_source + + S_opt = np.zeros(N, dtype=np.int32) + S_opt[-1] = np.argmax(D_log[:, -1]) + for n in range(N - 2, -1, -1): + S_opt[n] = E[int(S_opt[n + 1]), n] + + return S_opt + + @njit def delete_numba(arr, num): """Delete number from array, Numba compatible. Inspired by: diff --git a/scripts/benchmark_pyin_viterbi.py b/scripts/benchmark_pyin_viterbi.py new file mode 100644 index 0000000..0de0d5a --- /dev/null +++ b/scripts/benchmark_pyin_viterbi.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python +import argparse +import json +import time +from pathlib import Path + +import numpy as np +from scipy.io import wavfile +from scipy.special import beta, comb +from scipy.stats import triang + +import libf0 + + +DEFAULT_AUDIO = Path("data/DCS_LI_QuartetB_Take03_S1_LRX_excerpt.wav") + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--audio", type=Path, default=DEFAULT_AUDIO) + parser.add_argument("--repeat-audio", type=int, default=1) + parser.add_argument("--warmup", type=int, default=1) + parser.add_argument("--iters", type=int, default=3) + parser.add_argument("--json", action="store_true") + return parser.parse_args() + + +def load_audio(path: Path, repeat_audio: int) -> tuple[np.ndarray, int]: + sample_rate, audio = wavfile.read(path) + if audio.ndim > 1: + audio = audio.mean(axis=1) + if np.issubdtype(audio.dtype, np.integer): + audio = audio.astype(np.float32) / max(np.iinfo(audio.dtype).max, 1) + else: + audio = audio.astype(np.float32, copy=False) + if repeat_audio > 1: + audio = np.tile(audio, repeat_audio) + return audio, int(sample_rate) + + +def time_call(fn, *, warmup: int, iters: int) -> float: + for _ in range(warmup): + fn() + durations_ms = [] + for _ in range(iters): + start = time.perf_counter() + fn() + durations_ms.append((time.perf_counter() - start) * 1000.0) + durations_ms.sort() + return durations_ms[len(durations_ms) // 2] + + +def prepare_pyin_state(x, Fs, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, + thresholds=np.arange(0.01, 1, 0.01), beta_params=(1, 18), + absolute_min_prob=0.01, voicing_prob=0.5): + x_pad = np.concatenate((np.zeros(N // 2), x, np.zeros(N // 2))) + thr_idxs = np.arange(len(thresholds)) + beta_distr = comb(len(thresholds), thr_idxs) * beta( + thr_idxs + beta_params[0], + len(thresholds) - thr_idxs + beta_params[1], + ) / beta(beta_params[0], beta_params[1]) + + B = int(np.log2(F_max / F_min) * (1200 / R)) + F_axis = F_min * np.power(2, np.arange(B) * R / 1200) + O, rms, p_orig, val_orig = libf0.yin_multi_thr( + x_pad, + Fs=Fs, + N=N, + H=H, + F_min=F_min, + F_max=F_max, + thresholds=thresholds, + beta_distr=beta_distr, + absolute_min_prob=absolute_min_prob, + F_axis=F_axis, + voicing_prob=voicing_prob, + ) + max_step_cents = 50 + max_step = int(max_step_cents / R) + triang_distr = triang.pdf( + np.arange(-max_step, max_step + 1), + 0.5, + scale=2 * max_step, + loc=-max_step, + ) + A = libf0.compute_transition_matrix(B, triang_distr) + C = np.ones((2 * B, 1)) / (2 * B) + source_idx, log_trans, counts = libf0.compute_transition_structure_pyin_block(B, triang_distr) + return { + "O": O, + "A": A, + "C": C, + "source_idx": source_idx, + "log_trans": log_trans, + "counts": counts, + "frames": int(O.shape[1]), + } + + +def benchmark(args): + audio, sample_rate = load_audio(args.audio, args.repeat_audio) + state = prepare_pyin_state(audio, sample_rate) + + legacy_path = libf0.viterbi_log_likelihood(state["A"], state["C"].flatten(), state["O"]) + fast_path = libf0.viterbi_log_likelihood_fast( + state["source_idx"], + state["log_trans"], + state["counts"], + state["C"].flatten(), + state["O"], + ) + + f0_legacy, t_legacy, conf_legacy = libf0.pyin(audio, Fs=sample_rate, viterbi_impl="legacy") + f0_fast, t_fast, conf_fast = libf0.pyin(audio, Fs=sample_rate, viterbi_impl="fast") + + return { + "audio": str(args.audio), + "repeat_audio": args.repeat_audio, + "frames": state["frames"], + "decoder_path_parity_ok": bool(np.array_equal(legacy_path, fast_path)), + "pyin_f0_parity_ok": bool(np.array_equal(f0_legacy, f0_fast)), + "pyin_time_parity_ok": bool(np.array_equal(t_legacy, t_fast)), + "pyin_conf_parity_ok": bool(np.array_equal(conf_legacy, conf_fast)), + "viterbi_log_likelihood_legacy_ms": time_call( + lambda: libf0.viterbi_log_likelihood(state["A"], state["C"].flatten(), state["O"]), + warmup=args.warmup, + iters=args.iters, + ), + "viterbi_log_likelihood_fast_ms": time_call( + lambda: libf0.viterbi_log_likelihood_fast( + state["source_idx"], + state["log_trans"], + state["counts"], + state["C"].flatten(), + state["O"], + ), + warmup=args.warmup, + iters=args.iters, + ), + "pyin_legacy_ms": time_call( + lambda: libf0.pyin(audio, Fs=sample_rate, viterbi_impl="legacy"), + warmup=args.warmup, + iters=args.iters, + ), + "pyin_fast_ms": time_call( + lambda: libf0.pyin(audio, Fs=sample_rate, viterbi_impl="fast"), + warmup=args.warmup, + iters=args.iters, + ), + } + + +def render_markdown(result): + core_speedup = result["viterbi_log_likelihood_legacy_ms"] / result["viterbi_log_likelihood_fast_ms"] + pyin_speedup = result["pyin_legacy_ms"] / result["pyin_fast_ms"] + lines = [ + "## libf0 pYIN Viterbi Benchmark", + "", + f"**Audio:** `{result['audio']}`", + f"**Repeated:** `{result['repeat_audio']}`", + f"**Frames:** `{result['frames']}`", + "", + "---", + "", + "## Decoder Core", + "", + "| Impl | Time |", + "|:--|--:|", + f"| `legacy` | **{result['viterbi_log_likelihood_legacy_ms']:.3f} ms** |", + f"| `fast` | **{result['viterbi_log_likelihood_fast_ms']:.3f} ms** |", + "", + f"> ✅ **Core speedup:** `{core_speedup:.2f}x`", + "", + "---", + "", + "## Full pYIN", + "", + "| Impl | Time |", + "|:--|--:|", + f"| `legacy` | **{result['pyin_legacy_ms']:.3f} ms** |", + f"| `fast` | **{result['pyin_fast_ms']:.3f} ms** |", + "", + f"> ✅ **End-to-end speedup:** `{pyin_speedup:.2f}x`", + "", + "## Parity", + "", + f"- decoder path parity: `{'ok' if result['decoder_path_parity_ok'] else 'mismatch'}`", + f"- f0 parity: `{'ok' if result['pyin_f0_parity_ok'] else 'mismatch'}`", + f"- time-axis parity: `{'ok' if result['pyin_time_parity_ok'] else 'mismatch'}`", + f"- confidence parity: `{'ok' if result['pyin_conf_parity_ok'] else 'mismatch'}`", + ] + return "\n".join(lines) + + +def main(): + args = parse_args() + result = benchmark(args) + if args.json: + print(json.dumps(result, indent=2)) + return + print(render_markdown(result)) + + +if __name__ == "__main__": + main() diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index c2ff89a..1264f36 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -8,6 +8,8 @@ import numpy as np from scipy.signal import chirp from scipy.interpolate import interp1d +from scipy.special import beta, comb +from scipy.stats import triang import libf0 @@ -71,6 +73,92 @@ def test_pyin(): assert np.allclose(libf0.hz_to_cents(f0_pyin_x2[1:]), libf0.hz_to_cents(chirp_func(t_pyin_x2[1:])), rtol=0, atol=20) +def test_pyin_viterbi_impls_match(): + R = 10 + thrs = np.arange(0.01, 1, 0.01) + betas = [1, 18] + abs_min_prob = 0.01 + voicing_prob = 0.5 + + f0_legacy, t_legacy, conf_legacy = libf0.pyin( + x1, Fs=Fs, N=N, H=H, F_min=F_min, F_max=F_max, R=R, + thresholds=thrs, beta_params=betas, + absolute_min_prob=abs_min_prob, voicing_prob=voicing_prob, + viterbi_impl="legacy") + f0_fast, t_fast, conf_fast = libf0.pyin( + x1, Fs=Fs, N=N, H=H, F_min=F_min, F_max=F_max, R=R, + thresholds=thrs, beta_params=betas, + absolute_min_prob=abs_min_prob, voicing_prob=voicing_prob, + viterbi_impl="fast") + + assert np.array_equal(f0_legacy, f0_fast) + assert np.array_equal(t_legacy, t_fast) + assert np.array_equal(conf_legacy, conf_fast) + + +def test_pyin_viterbi_core_impls_match(): + R = 10 + thrs = np.arange(0.01, 1, 0.01) + beta_params = [1, 18] + absolute_min_prob = 0.01 + voicing_prob = 0.5 + x_pad = np.concatenate((np.zeros(N // 2), x2, np.zeros(N // 2))) + + thr_idxs = np.arange(len(thrs)) + beta_distr = comb(len(thrs), thr_idxs) * beta( + thr_idxs + beta_params[0], + len(thrs) - thr_idxs + beta_params[1], + ) / beta(beta_params[0], beta_params[1]) + + B = int(np.log2(F_max / F_min) * (1200 / R)) + F_axis = F_min * np.power(2, np.arange(B) * R / 1200) + O, _, _, _ = libf0.yin_multi_thr( + x_pad, + Fs=Fs, + N=N, + H=H, + F_min=F_min, + F_max=F_max, + thresholds=thrs, + beta_distr=beta_distr, + absolute_min_prob=absolute_min_prob, + F_axis=F_axis, + voicing_prob=voicing_prob, + ) + + max_step_cents = 50 + max_step = int(max_step_cents / R) + triang_distr = triang.pdf( + np.arange(-max_step, max_step + 1), + 0.5, + scale=2 * max_step, + loc=-max_step, + ) + A = libf0.compute_transition_matrix(B, triang_distr) + C = np.ones((2 * B, 1)) / (2 * B) + + legacy = libf0.viterbi_log_likelihood(A, C.flatten(), O) + source_idx_matrix, log_trans_matrix, counts_matrix = libf0.compute_transition_structure_from_matrix(A) + source_idx_block, log_trans_block, counts_block = libf0.compute_transition_structure_pyin_block(B, triang_distr) + + assert np.array_equal(source_idx_matrix, source_idx_block) + assert np.array_equal(log_trans_matrix, log_trans_block) + assert np.array_equal(counts_matrix, counts_block) + + fast = libf0.viterbi_log_likelihood_fast(source_idx_block, log_trans_block, counts_block, C.flatten(), O) + + assert np.array_equal(legacy, fast) + + +def test_pyin_viterbi_impl_fail(): + try: + libf0.pyin(x1, Fs=Fs, N=N, H=H, F_min=F_min, F_max=F_max, + viterbi_impl="bogus") + except ValueError: + return + raise AssertionError("Expected ValueError for invalid viterbi_impl") + + def test_salience(): R = 10 n_harm = 10