Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 221 additions & 4 deletions libf0/pyin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
from numba import njit


# Supported Viterbi backends selectable via the ``viterbi_impl`` argument of ``pyin``:
# "legacy" uses the dense transition matrix, "fast" the sparse predecessor structure.
VITERBI_IMPLS = ("legacy", "fast")


# pYIN estimate computation
def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=np.arange(0.01, 1, 0.01),
beta_params=[1, 18], absolute_min_prob=0.01, voicing_prob=0.5):
beta_params=[1, 18], absolute_min_prob=0.01, voicing_prob=0.5, viterbi_impl="legacy"):
"""
Implementation of the pYIN F0-estimation algorithm.

Expand Down Expand Up @@ -45,6 +48,8 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=
Prior for voice activity
voicing_prob: float
Prior for transition probability?
viterbi_impl : str
Viterbi backend. One of ``"legacy"`` or ``"fast"``.
Returns
-------
f0 : ndarray
Expand All @@ -58,6 +63,9 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=
if F_min > F_max:
raise Exception("F_min must be smaller than F_max!")

if viterbi_impl not in VITERBI_IMPLS:
raise ValueError(f"Unknown viterbi_impl '{viterbi_impl}'. Expected one of {VITERBI_IMPLS}.")

if F_min < Fs/N:
raise Exception(f"The condition (F_min >= Fs/N) was not met. With Fs = {Fs}, N = {N} and F_min = {F_min} you have the following options: \n1) Set F_min >= {np.ceil(Fs/N)} Hz. \n2) Set N >= {np.ceil(Fs/F_min).astype(int)}. \n3) Set Fs <= {np.floor(F_min * N)} Hz.")

Expand All @@ -80,11 +88,14 @@ def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=
max_step_cents = 50 # Pitch jump can be at most 50 cents from frame to frame
max_step = int(max_step_cents / R)
triang_distr = triang.pdf(np.arange(-max_step, max_step+1), 0.5, scale=2*max_step, loc=-max_step)
A = compute_transition_matrix(B, triang_distr)

# HMM smoothing
C = np.ones((2*B, 1)) / (2*B) # uniform initialization
f0_idxs = viterbi_log_likelihood(A, C.flatten(), O) # libfmp Viterbi implementation
if viterbi_impl == "legacy":
A = compute_transition_matrix(B, triang_distr)
f0_idxs = viterbi_log_likelihood(A, C.flatten(), O)
else:
source_idx, log_trans, counts = compute_transition_structure_pyin_block(B, triang_distr)
f0_idxs = viterbi_log_likelihood_fast(source_idx, log_trans, counts, C.flatten(), O)

# Obtain F0-trajectory
F_axis_extended = np.concatenate((F_axis, np.zeros(len(F_axis))))
Expand Down Expand Up @@ -471,6 +482,212 @@ def viterbi_log_likelihood(A, C, B_O):
return S_opt


@njit
def compute_transition_structure_from_matrix(A):
    """Convert a dense transition matrix into a sparse predecessor table.

    For every target state, collect all source states with nonzero
    transition probability together with the log of that probability.

    Parameters
    ----------
    A : ndarray [shape=(I, I)]
        Dense transition probability matrix, indexed ``A[source, target]``.

    Returns
    -------
    source_idx : ndarray [shape=(I, max_width), dtype=int32]
        Predecessor state indices per target (rows zero-padded).
    log_trans : ndarray [shape=(I, max_width)]
        Log transition probabilities aligned with ``source_idx``;
        padding entries are ``-inf``.
    counts : ndarray [shape=(I,), dtype=int32]
        Number of valid predecessors per target state.
    """
    num_states = A.shape[0]
    tiny = np.finfo(0.).tiny  # guards np.log against exact zeros

    # First pass: determine the widest predecessor list over all targets,
    # so the padded arrays can be allocated with the exact required width.
    max_width = 0
    for t in range(num_states):
        n_pred = 0
        for s in range(num_states):
            if A[s, t] > 0:
                n_pred += 1
        if n_pred > max_width:
            max_width = n_pred

    source_idx = np.zeros((num_states, max_width), dtype=np.int32)
    log_trans = np.full((num_states, max_width), -np.inf)
    counts = np.zeros(num_states, dtype=np.int32)

    # Second pass: fill the padded rows in ascending source order.
    for t in range(num_states):
        n_pred = 0
        for s in range(num_states):
            p = A[s, t]
            if p > 0:
                source_idx[t, n_pred] = s
                log_trans[t, n_pred] = np.log(p + tiny)
                n_pred += 1
        counts[t] = n_pred

    return source_idx, log_trans, counts


@njit
def compute_transition_structure_pyin_block(M, triang_distr, prob_self=0.99):
    """Build sparse predecessor structure from pYIN block transitions directly.

    The pYIN transition used here has two voicing blocks:
    - same-voicing local triangular transitions (scaled by `prob_self`)
    - cross-voicing same-pitch transitions (scaled by `1 - prob_self`)

    Parameters
    ----------
    M : int
        Number of pitch states per voicing block (total states = 2*M).
    triang_distr : ndarray
        Triangular window of length ``2*max_step + 1`` weighting local
        pitch transitions.
    prob_self : float
        Probability mass kept within the same voicing block; the remainder
        ``1 - prob_self`` goes to the same pitch in the other block.

    Returns
    -------
    source_idx : ndarray [shape=(2*M, max_width), dtype=int32]
        Predecessor state indices per target (rows zero-padded).
    log_trans : ndarray [shape=(2*M, max_width)]
        Log transition probabilities aligned with ``source_idx``;
        padding entries are ``-inf``.
    counts : ndarray [shape=(2*M,), dtype=int32]
        Number of valid predecessors per target state.
    """
    max_step = len(triang_distr) // 2
    states = 2 * M
    tiny = np.finfo(0.).tiny

    # First pass: count same-voicing predecessors per pitch target so the
    # sparse arrays can be allocated with the exact required width.
    same_counts = np.zeros(M, dtype=np.int32)

    for source in range(M):
        if source < max_step:
            # Lower boundary: truncated window, renormalized over the
            # reachable targets.
            start = 0
            end = source + max_step
            weights = triang_distr[max_step - source:-1]
            denom = np.sum(weights)
            if denom <= 0:
                continue
            for k, target in enumerate(range(start, end)):
                prob = prob_self * weights[k] / denom
                if prob > 0:
                    same_counts[target] += 1
        elif source < M - max_step:
            # Interior: full triangular window, already normalized.
            start = source - max_step
            end = source + max_step + 1
            for k, target in enumerate(range(start, end)):
                prob = prob_self * triang_distr[k]
                if prob > 0:
                    same_counts[target] += 1
        else:
            # Upper boundary: truncated window, renormalized.
            start = source - max_step
            end = M
            weights = triang_distr[0:max_step - (source - M)]
            denom = np.sum(weights)
            if denom <= 0:
                continue
            for target in range(start, end):
                k = target - start
                prob = prob_self * weights[k] / denom
                if prob > 0:
                    same_counts[target] += 1

    max_width = int(np.max(same_counts)) + 1  # +1 for cross-voicing same-pitch edge
    source_idx = np.zeros((states, max_width), dtype=np.int32)
    log_trans = np.full((states, max_width), -np.inf)
    counts = np.zeros(states, dtype=np.int32)
    log_cross = np.log((1.0 - prob_self) + tiny)

    # Match dense-structure source ordering for unvoiced targets:
    # source indices are visited in ascending order, so cross-voicing edges
    # (from voiced sources 0..M-1) appear before same-voicing edges (M..2M-1).
    for pitch in range(M):
        pitch_u = pitch + M
        idx_u = counts[pitch_u]
        source_idx[pitch_u, idx_u] = pitch
        log_trans[pitch_u, idx_u] = log_cross
        counts[pitch_u] += 1

    # Second pass: fill the same-voicing edges for both voicing blocks.
    # The `denom <= 0` guards mirror the counting pass above; without them a
    # zero denominator would produce NaN probabilities that slip past the
    # `prob <= 0` test (NaN comparisons are False) and overrun the allocated
    # row width, since the counting pass skipped those edges.
    for source in range(M):
        if source < max_step:
            start = 0
            end = source + max_step
            weights = triang_distr[max_step - source:-1]
            denom = np.sum(weights)
            if denom <= 0:
                continue
            for k, target in enumerate(range(start, end)):
                prob = prob_self * weights[k] / denom
                if prob <= 0:
                    continue
                log_prob = np.log(prob + tiny)

                idx_v = counts[target]
                source_idx[target, idx_v] = source
                log_trans[target, idx_v] = log_prob
                counts[target] += 1

                target_u = target + M
                source_u = source + M
                idx_u = counts[target_u]
                source_idx[target_u, idx_u] = source_u
                log_trans[target_u, idx_u] = log_prob
                counts[target_u] += 1
        elif source < M - max_step:
            start = source - max_step
            end = source + max_step + 1
            for k, target in enumerate(range(start, end)):
                prob = prob_self * triang_distr[k]
                if prob <= 0:
                    continue
                log_prob = np.log(prob + tiny)

                idx_v = counts[target]
                source_idx[target, idx_v] = source
                log_trans[target, idx_v] = log_prob
                counts[target] += 1

                target_u = target + M
                source_u = source + M
                idx_u = counts[target_u]
                source_idx[target_u, idx_u] = source_u
                log_trans[target_u, idx_u] = log_prob
                counts[target_u] += 1
        else:
            start = source - max_step
            end = M
            weights = triang_distr[0:max_step - (source - M)]
            denom = np.sum(weights)
            if denom <= 0:
                continue
            for k, target in enumerate(range(start, end)):
                prob = prob_self * weights[k] / denom
                if prob <= 0:
                    continue
                log_prob = np.log(prob + tiny)

                idx_v = counts[target]
                source_idx[target, idx_v] = source
                log_trans[target, idx_v] = log_prob
                counts[target] += 1

                target_u = target + M
                source_u = source + M
                idx_u = counts[target_u]
                source_idx[target_u, idx_u] = source_u
                log_trans[target_u, idx_u] = log_prob
                counts[target_u] += 1

    # Match dense-structure ordering for voiced targets:
    # cross-voicing edges come from sources M..2M-1, after same-voicing sources.
    for pitch in range(M):
        idx_v = counts[pitch]
        source_idx[pitch, idx_v] = pitch + M
        log_trans[pitch, idx_v] = log_cross
        counts[pitch] += 1

    return source_idx, log_trans, counts


@njit
def viterbi_log_likelihood_fast(source_idx, log_trans, counts, C, B_O):
    """Log-domain Viterbi decoding over a sparse predecessor structure.

    Instead of scanning a dense transition matrix, each target state only
    inspects its listed predecessors, as produced by the
    ``compute_transition_structure_*`` helpers.

    Parameters
    ----------
    source_idx : ndarray [shape=(I, max_width), dtype=int32]
        Predecessor state indices per target.
    log_trans : ndarray [shape=(I, max_width)]
        Log transition probabilities aligned with ``source_idx``.
    counts : ndarray [shape=(I,), dtype=int32]
        Number of valid predecessors per target.
    C : ndarray [shape=(I,)]
        Initial state distribution.
    B_O : ndarray [shape=(I, N)]
        Observation likelihoods per state and frame.

    Returns
    -------
    ndarray [shape=(N,), dtype=int32]
        Optimal state index sequence.
    """
    num_states = source_idx.shape[0]
    num_frames = B_O.shape[1]
    tiny = np.finfo(0.).tiny  # keeps np.log finite for zero entries

    log_C = np.log(C + tiny)
    log_B_O = np.log(B_O + tiny)

    # Accumulated log-likelihood per state/frame and backtracking pointers.
    D = np.zeros((num_states, num_frames))
    backtrack = np.zeros((num_states, num_frames - 1), dtype=np.int32)
    D[:, 0] = log_C + log_B_O[:, 0]

    for frame in range(1, num_frames):
        for state in range(num_states):
            best = -np.inf
            arg = 0
            for j in range(counts[state]):
                pred = source_idx[state, j]
                cand = log_trans[state, j] + D[pred, frame - 1]
                if cand > best:
                    best = cand
                    arg = pred
            D[state, frame] = best + log_B_O[state, frame]
            backtrack[state, frame - 1] = arg

    # Backtrack from the most likely final state.
    path = np.zeros(num_frames, dtype=np.int32)
    path[-1] = np.argmax(D[:, -1])
    for frame in range(num_frames - 2, -1, -1):
        path[frame] = backtrack[int(path[frame + 1]), frame]

    return path


@njit
def delete_numba(arr, num):
"""Delete number from array, Numba compatible. Inspired by:
Expand Down
Loading