Skip to content

Commit d6966f9

Browse files
committed
fix(thread-safety): eliminate races in log mutex, watcher, and index threads
- http_server: Replace lazy log mutex init with 3-state init (UNINIT → INITING → INITED) using atomic CAS. Concurrent callers spin until init completes, preventing use of uninitialized mutex. cbm_ui_log_append calls cbm_ui_log_init on first use so early startup logs are not dropped. - watcher: Add cbm_mutex_t to protect projects hash table. All accessors (watch, unwatch, touch, watch_count, poll_once) are guarded. poll_once snapshots project pointers under lock then polls without holding it, keeping the critical section small during git I/O and indexing. state_new OOM is handled with early return. - compat_thread: Add cbm_thread_detach() for POSIX and Windows. Both join() and detach() clear the handle on success across both platforms for consistent lifecycle tracking. - http_server: Detach index job threads to prevent handle leaks.
1 parent 1d30971 commit d6966f9

6 files changed

Lines changed: 124 additions & 8 deletions

File tree

src/foundation/compat_thread.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ int cbm_thread_join(cbm_thread_t *t) {
5656
return CBM_NOT_FOUND;
5757
}
5858
CloseHandle(t->handle);
59+
t->handle = NULL;
60+
return 0;
61+
}
62+
63+
int cbm_thread_detach(cbm_thread_t *t) {
64+
if (t->handle) {
65+
CloseHandle(t->handle);
66+
t->handle = NULL;
67+
}
5968
return 0;
6069
}
6170

@@ -74,7 +83,19 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v
7483
}
7584

7685
int cbm_thread_join(cbm_thread_t *t) {
77-
return pthread_join(t->handle, NULL);
86+
int rc = pthread_join(t->handle, NULL);
87+
if (rc == 0) {
88+
memset(&t->handle, 0, sizeof(t->handle));
89+
}
90+
return rc;
91+
}
92+
93+
int cbm_thread_detach(cbm_thread_t *t) {
94+
int rc = pthread_detach(t->handle);
95+
if (rc == 0) {
96+
memset(&t->handle, 0, sizeof(t->handle));
97+
}
98+
return rc;
7899
}
79100

80101
#endif

src/foundation/compat_thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v
3939
/* Wait for thread to finish. Returns 0 on success. */
4040
int cbm_thread_join(cbm_thread_t *t);
4141

42+
/* Detach thread so resources are freed on exit. Returns 0 on success. */
43+
int cbm_thread_detach(cbm_thread_t *t);
44+
4245
/* ── Mutex ────────────────────────────────────────────────────── */
4346

4447
#ifdef _WIN32

src/main.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ int main(int argc, char **argv) {
307307
}
308308

309309
/* Create and start watcher in background thread */
310+
/* Initialize log mutex before any threads are created */
311+
cbm_ui_log_init();
312+
310313
cbm_store_t *watch_store = cbm_store_open_memory();
311314
g_watcher = cbm_watcher_new(watch_store, watcher_index_fn, NULL);
312315

src/ui/http_server.c

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,41 @@ static char g_log_ring[LOG_RING_SIZE][LOG_LINE_MAX];
140140
static int g_log_head = 0;
141141
static int g_log_count = 0;
142142
static cbm_mutex_t g_log_mutex;
143-
static atomic_int g_log_mutex_init = 0;
143+
144+
enum {
145+
CBM_LOG_MUTEX_UNINIT = 0,
146+
CBM_LOG_MUTEX_INITING = 1,
147+
CBM_LOG_MUTEX_INITED = 2
148+
};
149+
static atomic_int g_log_mutex_init = CBM_LOG_MUTEX_UNINIT;
150+
151+
/* Safe for concurrent callers: only publishes INITED after cbm_mutex_init()
152+
* has completed. Callers that lose the CAS race spin until init finishes. */
153+
void cbm_ui_log_init(void) {
154+
int state = atomic_load(&g_log_mutex_init);
155+
if (state == CBM_LOG_MUTEX_INITED)
156+
return;
157+
158+
state = CBM_LOG_MUTEX_UNINIT;
159+
if (atomic_compare_exchange_strong(&g_log_mutex_init, &state, CBM_LOG_MUTEX_INITING)) {
160+
cbm_mutex_init(&g_log_mutex);
161+
atomic_store(&g_log_mutex_init, CBM_LOG_MUTEX_INITED);
162+
return;
163+
}
164+
165+
/* Another thread is initializing — spin until done */
166+
while (atomic_load(&g_log_mutex_init) != CBM_LOG_MUTEX_INITED) {
167+
cbm_usleep(1000); /* 1ms */
168+
}
169+
}
144170

