diff --git a/.pnpmfile.cjs b/.pnpmfile.cjs new file mode 100644 index 000000000..0c8a56fa6 --- /dev/null +++ b/.pnpmfile.cjs @@ -0,0 +1,29 @@ +const workflowPackageVersions = { + '@tanstack/workflow-core': '0.0.3', + '@tanstack/workflow-runtime': '0.0.1', +} + +function readPackage(pkg) { + if ( + pkg.name === '@tanstack/workflow-runtime' || + pkg.name === '@tanstack/workflow-store-drizzle-postgres' || + pkg.name === '@tanstack/workflow-netlify' + ) { + pkg.dependencies = { + ...pkg.dependencies, + ...Object.fromEntries( + Object.entries(workflowPackageVersions).filter(([name]) => + pkg.dependencies?.[name]?.startsWith('workspace:'), + ), + ), + } + } + + return pkg +} + +module.exports = { + hooks: { + readPackage, + }, +} diff --git a/drizzle/migrations/0000_workflow_run_store.sql b/drizzle/migrations/0000_workflow_run_store.sql new file mode 100644 index 000000000..e51670c43 --- /dev/null +++ b/drizzle/migrations/0000_workflow_run_store.sql @@ -0,0 +1,103 @@ +CREATE TABLE IF NOT EXISTS "workflow_runs" ( + "run_id" text PRIMARY KEY, + "workflow_id" text NOT NULL, + "workflow_version" text, + "status" text NOT NULL, + "input" jsonb NOT NULL, + "output" jsonb, + "error" jsonb, + "waiting_for" jsonb, + "pending_approval" jsonb, + "wake_at" bigint, + "lease_owner" text, + "lease_expires_at" bigint, + "created_at" bigint NOT NULL, + "updated_at" bigint NOT NULL +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_runs_status_idx" ON "workflow_runs" ("status", "updated_at"); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_runs_lease_idx" ON "workflow_runs" ("status", "lease_expires_at"); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_run_states" ( + "run_id" text PRIMARY KEY, + "workflow_id" text NOT NULL, + "workflow_version" text, + "status" text NOT NULL, + "input" jsonb NOT NULL, + "output" jsonb, + "error" jsonb, + "waiting_for" jsonb, + "pending_approval" jsonb, + "created_at" bigint NOT NULL, + "updated_at" bigint NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_event_locks" ( + "run_id" text PRIMARY KEY, + "created_at" bigint NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_events" ( + "run_id" text NOT NULL, + "event_index" integer NOT NULL, + "event_type" text NOT NULL, + "step_id" text, + "event" jsonb NOT NULL, + "created_at" bigint NOT NULL, + CONSTRAINT "workflow_events_run_id_event_index_pk" PRIMARY KEY ("run_id", "event_index") +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_events_type_idx" ON "workflow_events" ("run_id", "event_type"); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_timers" ( + "run_id" text NOT NULL, + "signal_id" text NOT NULL, + "workflow_id" text NOT NULL, + "workflow_version" text, + "wake_at" bigint NOT NULL, + "lease_owner" text, + "lease_expires_at" bigint, + CONSTRAINT "workflow_timers_run_id_signal_id_pk" PRIMARY KEY ("run_id", "signal_id") +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_timers_due_idx" ON "workflow_timers" ("wake_at", "lease_expires_at"); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_signal_deliveries" ( + "run_id" text NOT NULL, + "signal_id" text NOT NULL, + "created_at" bigint NOT NULL, + CONSTRAINT "workflow_signal_deliveries_run_id_signal_id_pk" PRIMARY KEY ("run_id", "signal_id") +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_schedules" ( + "schedule_id" text PRIMARY KEY, + "workflow_id" text NOT NULL, + "workflow_version" text, + "schedule" jsonb NOT NULL, + "overlap_policy" text NOT NULL, + "input" jsonb, + "next_fire_at" bigint, + "enabled" boolean NOT NULL, + "updated_at" bigint NOT NULL +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_schedules_due_idx" ON "workflow_schedules" ("enabled", "next_fire_at"); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "workflow_schedule_buckets" ( + "schedule_id" text NOT NULL, + "bucket_id" text NOT NULL, + "workflow_id" text NOT NULL, + "workflow_version" text, + "run_id" text NOT NULL, + "fire_at" bigint NOT NULL, + "input" jsonb, + "overlap_policy" text NOT NULL, + "status" text NOT NULL, + "lease_owner" text, + "lease_expires_at" bigint, + "started_at" bigint, + CONSTRAINT "workflow_schedule_buckets_schedule_id_bucket_id_pk" PRIMARY KEY ("schedule_id", "bucket_id") +); +--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "workflow_schedule_buckets_lease_idx" ON "workflow_schedule_buckets" ("status", "fire_at", "lease_expires_at"); diff --git a/netlify/functions/sync-intent-discover-background.ts b/netlify/functions/sync-intent-discover-background.ts deleted file mode 100644 index 0e9ecf2c7..000000000 --- a/netlify/functions/sync-intent-discover-background.ts +++ /dev/null @@ -1,217 +0,0 @@ -import type { Config } from '@netlify/functions' -import { - searchIntentPackages, - fetchPackument, - isIntentCompatible, - selectVersionsToSync, - extractSkillsFromTarball, -} from '~/utils/intent.server' -import { - upsertIntentPackage, - getKnownVersions, - enqueuePackageVersion, - markPackageVerified, -} from '~/utils/intent-db.server' - -/** - * Netlify Scheduled Function - Discover Intent-compatible npm packages - * - * Phase 1 of 2. Fast: no tarball downloads (except brief header peeks for GitHub path). - * - * Two discovery paths: - * 1. NPM keyword search — instant, finds packages that published with tanstack-intent keyword - * 2. GitHub code search — finds repos that depend on @tanstack/intent but may not have the keyword yet - * - * Both paths enqueue new versions for tarball processing (syncStatus = 'pending'). - * Actual skill extraction happens in sync-intent-process-background. - * - * Scheduled: Every 6 hours - */ -const handler = async (req: Request) => { - const { next_run } = await req.json() - const startTime = Date.now() - - console.log('[intent-discover] Starting discovery (NPM + GitHub)...') - - let versionsEnqueued = 0 - const errors: Array = [] - - // --------------------------------------------------------------------------- - // Path 1: NPM keyword search - // --------------------------------------------------------------------------- - try { - console.log( - '[intent-discover] Searching NPM for keywords:tanstack-intent...', - ) - const searchResults = await searchIntentPackages() - console.log( - `[intent-discover] NPM found ${searchResults.objects.length} candidates`, - ) - - for (const { package: pkg } of searchResults.objects) { - try { - await upsertIntentPackage({ name: pkg.name, verified: false }) - - const packument = await fetchPackument(pkg.name) - const latestVersion = packument['dist-tags'].latest - if (!latestVersion) continue - - const latestMeta = packument.versions[latestVersion] - if (!latestMeta || !isIntentCompatible(latestMeta)) continue - - await markPackageVerified(pkg.name) - - const knownVersions = await getKnownVersions(pkg.name) - const toEnqueue = selectVersionsToSync(packument, knownVersions) - for (const { version, tarball, publishedAt } of toEnqueue) { - await enqueuePackageVersion({ - packageName: pkg.name, - version, - tarballUrl: tarball, - publishedAt, - }) - versionsEnqueued++ - } - console.log( - `[intent-discover] NPM: ${pkg.name} - enqueued ${toEnqueue.length}`, - ) - } catch (e) { - const msg = `npm/${pkg.name}: ${e instanceof Error ? e.message : String(e)}` - console.error(`[intent-discover] ${msg}`) - errors.push(msg) - } - } - } catch (e) { - console.error( - '[intent-discover] NPM path failed:', - e instanceof Error ? e.message : String(e), - ) - } - - // --------------------------------------------------------------------------- - // Path 2: GitHub code search for @tanstack/intent dependents - // --------------------------------------------------------------------------- - const githubToken = process.env.GITHUB_AUTH_TOKEN - if (githubToken) { - try { - console.log( - '[intent-discover] Searching GitHub for @tanstack/intent dependents...', - ) - const ghHeaders = { - Authorization: `Bearer ${githubToken}`, - Accept: 'application/vnd.github.v3+json', - } - - const searchRes = await fetch( - 'https://api.github.com/search/code?q=%22%40tanstack%2Fintent%22+filename%3Apackage.json&per_page=100', - { headers: ghHeaders }, - ) - if (!searchRes.ok) throw new Error(`GitHub search ${searchRes.status}`) - - const searchData = (await searchRes.json()) as { - items: Array<{ path: string; repository: { full_name: string } }> - } - - // Deduplicate repo+path pairs - const seen = new Set() - const candidates = searchData.items.filter((item) => { - const key = `${item.repository.full_name}|${item.path}` - if (seen.has(key)) return false - seen.add(key) - return true - }) - - console.log( - `[intent-discover] GitHub found ${candidates.length} package.json files`, - ) - - for (const { repo, path } of candidates.map((i) => ({ - repo: i.repository.full_name, - path: i.path, - }))) { - try { - const contentRes = await fetch( - `https://api.github.com/repos/${repo}/contents/${path}`, - { headers: ghHeaders }, - ) - if (!contentRes.ok) continue - - const contentData = (await contentRes.json()) as { content?: string } - if (!contentData.content) continue - - const pkgJson = JSON.parse( - Buffer.from(contentData.content, 'base64').toString('utf-8'), - ) as { name?: string; private?: boolean } - - const pkgName = pkgJson.name - if (!pkgName || pkgJson.private) continue - - // Check NPM - const npmRes = await fetch( - `https://registry.npmjs.org/${encodeURIComponent(pkgName)}/latest`, - ) - if (!npmRes.ok) continue - - const npmMeta = (await npmRes.json()) as { - version?: string - dist?: { tarball?: string } - } - if (!npmMeta.version || !npmMeta.dist?.tarball) continue - - // Peek at tarball for skills - const skills = await extractSkillsFromTarball(npmMeta.dist.tarball) - if (skills.length === 0) continue - - await upsertIntentPackage({ name: pkgName, verified: true }) - await markPackageVerified(pkgName) - - const packument = await fetchPackument(pkgName) - const knownVersions = await getKnownVersions(pkgName) - const toEnqueue = selectVersionsToSync(packument, knownVersions) - - for (const { version, tarball, publishedAt } of toEnqueue) { - await enqueuePackageVersion({ - packageName: pkgName, - version, - tarballUrl: tarball, - publishedAt, - }) - versionsEnqueued++ - } - if (toEnqueue.length > 0) { - console.log( - `[intent-discover] GitHub: ${pkgName} - enqueued ${toEnqueue.length}`, - ) - } - } catch (e) { - const msg = `github/${repo}: ${e instanceof Error ? e.message : String(e)}` - console.error(`[intent-discover] ${msg}`) - errors.push(msg) - } - } - } catch (e) { - console.error( - '[intent-discover] GitHub path failed:', - e instanceof Error ? e.message : String(e), - ) - } - } else { - console.warn( - '[intent-discover] GITHUB_AUTH_TOKEN not set, skipping GitHub path', - ) - } - - const duration = Date.now() - startTime - console.log( - `[intent-discover] Done in ${duration}ms - enqueued: ${versionsEnqueued}, errors: ${errors.length}`, - ) - if (errors.length > 0) - console.warn(`[intent-discover] Errors:\n ${errors.join('\n ')}`) - console.log('[intent-discover] Next invocation at:', next_run) -} - -export default handler - -export const config: Config = { - schedule: '0 */6 * * *', // Every 6 hours -} diff --git a/netlify/functions/sync-intent-process-background.ts b/netlify/functions/sync-intent-process-background.ts deleted file mode 100644 index 4d7fe72cf..000000000 --- a/netlify/functions/sync-intent-process-background.ts +++ /dev/null @@ -1,117 +0,0 @@ -import type { Config } from '@netlify/functions' -import { extractSkillsFromTarball } from '~/utils/intent.server' -import { - getPendingVersions, - replaceSkillsForVersion, - markVersionSynced, - markVersionFailed, -} from '~/utils/intent-db.server' - -// Hard budget: stop processing with this much time remaining before the -// 15-minute Netlify background function limit. Each tarball can take 1-5s, -// so 3 minutes of headroom is sufficient. -const BUDGET_MS = 12 * 60 * 1000 // 12 minutes - -/** - * Netlify Scheduled Function - Process pending Intent skill extractions - * - * Phase 2 of 2. Drains the pending version queue populated by - * sync-intent-discover-background. Runs more frequently so new packages - * appear in the registry quickly. - * - * Durability: each version is atomically marked 'synced' or 'failed' in the - * DB immediately after processing. A timeout or crash loses at most the - * single in-flight version (which stays 'pending' and is retried next run). - * Failed versions are retried every cycle until they succeed. - * - * Scheduled: Every 15 minutes - */ -const handler = async (req: Request) => { - const { next_run } = await req.json() - const startTime = Date.now() - const deadline = startTime + BUDGET_MS - - console.log('[intent-process] Starting queue drain...') - - let processed = 0 - let failed = 0 - let skipped = 0 - - try { - // Fetch enough pending items to keep us busy, but not so many we hold - // a huge result set in memory. We'll loop until time runs out. - const BATCH_SIZE = 50 - - while (Date.now() < deadline) { - const remaining = deadline - Date.now() - const pending = await getPendingVersions(BATCH_SIZE) - - if (pending.length === 0) { - console.log('[intent-process] Queue empty, nothing to do') - break - } - - console.log( - `[intent-process] ${pending.length} pending version(s), ${Math.round(remaining / 1000)}s remaining`, - ) - - for (const item of pending) { - // Check budget before each item, not just at batch boundaries - if (Date.now() >= deadline) { - skipped += pending.length - pending.indexOf(item) - console.log( - `[intent-process] Budget exhausted, stopping. ${skipped} item(s) deferred to next run.`, - ) - break - } - - if (!item.tarballUrl) { - await markVersionFailed(item.id, 'No tarball URL recorded') - failed++ - continue - } - - try { - const skills = await extractSkillsFromTarball(item.tarballUrl) - await replaceSkillsForVersion(item.id, skills) - await markVersionSynced(item.id, skills.length) - processed++ - console.log( - `[intent-process] ✓ ${item.packageName}@${item.version} - ${skills.length} skill(s)`, - ) - } catch (err) { - const reason = err instanceof Error ? err.message : String(err) - await markVersionFailed(item.id, reason) - failed++ - console.error( - `[intent-process] ✗ ${item.packageName}@${item.version}: ${reason}`, - ) - } - } - - // If we got fewer items than the batch size, the queue is drained - if (pending.length < BATCH_SIZE) break - } - - const duration = Date.now() - startTime - console.log( - `[intent-process] Done in ${duration}ms - processed: ${processed}, failed: ${failed}, deferred: ${skipped}`, - ) - console.log('[intent-process] Next invocation at:', next_run) - } catch (error) { - const duration = Date.now() - startTime - console.error( - `[intent-process] Fatal error after ${duration}ms:`, - error instanceof Error ? error.message : String(error), - ) - if (error instanceof Error && error.stack) { - console.error('[intent-process] Stack:', error.stack) - } - } -} - -export default handler - -export const config: Config = { - schedule: '*/15 * * * *', // Every 15 minutes -} diff --git a/netlify/functions/workflow-sweep-background.ts b/netlify/functions/workflow-sweep-background.ts new file mode 100644 index 000000000..6a8deca18 --- /dev/null +++ b/netlify/functions/workflow-sweep-background.ts @@ -0,0 +1,14 @@ +import type { Config } from '@netlify/functions' +import { createNetlifyWorkflowSweepHandler } from '@tanstack/workflow-netlify' +import { workflowRuntime } from '~/utils/workflow-runtime.server' + +export default createNetlifyWorkflowSweepHandler({ + runtime: workflowRuntime, + maxDurationMs: 25_000, + maxScheduledRuns: 10, + maxTimers: 10, +}) + +export const config: Config = { + schedule: '*/5 * * * *', +} diff --git a/package.json b/package.json index 24b2f2aae..a17bb30b6 100644 --- a/package.json +++ b/package.json @@ -64,6 +64,10 @@ "@tanstack/react-router-ssr-query": "1.167.0", "@tanstack/react-start": "1.168.10", "@tanstack/react-table": "^8.21.3", + "@tanstack/workflow-core": "0.0.3", + "@tanstack/workflow-netlify": "0.0.2", + "@tanstack/workflow-runtime": "0.0.1", + "@tanstack/workflow-store-drizzle-postgres": "0.0.2", "@types/d3": "^7.4.3", "@uploadthing/react": "^7.3.3", "@visx/hierarchy": "^3.12.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f11cb6021..ce6f00cbd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,6 +16,8 @@ overrides: minimatch@5.1.9>brace-expansion: 2.0.3 minimatch@10.2.5>brace-expansion: 5.0.5 +pnpmfileChecksum: sha256-YCZ4WLk6+c48Uc0M/YjAU0PHsew0tG/ugn+OYO5ie1M= + importers: .: @@ -125,6 +127,18 @@ importers: '@tanstack/react-table': specifier: ^8.21.3 version: 8.21.3(react-dom@19.2.3(react@19.2.3))(react@19.2.3) + '@tanstack/workflow-core': + specifier: 0.0.3 + version: 0.0.3 + '@tanstack/workflow-netlify': + specifier: 0.0.2 + version: 0.0.2 + '@tanstack/workflow-runtime': + specifier: 0.0.1 + version: 0.0.1 + '@tanstack/workflow-store-drizzle-postgres': + specifier: 0.0.2 + version: 0.0.2(@electric-sql/pglite@0.3.16)(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8) '@types/d3': specifier: ^7.4.3 version: 7.4.3 @@ -3958,6 +3972,22 @@ packages: resolution: {integrity: sha512-uhOeFyxLcU41HzvrxsGpiWdcMbScY1EDgbZ5K7DVRMYInbLYWAC0EA/kx9wXAoSM8q82bUG2hRl8+EAjE6XAbA==} engines: {node: '>=20.19'} + '@tanstack/workflow-core@0.0.3': + resolution: {integrity: sha512-0ev5ISQR8knUBAco46bc8h2/bMGS3ZPlEC/I/EgeD7PXE0DnmNJLvrkwZ5KfdzcbdM8pqJQLR5coBHtNurOcEg==} + engines: {node: '>=18'} + + '@tanstack/workflow-netlify@0.0.2': + resolution: {integrity: sha512-xJn4w6zYP3k0OiKrgDXfqget/j5kteGrJMD0OxzaQv933qYuJntEuIXoFwfAYYTrAMH6cqMXXRrBYO1mUeE3Ew==} + engines: {node: '>=18'} + + '@tanstack/workflow-runtime@0.0.1': + resolution: {integrity: sha512-JzFuL4DQ5obXZOLrp2Rq/xLf3JQAZ7i4l2EbK/Y9bpCXQtka5XJUk0hpFBFzaJmkn8wDQSbKjID1TMpuyqO2Hg==} + engines: {node: '>=18'} + + '@tanstack/workflow-store-drizzle-postgres@0.0.2': + resolution: {integrity: sha512-m/FGOUoT1HTFCXM/bjwn807bW3CLbkvfJXUbx4cpjSXtVx5lTUd250kkHZtHfjey79tiEOe/cQ3rZRooheYs1A==} + engines: {node: '>=18'} + '@tweenjs/tween.js@23.1.3': resolution: {integrity: sha512-vJmvvwFxYuGnF2axRtPYocag6Clbb5YS7kLL+SO/TeVFzHqDIWrNKYtcsPMibjDx9O+bu+psAy9NKfWklassUA==} @@ -12003,6 +12033,54 @@ snapshots: '@tanstack/virtual-file-routes@1.162.0': {} + '@tanstack/workflow-core@0.0.3': + dependencies: + '@standard-schema/spec': 1.1.0 + + '@tanstack/workflow-netlify@0.0.2': + dependencies: + '@tanstack/workflow-runtime': 0.0.1 + + '@tanstack/workflow-runtime@0.0.1': + dependencies: + '@tanstack/workflow-core': 0.0.3 + + '@tanstack/workflow-store-drizzle-postgres@0.0.2(@electric-sql/pglite@0.3.16)(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8)': + dependencies: + '@tanstack/workflow-core': 0.0.3 + '@tanstack/workflow-runtime': 0.0.1 + drizzle-orm: 0.45.2(@electric-sql/pglite@0.3.16)(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8) + transitivePeerDependencies: + - '@aws-sdk/client-rds-data' + - '@cloudflare/workers-types' + - '@electric-sql/pglite' + - '@libsql/client' + - '@libsql/client-wasm' + - '@neondatabase/serverless' + - '@op-engineering/op-sqlite' + - '@opentelemetry/api' + - '@planetscale/database' + - '@prisma/client' + - '@tidbcloud/serverless' + - '@types/better-sqlite3' + - '@types/pg' + - '@types/sql.js' + - '@upstash/redis' + - '@vercel/postgres' + - '@xata.io/client' + - better-sqlite3 + - bun-types + - expo-sqlite + - gel + - knex + - kysely + - mysql2 + - pg + - postgres + - prisma + - sql.js + - sqlite3 + '@tweenjs/tween.js@23.1.3': {} '@tybys/wasm-util@0.10.1': diff --git a/src/db/schema.ts b/src/db/schema.ts index c261ca752..2c8be3784 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -1133,14 +1133,11 @@ export type NewIntentPackage = InferInsertModel // Per-version snapshot of a package's skills (latest + last 5 versions) // -// syncStatus acts as a durable work queue: +// syncStatus tracks domain progress for each discovered package version: // 'pending' -- version discovered, tarball not yet downloaded/extracted // 'synced' -- skills extracted and indexed successfully // 'failed' -- tarball processing failed (will be retried next cycle) -// -// This means the scheduled function can be interrupted at any point and -// resume from where it left off. Only the currently in-flight version is -// at risk of being re-processed on restart (upserts make that safe). +// Workflow run/step replay lives in the Workflow Postgres store, not here. export const intentPackageVersions = pgTable( 'intent_package_versions', { diff --git a/src/routes/admin/intent.tsx b/src/routes/admin/intent.tsx index 44dcdab67..872e0b838 100644 --- a/src/routes/admin/intent.tsx +++ b/src/routes/admin/intent.tsx @@ -20,6 +20,7 @@ import { getIntentAdminStats, listIntentPackages, listFailedVersions, + listIntentWorkflowRuns, triggerIntentDiscover, triggerIntentProcess, retryIntentVersion, @@ -40,6 +41,7 @@ const QK = { stats: ['admin', 'intent', 'stats'] as const, packages: ['admin', 'intent', 'packages'] as const, failed: ['admin', 'intent', 'failed'] as const, + workflows: ['admin', 'intent', 'workflows'] as const, } // --------------------------------------------------------------------------- @@ -69,6 +71,12 @@ function IntentAdminPage() { queryFn: () => listFailedVersions(), }) + const workflowsQuery = useQuery({ + queryKey: QK.workflows, + queryFn: () => listIntentWorkflowRuns(), + refetchInterval: 10_000, + }) + const discoverMutation = useMutation({ mutationFn: () => triggerIntentDiscover(), onSuccess: invalidateAll, @@ -324,6 +332,11 @@ function IntentAdminPage() { /> + + {/* Failed versions (shown prominently when non-zero) */} {(failedQuery.data?.length ?? 0) > 0 && ( + readonly loading: boolean +}) { + return ( +
+

+ + Workflow Runs +

+ {loading ? ( +
+ ) : runs.length === 0 ? ( + + No workflow runs recorded yet. + + ) : ( +
+ + + + + + + + + + + {runs.map((run) => ( + + + + + + + ))} + +
+ Workflow + + Status + + Run + + Updated +
+ {run.workflowId} + + + {run.waitingFor + ? `${run.status}:${run.waitingFor}` + : run.status} + + + {run.runId} + + {formatDistanceToNow(run.updatedAt, { addSuffix: true })} +
+
+ )} +
+ ) +} + +function getWorkflowStatusClass(status: string): string { + switch (status) { + case 'finished': + return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/40 dark:text-emerald-300' + case 'errored': + case 'aborted': + return 'bg-red-100 text-red-700 dark:bg-red-900/40 dark:text-red-300' + case 'paused': + return 'bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-300' + case 'running': + case 'queued': + return 'bg-sky-100 text-sky-700 dark:bg-sky-900/40 dark:text-sky-300' + default: + return 'bg-gray-100 text-gray-700 dark:bg-gray-800 dark:text-gray-300' + } +} + // --------------------------------------------------------------------------- // Result banner // --------------------------------------------------------------------------- diff --git a/src/utils/intent-admin.functions.ts b/src/utils/intent-admin.functions.ts index c5ef26c62..ed2541d7a 100644 --- a/src/utils/intent-admin.functions.ts +++ b/src/utils/intent-admin.functions.ts @@ -6,6 +6,7 @@ import { getIntentAdminStats as getIntentAdminStatsServer, listFailedVersions as listFailedVersionsServer, listIntentPackages as listIntentPackagesServer, + listIntentWorkflowRuns as listIntentWorkflowRunsServer, resetFailedVersions as resetFailedVersionsServer, retryIntentVersion as retryIntentVersionServer, seedIntentPackage as seedIntentPackageServer, @@ -25,6 +26,10 @@ export const listFailedVersions = createServerFn({ method: 'GET' }).handler( async () => listFailedVersionsServer(), ) +export const listIntentWorkflowRuns = createServerFn({ method: 'GET' }).handler( + async () => listIntentWorkflowRunsServer(), +) + export const triggerIntentDiscover = createServerFn({ method: 'POST' }).handler( async () => triggerIntentDiscoverServer(), ) diff --git a/src/utils/intent-admin.server.ts b/src/utils/intent-admin.server.ts index 4fd3b5f33..12d582acd 100644 --- a/src/utils/intent-admin.server.ts +++ b/src/utils/intent-admin.server.ts @@ -3,7 +3,6 @@ * All functions require the 'admin' capability. */ -import * as v from 'valibot' import { db } from '~/db/client' import { intentPackages, @@ -29,6 +28,11 @@ import { markVersionSynced, markVersionFailed, } from './intent-db.server' +import { + INTENT_DISCOVER_WORKFLOW_ID, + INTENT_PROCESS_WORKFLOW_ID, +} from '~/utils/intent-workflows.server' +import { workflowExecutionStore } from '~/utils/workflow-runtime.server' // --------------------------------------------------------------------------- // Stats / overview @@ -75,6 +79,36 @@ export async function getIntentAdminStats() { } } +export async function listIntentWorkflowRuns() { + await requireCapability({ data: { capability: 'admin' } }) + + const runs = await Promise.all([ + workflowExecutionStore.listRuns({ + workflowId: INTENT_DISCOVER_WORKFLOW_ID, + limit: 5, + }), + workflowExecutionStore.listRuns({ + workflowId: INTENT_PROCESS_WORKFLOW_ID, + limit: 5, + }), + ]) + + return runs + .flat() + .sort((a, b) => b.updatedAt - a.updatedAt) + .slice(0, 10) + .map((run) => ({ + runId: run.runId, + workflowId: run.workflowId, + workflowVersion: run.workflowVersion, + status: run.status, + waitingFor: run.waitingFor?.signalName, + wakeAt: run.wakeAt ? new Date(run.wakeAt) : null, + createdAt: new Date(run.createdAt), + updatedAt: new Date(run.updatedAt), + })) +} + // --------------------------------------------------------------------------- // Package list (all known packages with status) // --------------------------------------------------------------------------- diff --git a/src/utils/intent-db.server.ts b/src/utils/intent-db.server.ts index 02f55f083..97b378ec0 100644 --- a/src/utils/intent-db.server.ts +++ b/src/utils/intent-db.server.ts @@ -75,14 +75,16 @@ export async function getKnownVersions( // Pull up to `limit` pending versions ordered by createdAt (FIFO queue). // Also includes 'failed' rows so they get retried each cycle. -export async function getPendingVersions(limit: number): Promise< - Array<{ - id: number - packageName: string - version: string - tarballUrl: string | null - }> -> { +export interface PendingIntentVersion { + id: number + packageName: string + version: string + tarballUrl: string | null +} + +export async function getPendingVersions( + limit: number, +): Promise> { return db .select({ id: intentPackageVersions.id, @@ -96,6 +98,30 @@ export async function getPendingVersions(limit: number): Promise< .limit(limit) } +export interface IntentVersionForProcessing extends PendingIntentVersion { + syncStatus: string + skillCount: number +} + +export async function getVersionForProcessing( + id: number, +): Promise { + const rows = await db + .select({ + id: intentPackageVersions.id, + packageName: intentPackageVersions.packageName, + version: intentPackageVersions.version, + tarballUrl: intentPackageVersions.tarballUrl, + syncStatus: intentPackageVersions.syncStatus, + skillCount: intentPackageVersions.skillCount, + }) + .from(intentPackageVersions) + .where(eq(intentPackageVersions.id, id)) + .limit(1) + + return rows[0] +} + export async function getSkillsForVersion( packageVersionId: number, ): Promise> { diff --git a/src/utils/intent-sync.server.ts b/src/utils/intent-sync.server.ts new file mode 100644 index 000000000..821fcdfea --- /dev/null +++ b/src/utils/intent-sync.server.ts @@ -0,0 +1,352 @@ +import { z } from 'zod' +import { + extractSkillsFromTarball, + fetchPackument, + isIntentCompatible, + searchIntentPackages, + selectVersionsToSync, +} from '~/utils/intent.server' +import { + enqueuePackageVersion, + getKnownVersions, + getPendingVersions, + getVersionForProcessing, + markPackageVerified, + markVersionFailed, + markVersionSynced, + replaceSkillsForVersion, + upsertIntentPackage, +} from '~/utils/intent-db.server' + +export const INTENT_PROCESS_BATCH_SIZE = 50 + +const githubSearchResponseSchema = z.object({ + items: z.array( + z.object({ + path: z.string(), + repository: z.object({ full_name: z.string() }), + }), + ), +}) + +const githubContentResponseSchema = z.object({ + content: z.string().optional(), +}) + +const packageJsonSchema = z.object({ + name: z.string().optional(), + private: z.boolean().optional(), +}) + +const npmLatestSchema = z.object({ + version: z.string().optional(), + dist: z.object({ tarball: z.string().optional() }).optional(), +}) + +export interface IntentDiscoveryResult { + packagesDiscovered: number + githubCandidates: number + packagesVerified: number + versionsEnqueued: number + errors: Array +} + +export interface IntentVersionProcessResult { + packageName: string + version: string + status: 'synced' | 'failed' + skillCount?: number + error?: string +} + +export interface IntentProcessResult { + processed: number + failed: number + deferred: number + results: Array +} + +export interface IntentVersionToProcess { + id: number + packageName: string + version: string +} + +export interface IntentSyncOperations { + discoverIntentPackages: () => Promise + selectPendingIntentVersions: (options: { + limit: number + }) => Promise> + processIntentVersion: ( + versionId: number, + ) => Promise +} + +export const defaultIntentSyncOperations: IntentSyncOperations = { + discoverIntentPackages, + selectPendingIntentVersions, + processIntentVersion, +} + +export async function discoverIntentPackages(): Promise { + const errors: Array = [] + let packagesDiscovered = 0 + let githubCandidates = 0 + let packagesVerified = 0 + let versionsEnqueued = 0 + + try { + const searchResults = await searchIntentPackages() + const packageNames = dedupe( + searchResults.objects.map((item) => item.package.name), + ) + packagesDiscovered = packageNames.length + + for (const packageName of packageNames) { + try { + const enqueued = await discoverNpmPackage(packageName) + if (enqueued !== null) { + packagesVerified++ + versionsEnqueued += enqueued + } + } catch (error) { + errors.push(`npm/${packageName}: ${getErrorMessage(error)}`) + } + } + } catch (error) { + errors.push(`npm-search: ${getErrorMessage(error)}`) + } + + const githubToken = process.env.GITHUB_AUTH_TOKEN + if (githubToken) { + try { + const githubResult = await discoverGitHubPackages(githubToken) + githubCandidates = githubResult.githubCandidates + packagesVerified += githubResult.packagesVerified + versionsEnqueued += githubResult.versionsEnqueued + errors.push(...githubResult.errors) + } catch (error) { + errors.push(`github-search: ${getErrorMessage(error)}`) + } + } + + return { + packagesDiscovered, + githubCandidates, + packagesVerified, + versionsEnqueued, + errors, + } +} + +export async function selectPendingIntentVersions(options: { + limit: number +}): Promise> { + const versions = await getPendingVersions(options.limit) + + return versions.map((version) => ({ + id: version.id, + packageName: version.packageName, + version: version.version, + })) +} + +export function summarizeIntentProcessResults( + results: Array, +): IntentProcessResult { + return { + processed: results.filter((result) => result.status === 'synced').length, + failed: results.filter((result) => result.status === 'failed').length, + deferred: 0, + results, + } +} + +async function discoverNpmPackage(packageName: string): Promise { + await upsertIntentPackage({ name: packageName, verified: false }) + + const packument = await fetchPackument(packageName) + const latestVersion = packument['dist-tags'].latest + const latestMeta = latestVersion ? packument.versions[latestVersion] : null + if (!latestVersion || !latestMeta || !isIntentCompatible(latestMeta)) { + return null + } + + await markPackageVerified(packageName) + return enqueueVersionsFromPackument(packageName, packument) +} + +async function discoverGitHubPackages(githubToken: string): Promise<{ + githubCandidates: number + packagesVerified: number + versionsEnqueued: number + errors: Array +}> { + const ghHeaders = { + Authorization: `Bearer ${githubToken}`, + Accept: 'application/vnd.github.v3+json', + } + const searchRes = await fetch( + 'https://api.github.com/search/code?q=%22%40tanstack%2Fintent%22+filename%3Apackage.json&per_page=100', + { headers: ghHeaders }, + ) + if (!searchRes.ok) throw new Error(`GitHub search ${searchRes.status}`) + + const searchData = githubSearchResponseSchema.parse(await searchRes.json()) + const candidates = dedupeBy( + searchData.items.map((item) => ({ + repo: item.repository.full_name, + path: item.path, + })), + (item) => `${item.repo}|${item.path}`, + ) + let packagesVerified = 0 + let versionsEnqueued = 0 + const errors: Array = [] + + for (const candidate of candidates) { + try { + const enqueued = await discoverGitHubPackage(candidate, ghHeaders) + if (enqueued !== null) { + packagesVerified++ + versionsEnqueued += enqueued + } + } catch (error) { + errors.push( + `github/${candidate.repo}/${candidate.path}: ${getErrorMessage(error)}`, + ) + } + } + + return { + githubCandidates: candidates.length, + packagesVerified, + versionsEnqueued, + errors, + } +} + +async function discoverGitHubPackage( + candidate: { repo: string; path: string }, + headers: HeadersInit, +): Promise { + const contentRes = await fetch( + `https://api.github.com/repos/${candidate.repo}/contents/${candidate.path}`, + { headers }, + ) + if (!contentRes.ok) return null + + const contentData = githubContentResponseSchema.parse(await contentRes.json()) + if (!contentData.content) return null + + const packageJson = packageJsonSchema.parse( + JSON.parse(Buffer.from(contentData.content, 'base64').toString('utf-8')), + ) + if (!packageJson.name || packageJson.private) return null + + const npmRes = await fetch( + `https://registry.npmjs.org/${encodeURIComponent(packageJson.name)}/latest`, + ) + if (!npmRes.ok) return null + + const npmMeta = npmLatestSchema.parse(await npmRes.json()) + if (!npmMeta.dist?.tarball) return null + + const skills = await extractSkillsFromTarball(npmMeta.dist.tarball) + if (skills.length === 0) return null + + await upsertIntentPackage({ name: packageJson.name, verified: true }) + await markPackageVerified(packageJson.name) + + const packument = await fetchPackument(packageJson.name) + return enqueueVersionsFromPackument(packageJson.name, packument) +} + +async function enqueueVersionsFromPackument( + packageName: string, + packument: Awaited>, +): Promise { + const knownVersions = await getKnownVersions(packageName) + const versionsToEnqueue = selectVersionsToSync(packument, knownVersions) + + for (const version of versionsToEnqueue) { + await enqueuePackageVersion({ + packageName, + version: version.version, + tarballUrl: version.tarball, + publishedAt: version.publishedAt, + }) + } + + return versionsToEnqueue.length +} + +export async function processIntentVersion( + versionId: number, +): Promise { + const version = await getVersionForProcessing(versionId) + if (!version) { + throw new Error(`Intent package version ${versionId} not found`) + } + + if (version.syncStatus === 'synced') { + return { + packageName: version.packageName, + version: version.version, + status: 'synced', + skillCount: version.skillCount, + } + } + + if (!version.tarballUrl) { + const reason = 'No tarball URL recorded' + await markVersionFailed(version.id, reason) + return { + packageName: version.packageName, + version: version.version, + status: 'failed', + error: reason, + } + } + + try { + const skills = await extractSkillsFromTarball(version.tarballUrl) + await replaceSkillsForVersion(version.id, skills) + await markVersionSynced(version.id, skills.length) + return { + packageName: version.packageName, + version: version.version, + status: 'synced', + skillCount: skills.length, + } + } catch (error) { + const reason = getErrorMessage(error) + await markVersionFailed(version.id, reason) + return { + packageName: version.packageName, + version: version.version, + status: 'failed', + error: reason, + } + } +} + +function dedupe(values: Array): Array { + return dedupeBy(values, (value) => value) +} + +function dedupeBy(values: Array, getKey: (value: T) => string): Array { + const seen = new Set() + const result: Array = [] + for (const value of values) { + const key = getKey(value) + if (seen.has(key)) continue + seen.add(key) + result.push(value) + } + return result +} + +function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} diff --git a/src/utils/intent-workflows.server.ts b/src/utils/intent-workflows.server.ts new file mode 100644 index 000000000..57330c018 --- /dev/null +++ b/src/utils/intent-workflows.server.ts @@ -0,0 +1,138 @@ +import { createWorkflow } from '@tanstack/workflow-core' +import type { StepOptions } from '@tanstack/workflow-core' +import { every } from '@tanstack/workflow-runtime' +import type { WorkflowRegistrationMap } from '@tanstack/workflow-runtime' +import { z } from 'zod' +import { + defaultIntentSyncOperations, + INTENT_PROCESS_BATCH_SIZE, + summarizeIntentProcessResults, +} from '~/utils/intent-sync.server' +import type { + IntentSyncOperations, + IntentVersionProcessResult, +} from '~/utils/intent-sync.server' + +export const INTENT_DISCOVER_WORKFLOW_ID = 'intent-discover-workflow' +export const INTENT_PROCESS_WORKFLOW_ID = 'intent-process-workflow' +export const INTENT_DISCOVER_SCHEDULE_ID = 'intent-discover-every-6h' +export const INTENT_PROCESS_SCHEDULE_ID = 'intent-process-every-15m' + +const intentDiscoverInputSchema = z.object({ + source: z.enum(['schedule', 'admin']).default('schedule'), +}) + +const intentProcessInputSchema = z.object({ + batchSize: z.number().int().positive().max(100), + source: z.enum(['schedule', 'admin']).default('schedule'), +}) + +const discoverStepOptions = { + retry: { maxAttempts: 2, backoff: 'exponential', baseMs: 1_000 }, + timeout: 10 * 60 * 1000, +} satisfies StepOptions + +const selectPendingVersionsStepOptions = { + timeout: 30_000, +} satisfies StepOptions + +const processVersionStepOptions = { + timeout: 2 * 60 * 1000, +} satisfies StepOptions + +export function createIntentDiscoverWorkflow( + operations: IntentSyncOperations = defaultIntentSyncOperations, +) { + return createWorkflow({ + id: INTENT_DISCOVER_WORKFLOW_ID, + input: intentDiscoverInputSchema, + }).handler((ctx) => + ctx.step( + 'discover-intent-packages', + () => operations.discoverIntentPackages(), + discoverStepOptions, + ), + ) +} + +export function createIntentProcessWorkflow( + operations: IntentSyncOperations = defaultIntentSyncOperations, +) { + return createWorkflow({ + id: INTENT_PROCESS_WORKFLOW_ID, + input: intentProcessInputSchema, + }).handler(async (ctx) => { + const versions = await ctx.step( + 'select-pending-versions', + () => + operations.selectPendingIntentVersions({ + limit: ctx.input.batchSize, + }), + selectPendingVersionsStepOptions, + ) + const results: Array = [] + + for (const version of versions) { + try { + results.push( + await ctx.step( + `process-version:${version.id}`, + () => operations.processIntentVersion(version.id), + processVersionStepOptions, + ), + ) + } catch (error) { + results.push({ + packageName: version.packageName, + version: version.version, + status: 'failed', + error: getErrorMessage(error), + }) + } + } + + return summarizeIntentProcessResults(results) + }) +} + +export const intentDiscoverWorkflow = createIntentDiscoverWorkflow() +export const intentProcessWorkflow = createIntentProcessWorkflow() + +export function createIntentWorkflowRegistrations(options?: { + operations?: IntentSyncOperations +}) { + const discoverWorkflow = createIntentDiscoverWorkflow(options?.operations) + const processWorkflow = createIntentProcessWorkflow(options?.operations) + + return { + [INTENT_DISCOVER_WORKFLOW_ID]: { + load: async () => discoverWorkflow, + schedules: [ + { + id: INTENT_DISCOVER_SCHEDULE_ID, + schedule: every.hours(6), + overlapPolicy: 'skip', + input: { source: 'schedule' }, + }, + ], + }, + [INTENT_PROCESS_WORKFLOW_ID]: { + load: async () => processWorkflow, + schedules: [ + { + id: INTENT_PROCESS_SCHEDULE_ID, + schedule: every.minutes(15), + overlapPolicy: 'skip', + input: { + batchSize: INTENT_PROCESS_BATCH_SIZE, + source: 'schedule', + }, + }, + ], + }, + } satisfies WorkflowRegistrationMap +} + +function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} diff --git a/src/utils/workflow-registrations.server.ts b/src/utils/workflow-registrations.server.ts new file mode 100644 index 000000000..fa5ee590d --- /dev/null +++ b/src/utils/workflow-registrations.server.ts @@ -0,0 +1,8 @@ +import type { WorkflowRegistrationMap } from '@tanstack/workflow-runtime' +import { createIntentWorkflowRegistrations } from '~/utils/intent-workflows.server' + +export function createAppWorkflowRegistrations(): WorkflowRegistrationMap { + return { + ...createIntentWorkflowRegistrations(), + } +} diff --git a/src/utils/workflow-runtime.server.ts b/src/utils/workflow-runtime.server.ts new file mode 100644 index 000000000..5779b6d10 --- /dev/null +++ b/src/utils/workflow-runtime.server.ts @@ -0,0 +1,23 @@ +import { defineWorkflowRuntime } from '@tanstack/workflow-runtime' +import type { + WorkflowExecutionStore, + WorkflowRegistrationMap, +} from '@tanstack/workflow-runtime' +import { createDrizzlePostgresWorkflowStore } from '@tanstack/workflow-store-drizzle-postgres' +import { db } from '~/db/client' +import { createAppWorkflowRegistrations } from '~/utils/workflow-registrations.server' + +export const workflowExecutionStore = createDrizzlePostgresWorkflowStore({ db }) + +export function createAppWorkflowRuntime(options?: { + store?: WorkflowExecutionStore + workflowRegistrations?: WorkflowRegistrationMap +}) { + return defineWorkflowRuntime({ + store: options?.store ?? workflowExecutionStore, + workflows: + options?.workflowRegistrations ?? createAppWorkflowRegistrations(), + }) +} + +export const workflowRuntime = createAppWorkflowRuntime() diff --git a/tests/intent-workflow.test.ts b/tests/intent-workflow.test.ts new file mode 100644 index 000000000..5ab3b1384 --- /dev/null +++ b/tests/intent-workflow.test.ts @@ -0,0 +1,284 @@ +import assert from 'node:assert/strict' +import { test } from 'node:test' +import { createWorkflow, LogConflictError } from '@tanstack/workflow-core' +import { + defineWorkflowRuntime, + inMemoryWorkflowExecutionStore, + materializeWorkflowSchedules, +} from '@tanstack/workflow-runtime' +import { createDrizzlePostgresWorkflowStore } from '@tanstack/workflow-store-drizzle-postgres' +import { drizzle } from 'drizzle-orm/postgres-js' +import { sql } from 'drizzle-orm' +import postgres from 'postgres' +import { z } from 'zod' +import * as schema from '../src/db/schema' +import { createAppWorkflowRuntime } from '../src/utils/workflow-runtime.server' +import { + createIntentWorkflowRegistrations, + INTENT_PROCESS_SCHEDULE_ID, + INTENT_PROCESS_WORKFLOW_ID, +} from '../src/utils/intent-workflows.server' +import type { + IntentProcessResult, + IntentSyncOperations, + IntentVersionProcessResult, +} from '../src/utils/intent-sync.server' + +test( + 'Postgres/Drizzle workflow store appends events and replays by run ID', + { + skip: + process.env.INTENT_WORKFLOW_DB_TESTS === '1' + ? false + : 'Set INTENT_WORKFLOW_DB_TESTS=1 with DATABASE_URL to run DB adapter coverage', + }, + async () => { + const databaseUrl = process.env.DATABASE_URL + assert.ok(databaseUrl, 'DATABASE_URL is required for this test') + + const client = postgres(databaseUrl, { max: 1, connect_timeout: 5 }) + const database = drizzle(client, { schema }) + const schemaName = `workflow_test_${process.pid}_${Date.now()}` + + try { + await database.execute(sql.raw(`create schema ${quoteIdent(schemaName)}`)) + + let stepCalls = 0 + const workflow = createWorkflow({ + id: 'test-replay-workflow', + input: z.object({ value: z.number() }), + }).handler(async (ctx) => { + const doubled = await ctx.step('double-value', () => { + stepCalls++ + return ctx.input.value * 2 + }) + return { doubled } + }) + + const store = createDrizzlePostgresWorkflowStore({ + db: database, + schema: schemaName, + }) + await store.ensureSchema() + + const runtime = defineWorkflowRuntime({ + store, + workflows: { + 'test-replay-workflow': { + load: async () => workflow, + }, + }, + }) + + await runtime.startRun({ + workflowId: 'test-replay-workflow', + runId: 'test-replay:1', + input: { value: 21 }, + now: Date.UTC(2026, 4, 26, 12, 0, 0), + includeEvents: false, + }) + await runtime.startRun({ + workflowId: 'test-replay-workflow', + runId: 'test-replay:1', + input: { value: 21 }, + now: Date.UTC(2026, 4, 26, 12, 0, 1), + includeEvents: false, + }) + + assert.equal(stepCalls, 1) + const timeline = await store.getRunTimeline('test-replay:1') + assert.ok(timeline) + assert.equal( + timeline.events.filter((event) => event.eventType === 'STEP_FINISHED') + .length, + 1, + ) + const firstEvent = timeline.events[0]?.event + assert.ok(firstEvent) + await assert.rejects( + () => + store.appendEvents({ + runId: 'test-replay:1', + expectedNextIndex: 0, + events: [firstEvent], + }), + LogConflictError, + ) + } finally { + await database.execute( + sql.raw(`drop schema if exists ${quoteIdent(schemaName)} cascade`), + ) + await client.end() + } + }, +) + +test('duplicate scheduled invocation with the same bucket is idempotent', async () => { + let selectCalls = 0 + let processCalls = 0 + const store = inMemoryWorkflowExecutionStore() + const runtime = createAppWorkflowRuntime({ + store, + workflowRegistrations: createIntentWorkflowRegistrations({ + operations: { + ...noopOperations, + selectPendingIntentVersions: async () => { + selectCalls++ + return [{ id: 1, packageName: '@example/pkg', version: '1.0.0' }] + }, + processIntentVersion: async () => { + processCalls++ + return { + packageName: '@example/pkg', + version: '1.0.0', + status: 'synced', + skillCount: 1, + } + }, + }, + }), + }) + const now = Date.UTC(2026, 4, 26, 12, 0, 0) + + await materializeWorkflowSchedules(runtime, { now }) + const first = await runtime.sweep({ now, includeEvents: false }) + await materializeWorkflowSchedules(runtime, { now }) + const second = await runtime.sweep({ now, includeEvents: false }) + + const processRun = first.scheduled.find( + (run) => run.workflowId === INTENT_PROCESS_WORKFLOW_ID, + ) + assert.ok(processRun) + assert.equal( + processRun.runId, + `${INTENT_PROCESS_WORKFLOW_ID}:${INTENT_PROCESS_SCHEDULE_ID}:${now}`, + ) + assert.equal(second.scheduled.length, 0) + assert.equal(selectCalls, 1) + assert.equal(processCalls, 1) +}) + +test('failed package version step does not prevent other versions from processing', async () => { + const goodResult: IntentVersionProcessResult = { + packageName: '@example/good', + version: '1.0.0', + status: 'synced', + skillCount: 1, + } + const badResult: IntentVersionProcessResult = { + packageName: '@example/bad', + version: '1.0.0', + status: 'failed', + error: 'tarball failed', + } + const laterResult: IntentVersionProcessResult = { + packageName: '@example/later', + version: '1.0.0', + status: 'synced', + skillCount: 2, + } + const expected: IntentProcessResult = { + processed: 2, + failed: 1, + deferred: 0, + results: [goodResult, badResult, laterResult], + } + const runtime = createAppWorkflowRuntime({ + store: inMemoryWorkflowExecutionStore(), + workflowRegistrations: createIntentWorkflowRegistrations({ + operations: { + ...noopOperations, + selectPendingIntentVersions: async () => [ + { id: 1, packageName: '@example/good', version: '1.0.0' }, + { id: 2, packageName: '@example/bad', version: '1.0.0' }, + { id: 3, packageName: '@example/later', version: '1.0.0' }, + ], + processIntentVersion: async (versionId: number) => { + if (versionId === 1) return goodResult + if (versionId === 2) throw new Error('tarball failed') + return laterResult + }, + }, + }), + }) + + const result = await runtime.startRun({ + workflowId: INTENT_PROCESS_WORKFLOW_ID, + runId: 'intent-process:test-partial-failure', + input: { batchSize: 3, source: 'admin' }, + now: Date.UTC(2026, 4, 26, 12, 15, 0), + includeEvents: false, + }) + + assert.equal(result.kind, 'completed') + assert.ok(result.run) + assert.deepEqual(result.run.output, expected) +}) + +test('process workflow continues from queue state across scheduled invocations', async () => { + const queue = [ + { id: 1, packageName: '@example/one', version: '1.0.0' }, + { id: 2, packageName: '@example/two', version: '1.0.0' }, + ] + const processed: Array = [] + const store = inMemoryWorkflowExecutionStore() + const operations: IntentSyncOperations = { + ...noopOperations, + selectPendingIntentVersions: async () => { + const next = queue.shift() + return next ? [next] : [] + }, + processIntentVersion: async (versionId: number) => { + const packageName = versionId === 1 ? '@example/one' : '@example/two' + const version = '1.0.0' + processed.push(`${packageName}@${version}`) + return { + packageName, + version, + status: 'synced', + skillCount: 1, + } + }, + } + const firstRuntime = createAppWorkflowRuntime({ + store, + workflowRegistrations: createIntentWorkflowRegistrations({ operations }), + }) + const secondRuntime = createAppWorkflowRuntime({ + store, + workflowRegistrations: createIntentWorkflowRegistrations({ operations }), + }) + const firstBucket = Date.UTC(2026, 4, 26, 12, 0, 0) + const secondBucket = Date.UTC(2026, 4, 26, 12, 15, 0) + + await materializeWorkflowSchedules(firstRuntime, { now: firstBucket }) + await firstRuntime.sweep({ now: firstBucket, includeEvents: false }) + await materializeWorkflowSchedules(secondRuntime, { now: secondBucket }) + await secondRuntime.sweep({ now: secondBucket, includeEvents: false }) + + const runs = await store.listRuns({ + workflowId: INTENT_PROCESS_WORKFLOW_ID, + limit: 10, + }) + + assert.deepEqual(processed, ['@example/one@1.0.0', '@example/two@1.0.0']) + assert.equal(runs.length, 2) +}) + +const noopOperations: IntentSyncOperations = { + discoverIntentPackages: async () => ({ + packagesDiscovered: 0, + githubCandidates: 0, + packagesVerified: 0, + versionsEnqueued: 0, + errors: [], + }), + selectPendingIntentVersions: async () => [], + processIntentVersion: async () => { + throw new Error('No test version configured') + }, +} + +function quoteIdent(identifier: string): string { + return `"${identifier.replaceAll('"', '""')}"` +}