Skip to content

Commit 0d05b0a

Browse files
author
Your Name
committed
feat(pipeline): add Socket.IO and EventEmitter channel detection
Detect emit/listen channel patterns in JS/TS/Python source files during indexing. Extracts socket.emit/on, io.emit/on, emitter.emit/on patterns with a regex scanner that identifies receiver names against a whitelist of known channel communicators (socket, io, emitter, eventBus, etc.). Filters out generic Node.js stream events (error, close, data, etc.) and classifies transport as 'socketio' or 'eventemitter' based on receiver name. New schema: 'channels' table (project, channel_name, direction, transport, node_id, file_path, function_name) with indexes on channel_name and project. New store API: cbm_store_detect_channels() scans source from disk for all indexed Function/Method/Module nodes in JS/TS/Python files. cbm_store_find_channels() queries by project and/or channel name with partial matching. Automatic cross-repo matching at query time (no link step). New MCP tool: get_channels returns matched channels with emitter/listener info, filterable by channel name and project. Tested: TS monorepo detects 210 channel references including Socket.IO subscribe/unsubscribe flows between UI and server.
1 parent 8373e3f commit 0d05b0a

5 files changed

Lines changed: 366 additions & 3 deletions

File tree

src/mcp/mcp.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,14 @@ static const tool_def_t TOOLS[] = {
288288
"community detection. Returns up to 300 processes ordered by step count.",
289289
"{\"type\":\"object\",\"properties\":{\"project\":{\"type\":\"string\"}},\"required\":[\"project\"]}"},
290290