145171
/* Called from a log hook — appends a line to the ring buffer (thread-safe) */
146172
void cbm_ui_log_append(const char *line) {
147173
if (!line)
148174
return;
149-
if (!atomic_load(&g_log_mutex_init)) {
150-
cbm_mutex_init(&g_log_mutex);
151-
atomic_store(&g_log_mutex_init, 1);
152-
}
175+
/* Ensure mutex is initialized (safe for early single-threaded logging
176+
* and concurrent calls via atomic_exchange once-init pattern). */
177+
cbm_ui_log_init();
153178
cbm_mutex_lock(&g_log_mutex);
154179
snprintf(g_log_ring[g_log_head], LOG_LINE_MAX, "%s", line);
155180
g_log_head = (g_log_head + 1) % LOG_RING_SIZE;
@@ -791,6 +816,7 @@ static void handle_index_start(struct mg_connection *c, struct mg_http_message *
791816
mg_http_reply(c, 500, g_cors_json, "{\"error\":\"thread creation failed\"}");
792817
return;
793818
}
819+
cbm_thread_detach(&tid); /* Don't leak thread handle */
794820

795821
mg_http_reply(c, 202, g_cors_json, "{\"status\":\"indexing\",\"slot\":%d,\"path\":\"%s\"}",
796822
slot, job->root_path);

src/ui/http_server.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ void cbm_http_server_run(cbm_http_server_t *srv);
3232
/* Check if the server started successfully (listener bound). */
3333
bool cbm_http_server_is_running(const cbm_http_server_t *srv);
3434

35+
/* Initialize the log ring buffer mutex. Must be called once before any threads. */
36+
void cbm_ui_log_init(void);
37+
3538
/* Append a log line to the UI ring buffer (called from log hook). */
3639
void cbm_ui_log_append(const char *line);
3740

src/watcher/watcher.c

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "foundation/log.h"
2121
#include "foundation/hash_table.h"
2222
#include "foundation/compat.h"
23+
#include "foundation/compat_thread.h"
2324
#include "foundation/compat_fs.h"
2425
#include "foundation/str_util.h"
2526

@@ -50,6 +51,7 @@ struct cbm_watcher {
5051
cbm_index_fn index_fn;
5152
void *user_data;
5253
CBMHashTable *projects; /* name → project_state_t* */
54+
cbm_mutex_t projects_lock;
5355
atomic_int stopped;
5456
};
5557

@@ -236,6 +238,7 @@ cbm_watcher_t *cbm_watcher_new(cbm_store_t *store, cbm_index_fn index_fn, void *
236238
w->index_fn = index_fn;
237239
w->user_data = user_data;
238240
w->projects = cbm_ht_create(CBM_SZ_32);
241+
cbm_mutex_init(&w->projects_lock);
239242
atomic_init(&w->stopped, 0);
240243
return w;
241244
}
@@ -244,8 +247,11 @@ void cbm_watcher_free(cbm_watcher_t *w) {
244247
if (!w) {
245248
return;
246249
}
250+
cbm_mutex_lock(&w->projects_lock);
247251
cbm_ht_foreach(w->projects, free_state_entry, NULL);
248252
cbm_ht_free(w->projects);
253+
cbm_mutex_unlock(&w->projects_lock);
254+
cbm_mutex_destroy(&w->projects_lock);
249255
free(w);
250256
}
251257

@@ -264,25 +270,38 @@ void cbm_watcher_watch(cbm_watcher_t *w, const char *project_name, const char *r
264270
}
265271

266272
/* Remove old entry first (key points to state's project_name) */
273+
cbm_mutex_lock(&w->projects_lock);
267274
project_state_t *old = cbm_ht_get(w->projects, project_name);
268275
if (old) {
269276
cbm_ht_delete(w->projects, project_name);
270277
state_free(old);
271278
}
272279

273280
project_state_t *s = state_new(project_name, root_path);
281+
if (!s) {
282+
cbm_mutex_unlock(&w->projects_lock);
283+
cbm_log_warn("watcher.watch.oom", "project", project_name, "path", root_path);
284+
return;
285+
}
274286
cbm_ht_set(w->projects, s->project_name, s);
287+
cbm_mutex_unlock(&w->projects_lock);
275288
cbm_log_info("watcher.watch", "project", project_name, "path", root_path);
276289
}
277290

