Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/graph_buffer/graph_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ int cbm_gbuf_node_count(const cbm_gbuf_t *gb) {
return gb ? (int)cbm_ht_count(gb->node_by_qn) : 0;
}

const char *cbm_gbuf_root_path(const cbm_gbuf_t *gb) {
return gb ? gb->root_path : "";
}

int cbm_gbuf_delete_by_label(cbm_gbuf_t *gb, const char *label) {
if (!gb || !label) {
return CBM_NOT_FOUND;
Expand Down
3 changes: 3 additions & 0 deletions src/graph_buffer/graph_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ int cbm_gbuf_find_by_name(const cbm_gbuf_t *gb, const char *name, const cbm_gbuf
/* Count total nodes in buffer. */
int cbm_gbuf_node_count(const cbm_gbuf_t *gb);

/* Return the root path associated with this graph buffer. Borrowed string. */
const char *cbm_gbuf_root_path(const cbm_gbuf_t *gb);

/* Get the next ID that would be assigned. Used to initialize shared atomic counters. */
int64_t cbm_gbuf_next_id(const cbm_gbuf_t *gb);

Expand Down
137 changes: 137 additions & 0 deletions src/pipeline/pass_parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,138 @@ bool extract_grpc_service_method(const char *callee, char *service, size_t srv_s
return stripped && service[0] && method[0];
}

static bool extract_overpass_service_name(const char *import_qn, char *service, size_t srv_sz) {
const char *prefix = "code.byted.org/overpass/";
const char *p = strstr(import_qn ? import_qn : "", prefix);
if (!p) {
return false;
}
p += strlen(prefix);
const char *end = strchr(p, '/');
if (!end || end == p) {
return false;
}
size_t len = (size_t)(end - p);
if (len >= srv_sz) {
len = srv_sz - SKIP_ONE;
}
memcpy(service, p, len);
service[len] = '\0';
return service[0] != '\0';
}

static bool extract_overpass_rawcall_method(const char *callee, const char **imp_keys,
const char **imp_vals, int imp_count, char *service,
size_t srv_sz, char *method, size_t meth_sz) {
service[0] = '\0';
method[0] = '\0';
const char *rawcall = strstr(callee ? callee : "", ".RawCall.");
if (!rawcall || !rawcall[SKIP_ONE + 7]) {
return false;
}
size_t alias_len = (size_t)(rawcall - callee);
if (alias_len == 0 || alias_len >= CBM_SZ_128) {
return false;
}
char alias[CBM_SZ_128];
memcpy(alias, callee, alias_len);
alias[alias_len] = '\0';

snprintf(method, meth_sz, "%s", rawcall + strlen(".RawCall."));
char *dot = strchr(method, '.');
if (dot) {
*dot = '\0';
}
if (!method[0]) {
return false;
}

for (int i = 0; i < imp_count; i++) {
if (!imp_keys[i] || !imp_vals[i] || strcmp(imp_keys[i], alias) != 0) {
continue;
}
if (!strstr(imp_vals[i], "code.byted.org/overpass/") || !strstr(imp_vals[i], "/rpc/")) {
continue;
}
return extract_overpass_service_name(imp_vals[i], service, srv_sz);
}
return false;
}

static bool extract_overpass_rawcall_method_from_imports(const char *callee,
const CBMImportArray *imports,
char *service, size_t srv_sz,
char *method, size_t meth_sz) {
service[0] = '\0';
method[0] = '\0';
const char *rawcall = strstr(callee ? callee : "", ".RawCall.");
if (!rawcall || !rawcall[SKIP_ONE + 7]) {
return false;
}
size_t alias_len = (size_t)(rawcall - callee);
if (alias_len == 0 || alias_len >= CBM_SZ_128) {
return false;
}
char alias[CBM_SZ_128];
memcpy(alias, callee, alias_len);
alias[alias_len] = '\0';

snprintf(method, meth_sz, "%s", rawcall + strlen(".RawCall."));
char *dot = strchr(method, '.');
if (dot) {
*dot = '\0';
}
if (!method[0] || !imports) {
return false;
}

for (int i = 0; i < imports->count; i++) {
const CBMImport *imp = &imports->items[i];
if (!imp->local_name || !imp->module_path || strcmp(imp->local_name, alias) != 0) {
continue;
}
if (!strstr(imp->module_path, "code.byted.org/overpass/") ||
!strstr(imp->module_path, "/rpc/")) {
continue;
}
return extract_overpass_service_name(imp->module_path, service, srv_sz);
}
return false;
}

static bool emit_overpass_grpc_edge(cbm_gbuf_t *gbuf, const cbm_gbuf_node_t *source,
const CBMCall *call, const char **imp_keys,
const char **imp_vals, int imp_count,
const CBMImportArray *raw_imports) {
char service[CBM_SZ_256];
char method[CBM_SZ_256];
if (!extract_overpass_rawcall_method(call->callee_name, imp_keys, imp_vals, imp_count, service,
sizeof(service), method, sizeof(method)) &&
!extract_overpass_rawcall_method_from_imports(call->callee_name, raw_imports, service,
sizeof(service), method, sizeof(method))) {
return false;
}

char route_qn[CBM_SZ_512];
snprintf(route_qn, sizeof(route_qn), "__grpc__%s/%s", service, method);

char route_name[CBM_SZ_256];
snprintf(route_name, sizeof(route_name), "%s/%s", service, method);

int64_t route_id = cbm_gbuf_upsert_node(gbuf, "Route", route_name, route_qn, "", 0, 0,
"{\"source\":\"overpass\"}");

char esc_c[CBM_SZ_256];
cbm_json_escape(esc_c, sizeof(esc_c), call->callee_name);
char props[CBM_SZ_1K];
snprintf(props, sizeof(props),
"{\"callee\":\"%s\",\"service\":\"%s\",\"method\":\"%s\",\"source\":\"overpass\","
"\"confidence\":%.2f}",
esc_c, service, method, 0.95);
cbm_gbuf_insert_edge(gbuf, source->id, route_id, "GRPC_CALLS", props);
return true;
}

/* Emit GRPC_CALLS edge via gRPC Route node. */
static void emit_grpc_edge(cbm_gbuf_t *gbuf, const cbm_gbuf_node_t *source, const CBMCall *call,
const cbm_resolution_t *res) {
Expand Down Expand Up @@ -1841,6 +1973,11 @@ static void resolve_file_calls(resolve_ctx_t *rc, resolve_worker_state_t *ws, CB
continue;
}

if (emit_overpass_grpc_edge(ws->local_edge_buf, source_node, call, imp_keys, imp_vals,
imp_count, &result->imports)) {
continue;
}

if (!res.qualified_name || res.qualified_name[0] == '\0') {
if (cbm_service_pattern_route_method(call->callee_name) != NULL) {
cbm_resolution_t fake_res = {.qualified_name = call->callee_name,
Expand Down
168 changes: 168 additions & 0 deletions src/pipeline/pass_route_nodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum {

#define SLEN(s) (sizeof(s) - 1)
#include "pipeline/pipeline_internal.h"
#include "pipeline/pipeline.h"
#include <stdint.h>
#include "graph_buffer/graph_buffer.h"
#include "foundation/log.h"
Expand Down Expand Up @@ -887,6 +888,172 @@ static void create_grpc_routes(cbm_gbuf_t *gb) {
}
}

static bool route_node_extract_json_string(const char *json, const char *key, char *buf,
size_t bufsz) {
if (!json || !key || !buf || bufsz == 0) {
return false;
}
char pat[CBM_SZ_128];
snprintf(pat, sizeof(pat), "\"%s\":\"", key);
const char *start = strstr(json, pat);
if (!start) {
return false;
}
start += strlen(pat);
const char *end = strchr(start, '"');
if (!end || end == start) {
return false;
}
size_t len = (size_t)(end - start);
if (len >= bufsz) {
len = bufsz - SKIP_ONE;
}
memcpy(buf, start, len);
buf[len] = '\0';
return true;
}

static bool route_node_overpass_service_from_go_mod(const char *root, char *service,
size_t service_sz) {
if (!root || !service || service_sz == 0) {
return false;
}
char path[CBM_SZ_512];
snprintf(path, sizeof(path), "%s/go.mod", root);
FILE *f = fopen(path, "rb");
if (!f) {
return false;
}
char line[CBM_SZ_512];
bool ok = false;
while (fgets(line, sizeof(line), f)) {
char *p = strstr(line, "module ");
if (!p) {
continue;
}
p += strlen("module ");
while (*p == ' ' || *p == '\t') {
p++;
}
const char *prefix = "code.byted.org/life/";
if (strncmp(p, prefix, strlen(prefix)) != 0) {
break;
}
p += strlen(prefix);
char *end = p;
while (*end && *end != '\n' && *end != '\r' && *end != ' ' && *end != '\t') {
end++;
}
size_t len = (size_t)(end - p);
if (len > 0 && len < service_sz) {
memcpy(service, p, len);
service[len] = '\0';
ok = true;
}
break;
}
fclose(f);
return ok;
}

static bool route_node_overpass_service_from_handler_import(const char *root, char *service,
size_t service_sz) {
if (!root || !service || service_sz == 0) {
return false;
}
char path[CBM_SZ_512];
snprintf(path, sizeof(path), "%s/handler.go", root);
FILE *f = fopen(path, "rb");
if (!f) {
return false;
}
char line[CBM_SZ_512];
bool ok = false;
const char *prefix = "code.byted.org/overpass/";
while (fgets(line, sizeof(line), f)) {
char *p = strstr(line, prefix);
if (!p || !strstr(p, "/kitex_gen/")) {
continue;
}
p += strlen(prefix);
char *end = strchr(p, '/');
if (!end || end == p) {
continue;
}
size_t len = (size_t)(end - p);
if (len > 0 && len < service_sz) {
memcpy(service, p, len);
service[len] = '\0';
ok = true;
}
break;
}
fclose(f);
return ok;
}

/* Create gRPC Route nodes for ByteDance Kitex/Overpass handler methods.
* Client packages call code.byted.org/overpass/<service>/rpc/<service>.RawCall.Method.
* Server repos usually implement methods on *<Service>Impl in handler.go. */
static void create_overpass_handler_routes(cbm_gbuf_t *gb) {
char service[CBM_SZ_256] = {0};
if (!route_node_overpass_service_from_handler_import(cbm_gbuf_root_path(gb), service,
sizeof(service)) &&
!route_node_overpass_service_from_go_mod(cbm_gbuf_root_path(gb), service,
sizeof(service))) {
return;
}

const cbm_gbuf_node_t **methods = NULL;
int method_count = 0;
if (cbm_gbuf_find_by_label(gb, "Method", &methods, &method_count) != 0 || method_count == 0) {
return;
}

int routes = 0;
for (int i = 0; i < method_count; i++) {
const cbm_gbuf_node_t *m = methods[i];
if (!m->file_path || !m->name || !m->properties_json) {
continue;
}
if (strcmp(m->file_path, "handler.go") != 0 && !strstr(m->file_path, "/handler.go")) {
continue;
}
char parent[CBM_SZ_512];
if (!route_node_extract_json_string(m->properties_json, "parent_class", parent,
sizeof(parent))) {
continue;
}
if (!strstr(parent, "ServiceImpl")) {
continue;
}

char route_qn[CBM_ROUTE_QN_SIZE];
snprintf(route_qn, sizeof(route_qn), "__grpc__%s/%s", service, m->name);

char route_name[CBM_SZ_256];
snprintf(route_name, sizeof(route_name), "%s/%s", service, m->name);

char props[CBM_SZ_256];
snprintf(props, sizeof(props), "{\"source\":\"overpass\",\"service\":\"%s\"}", service);

int64_t route_id =
cbm_gbuf_upsert_node(gb, "Route", route_name, route_qn, m->file_path, m->start_line,
m->end_line, props);

char hprops[CBM_SZ_512];
snprintf(hprops, sizeof(hprops), "{\"via\":\"overpass_handler\",\"service\":\"%s\"}",
service);
cbm_gbuf_insert_edge(gb, m->id, route_id, "HANDLES", hprops);
routes++;
}
if (routes > 0) {
char buf[CBM_SZ_16];
snprintf(buf, sizeof(buf), "%d", routes);
cbm_log_info("pass.route_nodes.overpass", "routes", buf);
}
}

/* Phase 3: Create DATA_FLOWS edges by linking callers through Route to handlers. */
static void create_data_flows(cbm_gbuf_t *gb) {
const cbm_gbuf_node_t **routes = NULL;
Expand Down Expand Up @@ -1223,6 +1390,7 @@ void cbm_pipeline_create_route_nodes(cbm_gbuf_t *gb) {
* Scans Class nodes from .proto files, follows DEFINES_METHOD edges
* to find rpc methods, creates __grpc__ServiceName/MethodName Route nodes. */
create_grpc_routes(gb);
create_overpass_handler_routes(gb);

/* Phase 5: filesystem-based SvelteKit routes (+server / +page.server /
* +layout.server) — no call-site equivalent for pass_calls.c to pick
Expand Down
Loading