291+
{"get_channels",
292+
"Find message channels (Socket.IO events, EventEmitter signals) across projects. "
293+
"Shows which functions emit and listen on each channel, enabling cross-service "
294+
"message flow tracing. Auto-detects patterns during indexing. "
295+
"Query by channel name (partial match) and/or project.",
296+
"{\"type\":\"object\",\"properties\":{\"project\":{\"type\":\"string\"},"
297+
"\"channel\":{\"type\":\"string\",\"description\":\"Channel name filter (partial match)\"}}}"},
298+
291299
{"search_code",
292300
"Graph-augmented code search. Finds text patterns via grep, then enriches results with "
293301
"the knowledge graph: deduplicates matches into containing functions, ranks by structural "
@@ -1165,6 +1173,53 @@ static char *handle_delete_project(cbm_mcp_server_t *srv, const char *args) {
11651173
return result;
11661174
}
11671175

1176+
static char *handle_get_channels(cbm_mcp_server_t *srv, const char *args) {
1177+
char *project = cbm_mcp_get_string_arg(args, "project");
1178+
char *channel = cbm_mcp_get_string_arg(args, "channel");
1179+
cbm_store_t *store = resolve_store(srv, project);
1180+
REQUIRE_STORE(store, project);
1181+
1182+
cbm_channel_info_t *channels = NULL;
1183+
int count = 0;
1184+
cbm_store_find_channels(store, project, channel, &channels, &count);
1185+
1186+
yyjson_mut_doc *doc = yyjson_mut_doc_new(NULL);
1187+
yyjson_mut_val *root = yyjson_mut_obj(doc);
1188+
yyjson_mut_doc_set_root(doc, root);
1189+
1190+
yyjson_mut_obj_add_int(doc, root, "total", count);
1191+
1192+
/* Group by channel name for readable output */
1193+
yyjson_mut_val *arr = yyjson_mut_arr(doc);
1194+
for (int i = 0; i < count; i++) {
1195+
yyjson_mut_val *item = yyjson_mut_obj(doc);
1196+
yyjson_mut_obj_add_strcpy(doc, item, "channel",
1197+
channels[i].channel_name ? channels[i].channel_name : "");
1198+
yyjson_mut_obj_add_strcpy(doc, item, "direction",
1199+
channels[i].direction ? channels[i].direction : "");
1200+
yyjson_mut_obj_add_strcpy(doc, item, "transport",
1201+
channels[i].transport ? channels[i].transport : "");
1202+
yyjson_mut_obj_add_strcpy(doc, item, "project",
1203+
channels[i].project ? channels[i].project : "");
1204+
yyjson_mut_obj_add_strcpy(doc, item, "file",
1205+
channels[i].file_path ? channels[i].file_path : "");
1206+
yyjson_mut_obj_add_strcpy(doc, item, "function",
1207+
channels[i].function_name ? channels[i].function_name : "");
1208+
yyjson_mut_arr_add_val(arr, item);
1209+
}
1210+
yyjson_mut_obj_add_val(doc, root, "channels", arr);
1211+
1212+
char *json = yy_doc_to_str(doc);
1213+
yyjson_mut_doc_free(doc);
1214+
cbm_store_free_channels(channels, count);
1215+
free(project);
1216+
free(channel);
1217+
1218+
char *result = cbm_mcp_text_result(json, false);
1219+
free(json);
1220+
return result;
1221+
}
1222+
11681223
static char *handle_list_processes(cbm_mcp_server_t *srv, const char *args) {
11691224
char *project = cbm_mcp_get_string_arg(args, "project");
11701225
cbm_store_t *store = resolve_store(srv, project);
@@ -2968,6 +3023,9 @@ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const ch
29683023
if (strcmp(tool_name, "list_processes") == 0) {
29693024
return handle_list_processes(srv, args_json);
29703025
}
3026+
if (strcmp(tool_name, "get_channels") == 0) {
3027+
return handle_get_channels(srv, args_json);
3028+
}
29713029

29723030
/* Pipeline-dependent tools */
29733031
if (strcmp(tool_name, "index_repository") == 0) {

src/pipeline/httplink.c

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,6 @@ static int count_segments(const char *path) {
362362
return count;
363363
}
364364

365-
/* Jaccard similarity of path segments (intersection/union) */
366365
static double segment_jaccard(const char *norm_call, const char *norm_route) {
367366
/* Split into segments */
368367
char a[1024];
@@ -1907,3 +1906,92 @@ int cbm_httplink_all_exclude_paths(const cbm_httplink_config_t *cfg, const char
19071906

19081907
return count;
19091908
}
1909+
1910+
/* ── Channel extraction: Socket.IO / EventEmitter ────────────────── */
1911+
1912+
/*
 * One emit/listen call site reported by cbm_extract_channels().
 * All three members are NUL-terminated strings stored inline.
 */
typedef struct cbm_channel_match {
    char channel[256];  /* channel/event name taken from the call's string literal */
    char direction[8];  /* "emit" or "listen" */
    char transport[32]; /* "socketio" or "eventemitter" */
} cbm_channel_match_t;
1917+
1918+
/* Copy at most dst_size-1 bytes of src[0..len) into dst and NUL-terminate it. */
static void channel_copy_span(char *dst, size_t dst_size, const char *src, size_t len) {
    if (len >= dst_size) {
        len = dst_size - 1;
    }
    memcpy(dst, src, len);
    dst[len] = '\0';
}

/* True when `name` equals one entry of the NULL-terminated `list`. */
static bool channel_list_contains(const char *const *list, const char *name, bool ignore_case) {
    for (size_t i = 0; list[i]; i++) {
        int diff = ignore_case ? strcasecmp(name, list[i]) : strcmp(name, list[i]);
        if (diff == 0) {
            return true;
        }
    }
    return false;
}

/*
 * Scan `source` for call sites shaped like
 *
 *     <receiver>.<method>( '<channel>'
 *
 * where <method> is emit/on/once/addListener/removeListener and the first
 * argument is a string literal of 1..128 characters delimited by ', " or `.
 *
 * A call site is reported only when
 *   - <receiver> is one of the known communicator names (compared
 *     case-insensitively), and
 *   - <channel> is not one of the generic stream/socket lifecycle event
 *     names (compared case-sensitively).
 *
 * For every reported site one entry of `out` is filled in:
 *   direction = "emit" for the emit method, "listen" for every other method
 *               (this includes removeListener);
 *   transport = "socketio" for socket/io/nsp/namespace receivers,
 *               "eventemitter" otherwise.
 *
 * @param source   NUL-terminated source text; NULL or "" yields 0.
 * @param out      array receiving the matches; NULL yields 0.
 * @param max_out  capacity of `out`; scanning stops once it is full.
 * @return number of entries written to `out` (0 when the regex cannot be
 *         compiled).
 */
int cbm_extract_channels(const char *source, cbm_channel_match_t *out, int max_out) {
    static const char *const channel_receivers[] = {
        "socket", "io", "client", "server", "connection",
        "emitter", "eventEmitter", "eventBus", "bus", "pubsub",
        "producer", "consumer", "channel", "broker",
        "nsp", "namespace", "this", NULL
    };
    static const char *const socketio_receivers[] = {
        "socket", "io", "nsp", "namespace", NULL
    };
    static const char *const generic_events[] = {
        "error", "close", "end", "data",
        "connect", "disconnect", "connection", "message",
        "open", "drain", "finish", "pipe",
        "unpipe", "readable", "resume", "pause", NULL
    };

    /* Nothing to scan, or nowhere to put results. */
    if (!source || !*source || !out || max_out <= 0) {
        return 0;
    }

    cbm_regex_t re;
    if (cbm_regcomp(&re,
                    "([a-zA-Z_][a-zA-Z0-9_]*)\\.("
                    "emit|on|once|addListener|removeListener"
                    ")\\([[:space:]]*['\"`]([^'\"`]{1,128})['\"`]",
                    CBM_REG_EXTENDED) != 0) {
        return 0;
    }

    int count = 0;
    const char *p = source;
    cbm_regmatch_t match[4]; /* [0] whole match, [1] receiver, [2] method, [3] channel */

    while (count < max_out && cbm_regexec(&re, p, 4, match, 0) == 0) {
        if (match[0].rm_eo <= 0) {
            break; /* defensive: never loop without making progress */
        }

        char receiver[64];
        channel_copy_span(receiver, sizeof(receiver), p + match[1].rm_so,
                          (size_t)(match[1].rm_eo - match[1].rm_so));

        if (channel_list_contains(channel_receivers, receiver, true)) {
            char method[32];
            channel_copy_span(method, sizeof(method), p + match[2].rm_so,
                              (size_t)(match[2].rm_eo - match[2].rm_so));

            /* The slot is written first and only kept (count++) if the
             * channel name survives the generic-event filter. */
            cbm_channel_match_t *slot = &out[count];
            channel_copy_span(slot->channel, sizeof(slot->channel), p + match[3].rm_so,
                              (size_t)(match[3].rm_eo - match[3].rm_so));

            if (!channel_list_contains(generic_events, slot->channel, false)) {
                const char *direction = (strcmp(method, "emit") == 0) ? "emit" : "listen";
                const char *transport = channel_list_contains(socketio_receivers, receiver, true)
                                            ? "socketio"
                                            : "eventemitter";
                channel_copy_span(slot->direction, sizeof(slot->direction), direction,
                                  strlen(direction));
                channel_copy_span(slot->transport, sizeof(slot->transport), transport,
                                  strlen(transport));
                count++;
            }
        }

        /* Resume scanning right after the current match. */
        p += match[0].rm_eo;
    }

    cbm_regfree(&re);
    return count;
}

src/pipeline/pipeline.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,17 @@ int cbm_pipeline_run(cbm_pipeline_t *p) {
839839
cbm_store_close(proc_store);
840840
}
841841
}
842+
843+
/* ── Channel detection: scan source for emit/on patterns ── */
844+
{
845+
cbm_store_t *ch_store = cbm_store_open_path(db_path);
846+
if (ch_store) {
847+
int nch = cbm_store_detect_channels(ch_store, p->project_name, p->repo_path);
848+
cbm_log_info("pass.done", "pass", "channels",
849+
"detected", itoa_buf(nch));
850+
cbm_store_close(ch_store);
851+
}
852+
}
842853
}
843854
}
844855

src/store/store.c

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,18 @@ static int init_schema(cbm_store_t *s) {
206206
" step INTEGER NOT NULL,"
207207
" PRIMARY KEY (process_id, step)"
208208
");"
209+
"CREATE TABLE IF NOT EXISTS channels ("
210+
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
211+
" project TEXT NOT NULL REFERENCES projects(name) ON DELETE CASCADE,"
212+
" channel_name TEXT NOT NULL,"
213+
" direction TEXT NOT NULL," /* 'emit' or 'listen' */
214+
" transport TEXT NOT NULL DEFAULT 'socketio',"
215+
" node_id INTEGER NOT NULL,"
216+
" file_path TEXT DEFAULT '',"
217+
" function_name TEXT DEFAULT ''"
218+
");"
219+
"CREATE INDEX IF NOT EXISTS idx_channels_name ON channels(channel_name);"
220+
"CREATE INDEX IF NOT EXISTS idx_channels_project ON channels(project);"
209221
"CREATE TABLE IF NOT EXISTS project_summaries ("
210222
" project TEXT PRIMARY KEY,"
211223
" summary TEXT NOT NULL,"
@@ -4778,6 +4790,179 @@ void cbm_store_free_process_steps(cbm_process_step_t *arr, int count) {
47784790
free(arr);
47794791
}
47804792

4793+
/* ── Channels (cross-service message tracing) ────────────────────── */
4794+
4795+
/*
 * Local mirror of the channel-match record and extractor defined in
 * httplink.c. The struct carries the same tag (cbm_channel_match) and the
 * same members in the same order as the definition there: C only treats
 * struct types declared in different translation units as compatible when
 * their tags match, so an untagged copy here would make the call below
 * undefined behaviour.
 */
typedef struct cbm_channel_match {
    char channel[256];  /* channel/event name */
    char direction[8];  /* "emit" or "listen" */
    char transport[32]; /* "socketio" or "eventemitter" */
} cbm_channel_match_t;

/* Fills `out` with at most `max_out` matches found in `source`; returns the number written. */
int cbm_extract_channels(const char *source, cbm_channel_match_t *out, int max_out);
4802+
4803+
/*
 * Read physical lines [start, end] (1-based, inclusive) of the file at `path`
 * into a freshly allocated, NUL-terminated buffer.
 *
 * Returns NULL when the file cannot be opened or the range selects nothing.
 * The caller frees the result.
 */
static char *channels_read_lines(const char *path, int start, int end) {
    FILE *f = fopen(path, "r");
    if (!f) {
        return NULL;
    }

    char *buf = NULL;
    size_t len = 0;
    size_t cap = 0;
    int line_num = 1;
    char chunk[4096];

    while (line_num <= end && fgets(chunk, sizeof(chunk), f)) {
        size_t n = strlen(chunk);

        if (line_num >= start) {
            /* n < sizeof(chunk) <= cap after the first growth, so a single
             * doubling always leaves room for the chunk plus the final NUL. */
            if (len + n >= cap) {
                cap = (cap == 0) ? sizeof(chunk) : cap * 2;
                buf = safe_realloc(buf, cap);
                /* NOTE(review): assumes safe_realloc either aborts or returns
                 * NULL on failure -- confirm against its definition. */
                if (!buf) {
                    fclose(f);
                    return NULL;
                }
            }
            memcpy(buf + len, chunk, n);
            len += n;
        }

        /* A line longer than the chunk buffer arrives in several pieces;
         * only the piece that ends in '\n' completes the line. */
        if (n > 0 && chunk[n - 1] == '\n') {
            line_num++;
        }
    }
    fclose(f);

    if (buf) {
        buf[len] = '\0';
    }
    return buf;
}

/*
 * Rebuild the `channels` rows of `project`.
 *
 * Inside one transaction the project's existing rows are deleted, then every
 * Function/Method/Module node whose file_path ends in .ts/.js/.tsx/.py is
 * looked up on disk under `repo_path`, the node's line range is scanned with
 * cbm_extract_channels(), and one row is inserted per match (at most 64 per
 * node).
 *
 * NOTE(review): a Module node's line range presumably also covers the
 * functions declared inside it, in which case one call site is recorded once
 * per enclosing node -- confirm whether that duplication is intended.
 *
 * @return number of rows actually inserted; 0 on invalid arguments or when
 *         any statement cannot be prepared (existing rows are then left
 *         untouched).
 */
int cbm_store_detect_channels(cbm_store_t *s, const char *project, const char *repo_path) {
    enum { MATCHES_PER_NODE = 64 };

    if (!s || !s->db || !project || !repo_path) {
        return 0;
    }

    sqlite3_stmt *del = NULL;
    sqlite3_stmt *sel = NULL;
    sqlite3_stmt *ins = NULL;
    int total = 0;

    /* Prepare everything up front so that a failure cannot leave the project
     * with its old rows deleted and nothing to replace them. */
    if (sqlite3_prepare_v2(s->db, "DELETE FROM channels WHERE project = ?1", -1, &del, NULL) !=
            SQLITE_OK ||
        sqlite3_prepare_v2(s->db,
                           "SELECT id, name, file_path, start_line, end_line FROM nodes "
                           "WHERE project = ?1 AND label IN ('Function','Method','Module') "
                           "AND (file_path LIKE '%.ts' OR file_path LIKE '%.js' "
                           "OR file_path LIKE '%.tsx' OR file_path LIKE '%.py')",
                           -1, &sel, NULL) != SQLITE_OK ||
        sqlite3_prepare_v2(s->db,
                           "INSERT INTO channels(project,channel_name,direction,transport,"
                           "node_id,file_path,function_name) "
                           "VALUES(?1,?2,?3,?4,?5,?6,?7)",
                           -1, &ins, NULL) != SQLITE_OK) {
        goto cleanup;
    }

    exec_sql(s, "BEGIN TRANSACTION");

    /* Bound parameter instead of string-built SQL: safe for any project name. */
    bind_text(del, 1, project);
    if (sqlite3_step(del) != SQLITE_DONE) {
        exec_sql(s, "ROLLBACK");
        goto cleanup;
    }
    sqlite3_reset(del);

    bind_text(sel, 1, project);
    while (sqlite3_step(sel) == SQLITE_ROW) {
        int64_t node_id = sqlite3_column_int64(sel, 0);
        /* Column text stays valid until the next step on `sel`, i.e. for the
         * whole loop body. */
        const char *name = (const char *)sqlite3_column_text(sel, 1);
        const char *fpath = (const char *)sqlite3_column_text(sel, 2);
        int start = sqlite3_column_int(sel, 3);
        int end = sqlite3_column_int(sel, 4);

        if (!fpath || !fpath[0] || start <= 0 || end <= 0) {
            continue;
        }

        char full_path[2048];
        int plen = snprintf(full_path, sizeof(full_path), "%s/%s", repo_path, fpath);
        if (plen < 0 || (size_t)plen >= sizeof(full_path)) {
            continue; /* a truncated path would name the wrong file */
        }

        char *source = channels_read_lines(full_path, start, end);
        if (!source) {
            continue;
        }

        cbm_channel_match_t matches[MATCHES_PER_NODE];
        int mc = cbm_extract_channels(source, matches, MATCHES_PER_NODE);
        for (int i = 0; i < mc; i++) {
            sqlite3_reset(ins);
            bind_text(ins, 1, project);
            bind_text(ins, 2, matches[i].channel);
            bind_text(ins, 3, matches[i].direction);
            bind_text(ins, 4, matches[i].transport);
            sqlite3_bind_int64(ins, 5, node_id);
            bind_text(ins, 6, fpath);
            bind_text(ins, 7, name ? name : "");
            /* Count only rows that really made it into the table. */
            if (sqlite3_step(ins) == SQLITE_DONE) {
                total++;
            }
        }
        /* Drop the insert's hold on this row's strings before they go away. */
        sqlite3_reset(ins);
        free(source);
    }

    exec_sql(s, "COMMIT");

cleanup:
    /* sqlite3_finalize(NULL) is a harmless no-op. */
    sqlite3_finalize(del);
    sqlite3_finalize(sel);
    sqlite3_finalize(ins);
    return total;
}
4890+
4891+
int cbm_store_find_channels(cbm_store_t *s, const char *project, const char *channel,
4892+
cbm_channel_info_t **out, int *count) {
4893+
*out = NULL;
4894+
*count = 0;
4895+
4896+
/* Build query — if project is NULL, search all; if channel is NULL, return all */
4897+
char sql[1024];
4898+
if (project && channel) {
4899+
snprintf(sql, sizeof(sql),
4900+
"SELECT channel_name, direction, transport, project, file_path, function_name "
4901+
"FROM channels WHERE project = ?1 AND channel_name LIKE ?2 "
4902+
"ORDER BY channel_name LIMIT 500");
4903+
} else if (project) {
4904+
snprintf(sql, sizeof(sql),
4905+
"SELECT channel_name, direction, transport, project, file_path, function_name "
4906+
"FROM channels WHERE project = ?1 ORDER BY channel_name LIMIT 500");
4907+
} else if (channel) {
4908+
snprintf(sql, sizeof(sql),
4909+
"SELECT channel_name, direction, transport, project, file_path, function_name "
4910+
"FROM channels WHERE channel_name LIKE ?1 ORDER BY channel_name LIMIT 500");
4911+
} else {
4912+
snprintf(sql, sizeof(sql),
4913+
"SELECT channel_name, direction, transport, project, file_path, function_name "
4914+
"FROM channels ORDER BY channel_name LIMIT 500");
4915+
}
4916+
4917+
sqlite3_stmt *stmt = NULL;
4918+
if (sqlite3_prepare_v2(s->db, sql, -1, &stmt, NULL) != SQLITE_OK) return CBM_STORE_OK;
4919+
4920+
int bi = 0;
4921+
if (project && channel) {
4922+
bind_text(stmt, 1, project);
4923+
char pat[256];
4924+
snprintf(pat, sizeof(pat), "%%%s%%", channel);
4925+
bind_text(stmt, 2, pat);
4926+
} else if (project) {
4927+
bind_text(stmt, 1, project);
4928+
} else if (channel) {
4929+
char pat[256];
4930+
snprintf(pat, sizeof(pat), "%%%s%%", channel);
4931+
bind_text(stmt, 1, pat);
4932+
}
4933+
(void)bi;
4934+
4935+
int cap = 64;
4936+
int n = 0;
4937+
cbm_channel_info_t *arr = calloc((size_t)cap, sizeof(cbm_channel_info_t));
4938+
while (sqlite3_step(stmt) == SQLITE_ROW) {
4939+
if (n >= cap) { cap *= 2; arr = safe_realloc(arr, (size_t)cap * sizeof(cbm_channel_info_t)); }
4940+
arr[n].channel_name = heap_strdup((const char *)sqlite3_column_text(stmt, 0));
4941+
arr[n].direction = heap_strdup((const char *)sqlite3_column_text(stmt, 1));
4942+
arr[n].transport = heap_strdup((const char *)sqlite3_column_text(stmt, 2));
4943+
arr[n].project = heap_strdup((const char *)sqlite3_column_text(stmt, 3));
4944+
arr[n].file_path = heap_strdup((const char *)sqlite3_column_text(stmt, 4));
4945+
arr[n].function_name = heap_strdup((const char *)sqlite3_column_text(stmt, 5));
4946+
n++;
4947+
}
4948+
sqlite3_finalize(stmt);
4949+
*out = arr;
4950+
*count = n;
4951+
return CBM_STORE_OK;
4952+
}
4953+
4954+
/*
 * Release an array of `count` channel rows: every string member of each
 * entry is freed first, then the array itself.
 */
void cbm_store_free_channels(cbm_channel_info_t *arr, int count) {
    int idx = 0;
    while (idx < count) {
        cbm_channel_info_t *entry = &arr[idx];
        free((void *)entry->channel_name);
        free((void *)entry->direction);
        free((void *)entry->transport);
        free((void *)entry->project);
        free((void *)entry->file_path);
        free((void *)entry->function_name);
        idx++;
    }
    free(arr);
}
4965+
47814966
/* ── ADR (Architecture Decision Record) ────────────────────────── */
47824967

47834968
static const char *canonical_sections[] = {"PURPOSE", "STACK", "ARCHITECTURE",

0 commit comments

Comments
 (0)