278291
void cbm_watcher_unwatch(cbm_watcher_t *w, const char *project_name) {
279292
if (!w || !project_name) {
280293
return;
281294
}
295+
bool removed = false;
296+
cbm_mutex_lock(&w->projects_lock);
282297
project_state_t *s = cbm_ht_get(w->projects, project_name);
283298
if (s) {
284299
cbm_ht_delete(w->projects, project_name);
285300
state_free(s);
301+
removed = true;
302+
}
303+
cbm_mutex_unlock(&w->projects_lock);
304+
if (removed) {
286305
cbm_log_info("watcher.unwatch", "project", project_name);
287306
}
288307
}
@@ -291,18 +310,23 @@ void cbm_watcher_touch(cbm_watcher_t *w, const char *project_name) {
291310
if (!w || !project_name) {
292311
return;
293312
}
313+
cbm_mutex_lock(&w->projects_lock);
294314
project_state_t *s = cbm_ht_get(w->projects, project_name);
295315
if (s) {
296316
/* Reset backoff — poll immediately on next cycle */
297317
s->next_poll_ns = 0;
298318
}
319+
cbm_mutex_unlock(&w->projects_lock);
299320
}
300321

301322
int cbm_watcher_watch_count(const cbm_watcher_t *w) {
302323
if (!w) {
303324
return 0;
304325
}
305-
return (int)cbm_ht_count(w->projects);
326+
cbm_mutex_lock(&((cbm_watcher_t *)w)->projects_lock);
327+
int count = (int)cbm_ht_count(w->projects);
328+
cbm_mutex_unlock(&((cbm_watcher_t *)w)->projects_lock);
329+
return count;
306330
}
307331

308332
/* ── Single poll cycle ──────────────────────────────────────────── */
@@ -411,17 +435,53 @@ static void poll_project(const char *key, void *val, void *ud) {
411435
s->next_poll_ns = ctx->now + ((int64_t)s->interval_ms * US_PER_MS);
412436
}
413437

438+
/* Callback to snapshot project state pointers into an array. */
439+
typedef struct {
440+
project_state_t **items;
441+
int count;
442+
int cap;
443+
} snapshot_ctx_t;
444+
445+
static void snapshot_project(const char *key, void *val, void *ud) {
446+
(void)key;
447+
snapshot_ctx_t *sc = ud;
448+
if (val && sc->count < sc->cap) {
449+
sc->items[sc->count++] = val;
450+
}
451+
}
452+
414453
int cbm_watcher_poll_once(cbm_watcher_t *w) {
415454
if (!w) {
416455
return 0;
417456
}
418457

458+
/* Snapshot project pointers under lock, then poll without holding it.
459+
* This keeps the critical section small — poll_project does git I/O
460+
* and may invoke index_fn which runs the full pipeline. */
461+
cbm_mutex_lock(&w->projects_lock);
462+
int n = cbm_ht_count(w->projects);
463+
if (n == 0) {
464+
cbm_mutex_unlock(&w->projects_lock);
465+
return 0;
466+
}
467+
project_state_t **snap = malloc(n * sizeof(project_state_t *));
468+
if (!snap) {
469+
cbm_mutex_unlock(&w->projects_lock);
470+
return 0;
471+
}
472+
snapshot_ctx_t sc = {.items = snap, .count = 0, .cap = n};
473+
cbm_ht_foreach(w->projects, snapshot_project, &sc);
474+
cbm_mutex_unlock(&w->projects_lock);
475+
419476
poll_ctx_t ctx = {
420477
.w = w,
421478
.now = now_ns(),
422479
.reindexed = 0,
423480
};
424-
cbm_ht_foreach(w->projects, poll_project, &ctx);
481+
for (int i = 0; i < sc.count; i++) {
482+
poll_project(NULL, snap[i], &ctx);
483+
}
484+
free(snap);
425485
return ctx.reindexed;
426486
}
427487

0 commit comments

Comments
 (0)