From d6966f984902c90cc22e0147563f5fbfc1581cc3 Mon Sep 17 00:00:00 2001 From: map588 Date: Sat, 4 Apr 2026 20:49:56 -0400 Subject: [PATCH] fix(thread-safety): eliminate races in log mutex, watcher, and index threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - http_server: Replace lazy log mutex init with 3-state init (UNINIT → INITING → INITED) using atomic CAS. Concurrent callers spin until init completes, preventing use of uninitialized mutex. cbm_ui_log_append calls cbm_ui_log_init on first use so early startup logs are not dropped. - watcher: Add cbm_mutex_t to protect projects hash table. All accessors (watch, unwatch, touch, watch_count, poll_once) are guarded. poll_once snapshots project pointers under lock then polls without holding it, keeping the critical section small during git I/O and indexing. state_new OOM is handled with early return. - compat_thread: Add cbm_thread_detach() for POSIX and Windows. Both join() and detach() clear the handle on success across both platforms for consistent lifecycle tracking. - http_server: Detach index job threads to prevent handle leaks. --- src/foundation/compat_thread.c | 23 +++++++++++- src/foundation/compat_thread.h | 3 ++ src/main.c | 3 ++ src/ui/http_server.c | 36 ++++++++++++++++--- src/ui/http_server.h | 3 ++ src/watcher/watcher.c | 64 ++++++++++++++++++++++++++++++++-- 6 files changed, 124 insertions(+), 8 deletions(-) diff --git a/src/foundation/compat_thread.c b/src/foundation/compat_thread.c index e87afb12..19610cef 100644 --- a/src/foundation/compat_thread.c +++ b/src/foundation/compat_thread.c @@ -56,6 +56,15 @@ int cbm_thread_join(cbm_thread_t *t) { return CBM_NOT_FOUND; } CloseHandle(t->handle); + t->handle = NULL; + return 0; +} + +int cbm_thread_detach(cbm_thread_t *t) { + if (t->handle) { + CloseHandle(t->handle); + t->handle = NULL; + } return 0; } @@ -74,7 +83,19 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v } int cbm_thread_join(cbm_thread_t *t) { - return pthread_join(t->handle, NULL); + int rc = pthread_join(t->handle, NULL); + if (rc == 0) { + memset(&t->handle, 0, sizeof(t->handle)); + } + return rc; +} + +int cbm_thread_detach(cbm_thread_t *t) { + int rc = pthread_detach(t->handle); + if (rc == 0) { + memset(&t->handle, 0, sizeof(t->handle)); + } + return rc; } #endif diff --git a/src/foundation/compat_thread.h b/src/foundation/compat_thread.h index 145b68bf..7d561093 100644 --- a/src/foundation/compat_thread.h +++ b/src/foundation/compat_thread.h @@ -39,6 +39,9 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v /* Wait for thread to finish. Returns 0 on success. */ int cbm_thread_join(cbm_thread_t *t); +/* Detach thread so resources are freed on exit. Returns 0 on success. */ +int cbm_thread_detach(cbm_thread_t *t); + /* ── Mutex ────────────────────────────────────────────────────── */ #ifdef _WIN32 diff --git a/src/main.c b/src/main.c index 9a79d05e..9f8f187f 100644 --- a/src/main.c +++ b/src/main.c @@ -307,6 +307,9 @@ int main(int argc, char **argv) { } /* Create and start watcher in background thread */ + /* Initialize log mutex before any threads are created */ + cbm_ui_log_init(); + cbm_store_t *watch_store = cbm_store_open_memory(); g_watcher = cbm_watcher_new(watch_store, watcher_index_fn, NULL); diff --git a/src/ui/http_server.c b/src/ui/http_server.c index 053f317b..d115905c 100644 --- a/src/ui/http_server.c +++ b/src/ui/http_server.c @@ -140,16 +140,41 @@ static char g_log_ring[LOG_RING_SIZE][LOG_LINE_MAX]; static int g_log_head = 0; static int g_log_count = 0; static cbm_mutex_t g_log_mutex; -static atomic_int g_log_mutex_init = 0; + +enum { + CBM_LOG_MUTEX_UNINIT = 0, + CBM_LOG_MUTEX_INITING = 1, + CBM_LOG_MUTEX_INITED = 2 +}; +static atomic_int g_log_mutex_init = CBM_LOG_MUTEX_UNINIT; + +/* Safe for concurrent callers: only publishes INITED after cbm_mutex_init() + * has completed. Callers that lose the CAS race spin until init finishes. */ +void cbm_ui_log_init(void) { + int state = atomic_load(&g_log_mutex_init); + if (state == CBM_LOG_MUTEX_INITED) + return; + + state = CBM_LOG_MUTEX_UNINIT; + if (atomic_compare_exchange_strong(&g_log_mutex_init, &state, CBM_LOG_MUTEX_INITING)) { + cbm_mutex_init(&g_log_mutex); + atomic_store(&g_log_mutex_init, CBM_LOG_MUTEX_INITED); + return; + } + + /* Another thread is initializing — spin until done */ + while (atomic_load(&g_log_mutex_init) != CBM_LOG_MUTEX_INITED) { + cbm_usleep(1000); /* 1ms */ + } +} /* Called from a log hook — appends a line to the ring buffer (thread-safe) */ void cbm_ui_log_append(const char *line) { if (!line) return; - if (!atomic_load(&g_log_mutex_init)) { - cbm_mutex_init(&g_log_mutex); - atomic_store(&g_log_mutex_init, 1); - } + /* Ensure mutex is initialized (safe for early single-threaded logging + * and concurrent calls via atomic_exchange once-init pattern). */ + cbm_ui_log_init(); cbm_mutex_lock(&g_log_mutex); snprintf(g_log_ring[g_log_head], LOG_LINE_MAX, "%s", line); g_log_head = (g_log_head + 1) % LOG_RING_SIZE; @@ -791,6 +816,7 @@ static void handle_index_start(struct mg_connection *c, struct mg_http_message * mg_http_reply(c, 500, g_cors_json, "{\"error\":\"thread creation failed\"}"); return; } + cbm_thread_detach(&tid); /* Don't leak thread handle */ mg_http_reply(c, 202, g_cors_json, "{\"status\":\"indexing\",\"slot\":%d,\"path\":\"%s\"}", slot, job->root_path); diff --git a/src/ui/http_server.h b/src/ui/http_server.h index 4858a049..4a63a0f5 100644 --- a/src/ui/http_server.h +++ b/src/ui/http_server.h @@ -32,6 +32,9 @@ void cbm_http_server_run(cbm_http_server_t *srv); /* Check if the server started successfully (listener bound). */ bool cbm_http_server_is_running(const cbm_http_server_t *srv); +/* Initialize the log ring buffer mutex. Must be called once before any threads. */ +void cbm_ui_log_init(void); + /* Append a log line to the UI ring buffer (called from log hook). */ void cbm_ui_log_append(const char *line); diff --git a/src/watcher/watcher.c b/src/watcher/watcher.c index 8bef36e9..8d1f85f9 100644 --- a/src/watcher/watcher.c +++ b/src/watcher/watcher.c @@ -20,6 +20,7 @@ #include "foundation/log.h" #include "foundation/hash_table.h" #include "foundation/compat.h" +#include "foundation/compat_thread.h" #include "foundation/compat_fs.h" #include "foundation/str_util.h" @@ -50,6 +51,7 @@ struct cbm_watcher { cbm_index_fn index_fn; void *user_data; CBMHashTable *projects; /* name → project_state_t* */ + cbm_mutex_t projects_lock; atomic_int stopped; }; @@ -236,6 +238,7 @@ cbm_watcher_t *cbm_watcher_new(cbm_store_t *store, cbm_index_fn index_fn, void * w->index_fn = index_fn; w->user_data = user_data; w->projects = cbm_ht_create(CBM_SZ_32); + cbm_mutex_init(&w->projects_lock); atomic_init(&w->stopped, 0); return w; } @@ -244,8 +247,11 @@ void cbm_watcher_free(cbm_watcher_t *w) { if (!w) { return; } + cbm_mutex_lock(&w->projects_lock); cbm_ht_foreach(w->projects, free_state_entry, NULL); cbm_ht_free(w->projects); + cbm_mutex_unlock(&w->projects_lock); + cbm_mutex_destroy(&w->projects_lock); free(w); } @@ -264,6 +270,7 @@ void cbm_watcher_watch(cbm_watcher_t *w, const char *project_name, const char *r } /* Remove old entry first (key points to state's project_name) */ + cbm_mutex_lock(&w->projects_lock); project_state_t *old = cbm_ht_get(w->projects, project_name); if (old) { cbm_ht_delete(w->projects, project_name); @@ -271,7 +278,13 @@ void cbm_watcher_watch(cbm_watcher_t *w, const char *project_name, const char *r } project_state_t *s = state_new(project_name, root_path); + if (!s) { + cbm_mutex_unlock(&w->projects_lock); + cbm_log_warn("watcher.watch.oom", "project", project_name, "path", root_path); + return; + } cbm_ht_set(w->projects, s->project_name, s); + cbm_mutex_unlock(&w->projects_lock); cbm_log_info("watcher.watch", "project", project_name, "path", root_path); } @@ -279,10 +292,16 @@ void cbm_watcher_unwatch(cbm_watcher_t *w, const char *project_name) { if (!w || !project_name) { return; } + bool removed = false; + cbm_mutex_lock(&w->projects_lock); project_state_t *s = cbm_ht_get(w->projects, project_name); if (s) { cbm_ht_delete(w->projects, project_name); state_free(s); + removed = true; + } + cbm_mutex_unlock(&w->projects_lock); + if (removed) { cbm_log_info("watcher.unwatch", "project", project_name); } } @@ -291,18 +310,23 @@ void cbm_watcher_touch(cbm_watcher_t *w, const char *project_name) { if (!w || !project_name) { return; } + cbm_mutex_lock(&w->projects_lock); project_state_t *s = cbm_ht_get(w->projects, project_name); if (s) { /* Reset backoff — poll immediately on next cycle */ s->next_poll_ns = 0; } + cbm_mutex_unlock(&w->projects_lock); } int cbm_watcher_watch_count(const cbm_watcher_t *w) { if (!w) { return 0; } - return (int)cbm_ht_count(w->projects); + cbm_mutex_lock(&((cbm_watcher_t *)w)->projects_lock); + int count = (int)cbm_ht_count(w->projects); + cbm_mutex_unlock(&((cbm_watcher_t *)w)->projects_lock); + return count; } /* ── Single poll cycle ──────────────────────────────────────────── */ @@ -411,17 +435,53 @@ static void poll_project(const char *key, void *val, void *ud) { s->next_poll_ns = ctx->now + ((int64_t)s->interval_ms * US_PER_MS); } +/* Callback to snapshot project state pointers into an array. */ +typedef struct { + project_state_t **items; + int count; + int cap; +} snapshot_ctx_t; + +static void snapshot_project(const char *key, void *val, void *ud) { + (void)key; + snapshot_ctx_t *sc = ud; + if (val && sc->count < sc->cap) { + sc->items[sc->count++] = val; + } +} + int cbm_watcher_poll_once(cbm_watcher_t *w) { if (!w) { return 0; } + /* Snapshot project pointers under lock, then poll without holding it. + * This keeps the critical section small — poll_project does git I/O + * and may invoke index_fn which runs the full pipeline. */ + cbm_mutex_lock(&w->projects_lock); + int n = cbm_ht_count(w->projects); + if (n == 0) { + cbm_mutex_unlock(&w->projects_lock); + return 0; + } + project_state_t **snap = malloc(n * sizeof(project_state_t *)); + if (!snap) { + cbm_mutex_unlock(&w->projects_lock); + return 0; + } + snapshot_ctx_t sc = {.items = snap, .count = 0, .cap = n}; + cbm_ht_foreach(w->projects, snapshot_project, &sc); + cbm_mutex_unlock(&w->projects_lock); + poll_ctx_t ctx = { .w = w, .now = now_ns(), .reindexed = 0, }; - cbm_ht_foreach(w->projects, poll_project, &ctx); + for (int i = 0; i < sc.count; i++) { + poll_project(NULL, snap[i], &ctx); + } + free(snap); return ctx.reindexed; }