Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions cmd/jivetalking/debugsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"fmt"
"os"
"path/filepath"
"sync"
)

// debugSink is the shared, serialised writer for the debug log file. It holds
// the destination file and a mutex so concurrent writers each emit whole lines
// without interleaving. It carries no per-file prefix; per-file attribution is
// the wrapper's job (see withFilePrefix).
type debugSink struct {
mu sync.Mutex
file *os.File
disabled bool
}

// newDebugSink builds a sink over file, which may be nil. The disabled/no-op
// case is decided once here (nil file) rather than re-checked per write under a
// race.
func newDebugSink(file *os.File) *debugSink {
return &debugSink{file: file, disabled: file == nil}
}

// Logf formats one full line and writes it atomically against other writers.
// The signature matches BaseFilterConfig.SetLogger so the sink can back the
// shared log closure. The line format mirrors the original closure
// (fmt.Fprintf(debugLog, format+"\n", args...)) for byte-identity.
func (s *debugSink) Logf(format string, args ...any) {
if s.disabled {
return
}
s.mu.Lock()
defer s.mu.Unlock()
fmt.Fprintf(s.file, format+"\n", args...)
}

// withFilePrefix wraps base and returns a logger that prepends a per-file
// marker (the basename of path) to the format string before delegating, so the
// args still bind to the original verbs. It owns no file and no lock; it is the
// per-worker, prefix-only seam.
func withFilePrefix(path string, base func(format string, args ...any)) func(format string, args ...any) {
marker := "[" + filepath.Base(path) + "] "
return func(format string, args ...any) {
base(marker+format, args...)
}
}
170 changes: 170 additions & 0 deletions cmd/jivetalking/debugsink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package main

import (
"bufio"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"testing"
)

// TestDebugSinkConcurrentWritesRace fans many goroutines through one shared
// sink. Run under `CGO_ENABLED=1 go test -race` to detect data races on the
// shared file and mutex.
func TestDebugSinkConcurrentWritesRace(t *testing.T) {
path := filepath.Join(t.TempDir(), "race.log")
file, err := os.Create(path)
if err != nil {
t.Fatalf("create temp file: %v", err)
}
defer file.Close()

sink := newDebugSink(file)

const (
workers = 16
linesPerWorker = 500
)

var wg sync.WaitGroup
wg.Add(workers)
for w := range workers {
go func(id int) {
defer wg.Done()
for i := range linesPerWorker {
sink.Logf("worker %d line %d", id, i)
}
}(w)
}
wg.Wait()

if err := file.Close(); err != nil {
t.Fatalf("close temp file: %v", err)
}

lines := readLines(t, path)
if got, want := len(lines), workers*linesPerWorker; got != want {
t.Fatalf("line count = %d, want %d", got, want)
}
}

// TestDebugSinkPrefixAttribution drives MANY per-file withFilePrefix wrappers
// over ONE shared sink with concurrent writes, then asserts every output line
// is whole (no mid-line interleaving) and carries exactly one file marker that
// matches the wrapper that produced it.
func TestDebugSinkPrefixAttribution(t *testing.T) {
path := filepath.Join(t.TempDir(), "attrib.log")
file, err := os.Create(path)
if err != nil {
t.Fatalf("create temp file: %v", err)
}
defer file.Close()

sink := newDebugSink(file)

const (
wrappers = 12
linesPerWriter = 400
)

// One distinct marker per wrapper; basename drives the marker text.
names := make([]string, wrappers)
markers := make([]string, wrappers)
for w := range wrappers {
names[w] = fmt.Sprintf("episode-%02d.wav", w)
markers[w] = "[" + names[w] + "] "
}

var wg sync.WaitGroup
wg.Add(wrappers)
for w := range wrappers {
go func(id int) {
defer wg.Done()
logf := withFilePrefix(names[id], sink.Logf)
for i := range linesPerWriter {
logf("payload writer %d seq %d", id, i)
}
}(w)
}
wg.Wait()

if err := file.Close(); err != nil {
t.Fatalf("close temp file: %v", err)
}

lines := readLines(t, path)
if got, want := len(lines), wrappers*linesPerWriter; got != want {
t.Fatalf("line count = %d, want %d", got, want)
}

// A whole, well-formed line is exactly one marker followed by a payload
// naming the same writer id as the marker. Any interleaving breaks this.
markerRe := regexp.MustCompile(`\[episode-\d{2}\.wav\] `)
lineRe := regexp.MustCompile(`^\[episode-(\d{2})\.wav\] payload writer (\d+) seq \d+$`)

seen := make([]int, wrappers)
for n, line := range lines {
// Exactly one marker per line (no mid-line interleaving).
if count := len(markerRe.FindAllString(line, -1)); count != 1 {
t.Fatalf("line %d has %d markers, want 1: %q", n, count, line)
}
m := lineRe.FindStringSubmatch(line)
if m == nil {
t.Fatalf("line %d malformed: %q", n, line)
}
// The marker's writer id must match the payload's writer id.
if m[1] != fmt.Sprintf("%02d", mustAtoi(t, m[2])) {
t.Fatalf("line %d marker/payload writer mismatch: %q", n, line)
}
id := mustAtoi(t, m[2])
if id < 0 || id >= wrappers {
t.Fatalf("line %d writer id %d out of range", n, id)
}
// The line carries the exact marker string for that writer.
if !strings.HasPrefix(line, markers[id]) {
t.Fatalf("line %d prefix %q, want marker %q", n, line, markers[id])
}
seen[id]++
}

for id, count := range seen {
if count != linesPerWriter {
t.Fatalf("writer %d produced %d lines, want %d", id, count, linesPerWriter)
}
}
}

