Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 138 additions & 18 deletions src/cypher/cypher.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "cypher/cypher.h"
#include "store/store.h"
#include "foundation/platform.h"
#include "foundation/compat.h" // cbm_clock_gettime — execution guards (#601)

enum {
CYP_BUF_16 = 16,
Expand Down Expand Up @@ -40,6 +41,7 @@ enum {
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h> // CLOCK_MONOTONIC — execution guards (#601)

/* ── Helpers ────────────────────────────────────────────────────── */

Expand Down Expand Up @@ -980,11 +982,11 @@ static cbm_expr_t *parse_exists_predicate(parser_t *p, bool negated) {
}
cbm_node_pattern_t anchor = {0};
cbm_rel_pattern_t rel = {0};
cbm_node_pattern_t far = {0};
if (parse_node(p, &anchor) < 0 || parse_rel(p, &rel) < 0 || parse_node(p, &far) < 0) {
cbm_node_pattern_t far_node = {0};
if (parse_node(p, &anchor) < 0 || parse_rel(p, &rel) < 0 || parse_node(p, &far_node) < 0) {
free_one_node_pattern(&anchor);
free_one_rel_pattern(&rel);
free_one_node_pattern(&far);
free_one_node_pattern(&far_node);
snprintf(p->error, sizeof(p->error),
"unsupported EXISTS pattern — only the single-hop form "
"'(var)-[:TYPE]->()' is supported");
Expand All @@ -1002,7 +1004,7 @@ static cbm_expr_t *parse_exists_predicate(parser_t *p, bool negated) {
: 0;
free_one_node_pattern(&anchor);
free_one_rel_pattern(&rel);
free_one_node_pattern(&far);
free_one_node_pattern(&far_node);
return expr_leaf(c);
}

Expand Down Expand Up @@ -2953,6 +2955,64 @@ static void expand_fixed_length(cbm_store_t *store, cbm_rel_pattern_t *rel,
}
}

/* ── Execution guards (#601): wall-clock timeout + working-set cap ────
* A whole-graph `OPTIONAL MATCH` drives off every node and then allocates a
* 10x growth buffer per relationship, blowing memory/CPU up before the
* post-hoc row ceiling can fire. The executor is single-threaded (the MCP
* event loop), so file-static guard state is safe. The timeout defaults to
* 10s and is overridable via CBM_CYPHER_TIMEOUT_MS (0 disables it), matching
* the CBM_* env-knob precedent (CBM_WORKERS, CBM_SQLITE_MMAP_SIZE). The cap
* bounds the intermediate working set before the growth allocation. */
enum {
CYPHER_DEFAULT_TIMEOUT_MS = 10000, /* wall-clock budget for one query */
CYPHER_MAX_BINDINGS = 1000000, /* intermediate working-set cap */
CYPHER_DEADLINE_CHECK_MASK = 0x3FF, /* sample the clock every 1024 iters */
CYP_NS_PER_MS = 1000000,
CYP_NS_PER_S = 1000000000,
};

typedef enum {
CYP_ABORT_NONE = 0,
CYP_ABORT_TIMEOUT,
CYP_ABORT_CAP,
CYP_ABORT_OOM,
} cyp_abort_t;

static int64_t g_cyp_deadline_ns = 0; /* 0 = no deadline */
static cyp_abort_t g_cyp_abort = CYP_ABORT_NONE; /* set when a guard trips */

static int64_t cyp_now_ns(void) {
struct timespec ts;
cbm_clock_gettime(CLOCK_MONOTONIC, &ts);
return ((int64_t)ts.tv_sec * CYP_NS_PER_S) + ts.tv_nsec;
}

/* Arm the guards for one top-level query. Reads CBM_CYPHER_TIMEOUT_MS. */
static void cyp_guard_begin(void) {
g_cyp_abort = CYP_ABORT_NONE;
int ms = CYPHER_DEFAULT_TIMEOUT_MS;
char buf[CBM_SZ_32];
if (cbm_safe_getenv("CBM_CYPHER_TIMEOUT_MS", buf, sizeof(buf), NULL) != NULL) {
long v = strtol(buf, NULL, CBM_DECIMAL_BASE);
if (v >= 0) {
ms = (int)v;
}
}
g_cyp_deadline_ns = (ms > 0) ? cyp_now_ns() + ((int64_t)ms * CYP_NS_PER_MS) : 0;
}

/* True once the wall-clock budget is spent (latches g_cyp_abort). */
static bool cyp_deadline_exceeded(void) {
if (g_cyp_deadline_ns == 0 || g_cyp_abort != CYP_ABORT_NONE) {
return g_cyp_abort != CYP_ABORT_NONE;
}
if (cyp_now_ns() > g_cyp_deadline_ns) {
g_cyp_abort = CYP_ABORT_TIMEOUT;
return true;
}
return false;
}

static void expand_pattern_rels(cbm_store_t *store, cbm_pattern_t *pat, binding_t **bindings,
int *bind_count, const int *bind_cap, const char **var_name,
bool is_optional) {
Expand All @@ -2963,11 +3023,34 @@ static void expand_pattern_rels(cbm_store_t *store, cbm_pattern_t *pat, binding_

bool is_variable_length = (rel->min_hops != SKIP_ONE || rel->max_hops != SKIP_ONE);

binding_t *new_bindings =
malloc(((*bind_cap * CYP_GROWTH_10) + SKIP_ONE) * sizeof(binding_t));
/* Working-set cap (#601): a whole-graph driving set would request a
* multi-GB growth buffer here. Refuse before allocating and let the
* caller surface a clear "narrow the query" error. Same threshold as
* the early guard in execute_single, so ordinary queries (bind_cap at
* the default row ceiling) are never rejected. */
if ((size_t)(*bind_cap) > CYPHER_MAX_BINDINGS / CYP_GROWTH_10) {
g_cyp_abort = CYP_ABORT_CAP;
return;
}
size_t want = ((size_t)(*bind_cap) * CYP_GROWTH_10) + SKIP_ONE;
binding_t *new_bindings = malloc(want * sizeof(binding_t));
if (!new_bindings) {
g_cyp_abort = CYP_ABORT_OOM;
return;
}
int new_count = 0;

for (int bi = 0; bi < *bind_count; bi++) {
/* Wall-clock guard (#601): sample the clock periodically so a
* pathological expansion can't spin forever. On trip, drop the
* partial new set and leave *bindings intact for the caller. */
if ((bi & CYPHER_DEADLINE_CHECK_MASK) == 0 && cyp_deadline_exceeded()) {
for (int k = 0; k < new_count; k++) {
binding_free(&new_bindings[k]);
}
free(new_bindings);
return;
}
binding_t *b = &(*bindings)[bi];
cbm_node_t *src = binding_get(b, *var_name);
if (!src) {
Expand Down Expand Up @@ -4263,6 +4346,18 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec

/* Build initial bindings with early WHERE */
int bind_cap = scan_count > max_rows ? scan_count : (max_rows > 0 ? max_rows : SKIP_ONE);

/* Reject an oversized driving set up front (#601) — before allocating the
* bindings array — so a whole-graph MATCH can't request a multi-GB buffer
* (the relationship expansion then grows it 10x again). The expansion has
* its own cap too, for working sets that only blow up mid-traversal. */
if ((size_t)bind_cap > CYPHER_MAX_BINDINGS / CYP_GROWTH_10) {
g_cyp_abort = CYP_ABORT_CAP;
rb_init(rb);
cbm_store_free_nodes(scanned, scan_count);
return 0;
}

binding_t *bindings = malloc((bind_cap + SKIP_ONE) * sizeof(binding_t));
int bind_count = 0;
const char *var_name = pat0->nodes[0].variable ? pat0->nodes[0].variable : "_n0";
Expand All @@ -4286,20 +4381,25 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec
/* Step 2b: Additional patterns */
expand_additional_patterns(store, q, project, max_rows, &bindings, &bind_count, &bind_cap);

/* Step 3: Late WHERE */
if (q->where && (pat0->rel_count > 0 || q->pattern_count > SKIP_ONE)) {
filter_bindings_where(q->where, bindings, &bind_count);
}
/* If a guard tripped during expansion (#601), skip the remaining work and
* return an empty result; cbm_cypher_execute turns g_cyp_abort into an
* explicit error. Bindings are still freed below. */
rb_init(rb);
if (g_cyp_abort == CYP_ABORT_NONE) {
/* Step 3: Late WHERE */
if (q->where && (pat0->rel_count > 0 || q->pattern_count > SKIP_ONE)) {
filter_bindings_where(q->where, bindings, &bind_count);
}

/* Step 3b: WITH clause */
execute_with_clause(q, &bindings, &bind_count);
/* Step 3b: WITH clause */
execute_with_clause(q, &bindings, &bind_count);

/* Step 4: Project results */
rb_init(rb);
if (q->ret) {
execute_return_clause(q, q->ret, bindings, bind_count, max_rows, rb);
} else {
execute_default_projection(pat0, bindings, bind_count, max_rows, rb);
/* Step 4: Project results */
if (q->ret) {
execute_return_clause(q, q->ret, bindings, bind_count, max_rows, rb);
} else {
execute_default_projection(pat0, bindings, bind_count, max_rows, rb);
}
}

for (int bi = 0; bi < bind_count; bi++) {
Expand All @@ -4326,6 +4426,8 @@ int cbm_cypher_execute(cbm_store_t *store, const char *query, const char *projec
return CBM_NOT_FOUND;
}

cyp_guard_begin(); /* arm wall-clock timeout + working-set cap (#601) */

result_builder_t rb = {0};
// cppcheck-suppress knownConditionTrueFalse
if (execute_single(store, q, project, max_rows, &rb) < 0) {
Expand Down Expand Up @@ -4353,6 +4455,24 @@ int cbm_cypher_execute(cbm_store_t *store, const char *query, const char *projec
uq = uq->union_next;
}

/* Execution guard tripped (#601): report it explicitly instead of
* returning a silently-truncated or never-arriving result. */
if (g_cyp_abort != CYP_ABORT_NONE) {
const char *msg =
(g_cyp_abort == CYP_ABORT_TIMEOUT)
? "query exceeded the execution time limit — add LIMIT, a directed MATCH, or a "
"WHERE filter (raise CBM_CYPHER_TIMEOUT_MS to allow longer)"
: (g_cyp_abort == CYP_ABORT_OOM)
? "query ran out of memory while expanding matches — narrow it with LIMIT or "
"a directed MATCH"
: "query expansion exceeded the working-set limit — add LIMIT, a directed "
"MATCH, or a WHERE filter";
rb_free(&rb);
cbm_query_free(q);
out->error = heap_strdup(msg);
return CBM_NOT_FOUND;
}

/* UNION (not ALL) deduplication */
if (q->union_next && !q->union_all) {
rb_apply_distinct(&rb);
Expand Down
36 changes: 29 additions & 7 deletions src/mcp/mcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -789,20 +789,42 @@ static cbm_store_t *resolve_store(cbm_mcp_server_t *srv, const char *project) {
project_db_path(project, path, sizeof(path));
srv->store = cbm_store_open_path_query(path);
if (srv->store) {
/* Check DB integrity — auto-clean corrupt databases */
/* Check DB integrity — preserve corrupt databases instead of
* silently deleting them so the user can inspect the evidence and
* recover or report the issue (#557). Rename to .corrupt.<ts> so
* the path is obvious in the cache directory. */
if (!cbm_store_check_integrity(srv->store)) {
cbm_log_error("store.auto_clean", "project", project, "path", path, "action",
"deleting corrupt db — re-index required");
cbm_store_close(srv->store);
srv->store = NULL;
/* Delete the corrupt DB + WAL/SHM files */
cbm_unlink(path);
/* Rename the corrupt DB + WAL/SHM to preserve evidence */
char wal_path[MCP_FIELD_SIZE];
char shm_path[MCP_FIELD_SIZE];
char corrupt_path[MCP_FIELD_SIZE];
char corrupt_wal[MCP_FIELD_SIZE];
char corrupt_shm[MCP_FIELD_SIZE];
long long now_s = (long long)time(NULL);
snprintf(wal_path, sizeof(wal_path), "%s-wal", path);
snprintf(shm_path, sizeof(shm_path), "%s-shm", path);
cbm_unlink(wal_path);
cbm_unlink(shm_path);
snprintf(corrupt_path, sizeof(corrupt_path), "%s.corrupt.%lld", path, now_s);
snprintf(corrupt_wal, sizeof(corrupt_wal), "%s-wal.corrupt.%lld", path, now_s);
snprintf(corrupt_shm, sizeof(corrupt_shm), "%s-shm.corrupt.%lld", path, now_s);
if (rename(path, corrupt_path) != 0) {
cbm_unlink(path); /* fallback: delete if rename fails */
}
/* best-effort: sidecars may not exist */
if (rename(wal_path, corrupt_wal) != 0) {
cbm_unlink(wal_path);
}
if (rename(shm_path, corrupt_shm) != 0) {
cbm_unlink(shm_path);
}
cbm_log_error("store.auto_clean", "project", project, "path", path, "action",
"corrupt db preserved", "preserved_as", corrupt_path);
(void)fprintf(stderr,
"ERROR corrupt project database: %s\n"
"ERROR preserved as: %s\n"
"ERROR inspect the .corrupt file, delete it when done, and re-index.\n",
path, corrupt_path);
return NULL;
}

Expand Down
Loading
Loading