Skip to content

Commit 8021d94

Browse files
author
Your Name
committed
feat(store+cypher): channel dedup, count(DISTINCT), SQL injection fix
Channel deduplication:
- Added UNIQUE index on channels(project, channel_name, direction, file_path, function_name) to prevent duplicate rows at insert time
- Changed INSERT to INSERT OR IGNORE
- Added DISTINCT to all channel SELECT queries
- Fixed SQL injection in channel DELETE (was snprintf, now parameterized)

Cypher count(DISTINCT ...):
- Parser now accepts DISTINCT keyword inside aggregate functions: count(DISTINCT n.name), count(DISTINCT n.file_path), etc.
- Added distinct_arg flag to cbm_return_item_t
- Executor tracks seen values per-column and only increments count for unique values when distinct_arg is set
- Proper cleanup of distinct_seen arrays in both WITH and RETURN paths

Enables queries like:
MATCH (caller)-[e]->(n) WHERE e.type = 'CALLS' RETURN count(DISTINCT n.name) as unique_callees
1 parent b7ba394 commit 8021d94

3 files changed

Lines changed: 68 additions & 11 deletions

File tree

src/cypher/cypher.c

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,10 @@ static int parse_return_or_with(parser_t *p, cbm_return_clause_t **out, bool is_
10521052
cbm_token_type_t ft = peek(p)->type;
10531053
advance(p);
10541054
expect(p, TOK_LPAREN);
1055+
/* Check for DISTINCT inside aggregate: count(DISTINCT ...) */
1056+
if (match(p, TOK_DISTINCT)) {
1057+
item.distinct_arg = true;
1058+
}
10551059
if (match(p, TOK_STAR)) {
10561060
item.variable = heap_strdup("*");
10571061
} else {
@@ -2568,6 +2572,10 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec
25682572
double *sums;
25692573
int *counts;
25702574
double *mins, *maxs;
2575+
/* For count(DISTINCT ...): per-column arrays of seen values */
2576+
const char ***distinct_seen; /* [col][seen_idx] */
2577+
int *distinct_seen_count; /* count per column */
2578+
int *distinct_seen_cap; /* capacity per column */
25712579
} with_agg_t;
25722580
int agg_cap = 256;
25732581
with_agg_t *aggs = calloc(agg_cap, sizeof(with_agg_t));
@@ -2606,6 +2614,9 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec
26062614
aggs[found].counts = calloc(wc->count, sizeof(int));
26072615
aggs[found].mins = malloc(wc->count * sizeof(double));
26082616
aggs[found].maxs = malloc(wc->count * sizeof(double));
2617+
aggs[found].distinct_seen = calloc(wc->count, sizeof(const char **));
2618+
aggs[found].distinct_seen_count = calloc(wc->count, sizeof(int));
2619+
aggs[found].distinct_seen_cap = calloc(wc->count, sizeof(int));
26092620
for (int ci = 0; ci < wc->count; ci++) {
26102621
aggs[found].mins[ci] = 1e308;
26112622
aggs[found].maxs[ci] = -1e308;
@@ -2624,9 +2635,34 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec
26242635
if (!wc->items[ci].func) {
26252636
continue;
26262637
}
2627-
aggs[found].counts[ci]++;
26282638
const char *raw = binding_get_virtual(&bindings[bi], wc->items[ci].variable,
26292639
wc->items[ci].property);
2640+
/* count(DISTINCT ...): only count if value not already seen */
2641+
if (wc->items[ci].distinct_arg && strcmp(wc->items[ci].func, "COUNT") == 0) {
2642+
bool already = false;
2643+
for (int di = 0; di < aggs[found].distinct_seen_count[ci]; di++) {
2644+
if (aggs[found].distinct_seen[ci][di] &&
2645+
strcmp(aggs[found].distinct_seen[ci][di], raw) == 0) {
2646+
already = true;
2647+
break;
2648+
}
2649+
}
2650+
if (!already) {
2651+
/* Track the value */
2652+
if (aggs[found].distinct_seen_count[ci] >= aggs[found].distinct_seen_cap[ci]) {
2653+
int newcap = aggs[found].distinct_seen_cap[ci] < 16 ? 16 :
2654+
aggs[found].distinct_seen_cap[ci] * 2;
2655+
aggs[found].distinct_seen[ci] = safe_realloc(
2656+
aggs[found].distinct_seen[ci], newcap * sizeof(const char *));
2657+
aggs[found].distinct_seen_cap[ci] = newcap;
2658+
}
2659+
aggs[found].distinct_seen[ci][aggs[found].distinct_seen_count[ci]++] =
2660+
heap_strdup(raw);
2661+
aggs[found].counts[ci]++;
2662+
}
2663+
} else {
2664+
aggs[found].counts[ci]++;
2665+
}
26302666
double dv = strtod(raw, NULL);
26312667
aggs[found].sums[ci] += dv;
26322668
if (dv < aggs[found].mins[ci]) {
@@ -2703,6 +2739,17 @@ static int execute_single(cbm_store_t *store, cbm_query_t *q, const char *projec
27032739
free(aggs[a].counts);
27042740
free(aggs[a].mins);
27052741
free(aggs[a].maxs);
2742+
if (aggs[a].distinct_seen) {
2743+
for (int ci = 0; ci < wc->count; ci++) {
2744+
for (int di = 0; di < aggs[a].distinct_seen_count[ci]; di++) {
2745+
free((void *)aggs[a].distinct_seen[ci][di]);
2746+
}
2747+
free(aggs[a].distinct_seen[ci]);
2748+
}
2749+
free(aggs[a].distinct_seen);
2750+
free(aggs[a].distinct_seen_count);
2751+
free(aggs[a].distinct_seen_cap);
2752+
}
27062753
}
27072754
free(aggs);
27082755
} else {

src/cypher/cypher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ typedef struct {
238238
const char *func; /* "COUNT", "SUM", "AVG", "MIN", "MAX", "COLLECT",
239239
"toLower", "toUpper", "toString" or NULL */
240240
cbm_case_expr_t *kase; /* CASE expression (NULL if not CASE) */
241+
bool distinct_arg; /* true when func is count(DISTINCT ...) */
241242
} cbm_return_item_t;
242243

243244
typedef struct {

src/store/store.c

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ static int init_schema(cbm_store_t *s) {
218218
");"
219219
"CREATE INDEX IF NOT EXISTS idx_channels_name ON channels(channel_name);"
220220
"CREATE INDEX IF NOT EXISTS idx_channels_project ON channels(project);"
221+
"CREATE UNIQUE INDEX IF NOT EXISTS idx_channels_unique "
222+
"ON channels(project, channel_name, direction, file_path, function_name);"
221223
"CREATE TABLE IF NOT EXISTS project_summaries ("
222224
" project TEXT PRIMARY KEY,"
223225
" summary TEXT NOT NULL,"
@@ -4968,10 +4970,16 @@ int cbm_extract_csharp_channels(const char *source, cbm_channel_match_t *out, in
49684970
int cbm_store_detect_channels(cbm_store_t *s, const char *project, const char *repo_path) {
49694971
if (!s || !s->db || !project || !repo_path) return 0;
49704972

4971-
/* Clear existing channels for this project */
4972-
char del[256];
4973-
snprintf(del, sizeof(del), "DELETE FROM channels WHERE project = '%s'", project);
4974-
exec_sql(s, del);
4973+
/* Clear existing channels for this project (parameterized — no SQL injection) */
4974+
{
4975+
sqlite3_stmt *del_stmt = NULL;
4976+
sqlite3_prepare_v2(s->db, "DELETE FROM channels WHERE project = ?1", -1, &del_stmt, NULL);
4977+
if (del_stmt) {
4978+
bind_text(del_stmt, 1, project);
4979+
sqlite3_step(del_stmt);
4980+
sqlite3_finalize(del_stmt);
4981+
}
4982+
}
49754983

49764984
/* Find all Function/Method nodes with source file references in supported languages */
49774985
const char *sql = "SELECT id, name, file_path, start_line, end_line FROM nodes "
@@ -4985,7 +4993,7 @@ int cbm_store_detect_channels(cbm_store_t *s, const char *project, const char *r
49854993

49864994
sqlite3_stmt *ins = NULL;
49874995
sqlite3_prepare_v2(s->db,
4988-
"INSERT INTO channels(project,channel_name,direction,transport,node_id,file_path,function_name) "
4996+
"INSERT OR IGNORE INTO channels(project,channel_name,direction,transport,node_id,file_path,function_name) "
49894997
"VALUES(?1,?2,?3,?4,?5,?6,?7)", -1, &ins, NULL);
49904998

49914999
exec_sql(s, "BEGIN TRANSACTION");
@@ -5067,24 +5075,25 @@ int cbm_store_find_channels(cbm_store_t *s, const char *project, const char *cha
50675075
*out = NULL;
50685076
*count = 0;
50695077

5070-
/* Build query — if project is NULL, search all; if channel is NULL, return all */
5078+
/* Build query — if project is NULL, search all; if channel is NULL, return all.
5079+
* Use DISTINCT to prevent duplicate rows from different extraction passes. */
50715080
char sql[1024];
50725081
if (project && channel) {
50735082
snprintf(sql, sizeof(sql),
5074-
"SELECT channel_name, direction, transport, project, file_path, function_name "
5083+
"SELECT DISTINCT channel_name, direction, transport, project, file_path, function_name "
50755084
"FROM channels WHERE project = ?1 AND channel_name LIKE ?2 "
50765085
"ORDER BY channel_name LIMIT 500");
50775086
} else if (project) {
50785087
snprintf(sql, sizeof(sql),
5079-
"SELECT channel_name, direction, transport, project, file_path, function_name "
5088+
"SELECT DISTINCT channel_name, direction, transport, project, file_path, function_name "
50805089
"FROM channels WHERE project = ?1 ORDER BY channel_name LIMIT 500");
50815090
} else if (channel) {
50825091
snprintf(sql, sizeof(sql),
5083-
"SELECT channel_name, direction, transport, project, file_path, function_name "
5092+
"SELECT DISTINCT channel_name, direction, transport, project, file_path, function_name "
50845093
"FROM channels WHERE channel_name LIKE ?1 ORDER BY channel_name LIMIT 500");
50855094
} else {
50865095
snprintf(sql, sizeof(sql),
5087-
"SELECT channel_name, direction, transport, project, file_path, function_name "
5096+
"SELECT DISTINCT channel_name, direction, transport, project, file_path, function_name "
50885097
"FROM channels ORDER BY channel_name LIMIT 500");
50895098
}
50905099

0 commit comments

Comments
 (0)