func readLines(t *testing.T, path string) []string {
t.Helper()
f, err := os.Open(path)
if err != nil {
t.Fatalf("open %s: %v", path, err)
}
defer f.Close()

var lines []string
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for sc.Scan() {
lines = append(lines, sc.Text())
}
if err := sc.Err(); err != nil {
t.Fatalf("scan %s: %v", path, err)
}
return lines
}

func mustAtoi(t *testing.T, s string) int {
t.Helper()
n := 0
for _, r := range s {
if r < '0' || r > '9' {
t.Fatalf("non-numeric value %q", s)
}
n = n*10 + int(r-'0')
}
return n
}
12 changes: 7 additions & 5 deletions cmd/jivetalking/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,9 @@ func main() {
if debugLog != nil {
defer debugLog.Close()
}
sink := newDebugSink(debugLog)
log := func(format string, args ...any) {
if debugLog != nil {
fmt.Fprintf(debugLog, format+"\n", args...)
}
sink.Logf(format, args...)
}

// Set the config's debug log function to use the same log
Expand Down Expand Up @@ -148,8 +147,9 @@ func main() {

// Create progress handler
ph := &progressHandler{
p: p,
log: log,
p: p,
log: log,
fileIndex: i,
}

// Process the audio file
Expand Down Expand Up @@ -277,6 +277,7 @@ func (ph *progressHandler) timings(pass2Time time.Duration) logging.ProcessingTi
type progressHandler struct {
p *tea.Program
log func(string, ...any)
fileIndex int
pass1Start time.Time
pass1Time time.Duration
pass3Start time.Time
Expand Down Expand Up @@ -305,6 +306,7 @@ func (ph *progressHandler) callback(update processor.ProgressUpdate) {
}

ph.p.Send(ui.ProgressMsg{
FileIndex: ph.fileIndex,
Pass: update.Pass,
PassName: update.PassName,
Progress: update.Progress,
Expand Down
1 change: 1 addition & 0 deletions internal/ui/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// ProgressMsg represents a progress update from the processor
type ProgressMsg struct {
FileIndex int
Pass processor.PassNumber
PassName string
Progress float64
Expand Down
37 changes: 18 additions & 19 deletions internal/ui/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type FileProgress struct {
type Model struct {
// File queue
Files []FileProgress
CurrentIndex int
TotalFiles int
CompletedFiles int
FailedFiles int
Expand All @@ -82,10 +81,9 @@ func NewModel(inputFiles []string) Model {
}

return Model{
Files: files,
CurrentIndex: -1, // No file processing yet
TotalFiles: len(inputFiles),
StartTime: time.Now(),
Files: files,
TotalFiles: len(inputFiles),
StartTime: time.Now(),
}
}

Expand All @@ -109,30 +107,31 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {

case ProgressMsg:
// Update the current file's progress
if m.CurrentIndex >= 0 && m.CurrentIndex < len(m.Files) {
m.Files[m.CurrentIndex] = updateFileProgress(m.Files[m.CurrentIndex], msg)
if msg.FileIndex >= 0 && msg.FileIndex < len(m.Files) {
m.Files[msg.FileIndex] = updateFileProgress(m.Files[msg.FileIndex], msg)
}
return m, nil

case FileStartMsg:
// Start processing next file
m.CurrentIndex = msg.FileIndex
m.Files[m.CurrentIndex].Status = StatusAnalyzing
m.Files[m.CurrentIndex].StartTime = time.Now()
if msg.FileIndex >= 0 && msg.FileIndex < len(m.Files) {
m.Files[msg.FileIndex].Status = StatusAnalyzing
m.Files[msg.FileIndex].StartTime = time.Now()
}
return m, nil

case FileCompleteMsg:
// Mark file as complete
if m.CurrentIndex >= 0 && m.CurrentIndex < len(m.Files) {
m.Files[m.CurrentIndex].Status = StatusComplete
m.Files[m.CurrentIndex].InputLUFS = msg.InputLUFS
m.Files[m.CurrentIndex].OutputLUFS = msg.OutputLUFS
m.Files[m.CurrentIndex].NoiseFloor = msg.NoiseFloor
m.Files[m.CurrentIndex].OutputPath = msg.OutputPath
m.Files[m.CurrentIndex].Error = msg.Error
if msg.FileIndex >= 0 && msg.FileIndex < len(m.Files) {
m.Files[msg.FileIndex].Status = StatusComplete
m.Files[msg.FileIndex].InputLUFS = msg.InputLUFS
m.Files[msg.FileIndex].OutputLUFS = msg.OutputLUFS
m.Files[msg.FileIndex].NoiseFloor = msg.NoiseFloor
m.Files[msg.FileIndex].OutputPath = msg.OutputPath
m.Files[msg.FileIndex].Error = msg.Error

if msg.Error != nil {
m.Files[m.CurrentIndex].Status = StatusError
m.Files[msg.FileIndex].Status = StatusError
m.FailedFiles++
} else {
m.CompletedFiles++
Expand All @@ -153,7 +152,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
func (m Model) View() string {
// Debug: Show basic info even before window size is set
if m.Width == 0 {
return fmt.Sprintf("Initializing...\nFiles: %d\nCurrent: %d\n", len(m.Files), m.CurrentIndex)
return fmt.Sprintf("Initializing...\nFiles: %d\n", len(m.Files))
}

// Build the view based on current state
Expand Down
Loading
Loading