Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/realtime-replica-read-consistency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Realtime feed reads now wait out measured read-replica lag and retry stale reads, so subscribers receive each change's current content instead of trailing one change behind when a read replica races the write.
22 changes: 22 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,28 @@ const EnvironmentSchema = z
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
// "1" enables the read-your-writes gate: wake hydrates wait out the measured replica lag
// (anchored to the change record's updatedAtMs) and stale reads are retried.
REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED: z.string().default("1"),
// Reader-side lag probe cadence while the router is active; probing pauses when idle.
REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS: z.coerce.number().int().default(250),
REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS: z.coerce.number().int().default(30_000),
// The lag estimate is the max sample inside this window (spikes widen it immediately).
REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS: z.coerce.number().int().default(5_000),
// Estimate before the first sample lands (and the floor when probing is unavailable).
REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS: z.coerce.number().int().default(30),
// Safety margin (clock skew + scheduling) added on top of the lag estimate.
REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS: z.coerce.number().int().default(10),
// Hard cap on any single gate delay — a sick replica degrades freshness, never liveness.
REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS: z.coerce.number().int().default(1_000),
// Re-hydrate attempts for rows the tripwire still finds stale after the delay.
REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES: z.coerce.number().int().default(3),
// How long a tripwire-observed staleness floors the lag estimate (vanilla-PG replicas
// can't measure mid-apply lag, so observations carry the estimate between races).
REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS: z.coerce
.number()
.int()
.default(60_000),

PUBSUB_REDIS_HOST: z
.string()
Expand Down
34 changes: 26 additions & 8 deletions apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,20 @@ export async function routeOperationsToRun(
const [error, result] = await tryCatch(
updateMetadataService.call(targetRunId, { operations }, env)
);
if (!error && result !== undefined) return;
if (!error && result !== undefined) {
// The parent/root run changed too — wake its live feeds (only when something was
// actually written here; buffered writes publish from the flusher).
if (result.updatedAtMs !== undefined) {
publishChangeRecord({
runId: result.runId,
envId: env.id,
tags: result.runTags,
batchId: result.batchId,
updatedAtMs: result.updatedAtMs,
});
}
return;
}

if (error) {
// PG threw — auxiliary op, stay best-effort and don't surface this
Expand Down Expand Up @@ -186,13 +199,18 @@ const { action } = createActionApiRoute(
}
if (pgResult) {
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
publishChangeRecord({
runId: pgResult.runId,
envId: env.id,
tags: pgResult.runTags,
batchId: pgResult.batchId,
});
// internal id (the router keys single-run feeds by it, not the friendly id from the
// URL) with the committed updatedAt as the read-your-writes watermark. No write
// (no-op body, or ops buffered for the flusher) means nothing to announce here.
if (pgResult.updatedAtMs !== undefined) {
publishChangeRecord({
runId: pgResult.runId,
envId: env.id,
tags: pgResult.runTags,
batchId: pgResult.batchId,
updatedAtMs: pgResult.updatedAtMs,
});
}
return json({ metadata: pgResult.metadata }, { status: 200 });
}

Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,22 @@ export async function action({ request, params }: ActionFunctionArgs) {
if (newTags.length === 0) {
return json({ message: "No new tags to add" }, { status: 200 });
}
await prisma.taskRun.update({
const updated = await prisma.taskRun.update({
where: {
id: taskRun.id,
runtimeEnvironmentId: env.id,
},
data: { runTags: { push: newTags } },
select: { updatedAt: true },
});
// Publish a run-changed record with the NEW tag set so tag feeds reindex
// (no-op unless enabled).
// (no-op unless enabled). updatedAt is the read-your-writes watermark.
publishChangeRecord({
runId: taskRun.id,
envId: env.id,
tags: existing.concat(newTags),
batchId: taskRun.batchId,
updatedAtMs: updated.updatedAt.getTime(),
});
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
},
Expand Down
80 changes: 65 additions & 15 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ export type UpdateMetadataServiceOptions = {
maximumSize?: number;
logger?: Logger;
logLevel?: LogLevel;
/**
 * Optional hook called after the batched flusher writes a run's buffered operations.
 * It receives everything a realtime change record needs — buffered (parent/root)
 * updates otherwise never wake live feeds. The return value is ignored.
 */
onRunFlushed?: (run: {
/** Id of the run whose buffered operations were just written. */
runId: string;
/** The run's `runtimeEnvironmentId`. */
environmentId: string;
/** The run's `runTags`, as read by the flusher before it wrote. */
tags: string[];
/** The run's `batchId`; may be null. */
batchId: string | null;
/** Epoch milliseconds of the `updatedAt` value the flusher stamped on the row. */
updatedAtMs: number;
}) => void;
// Testing hooks
onBeforeUpdate?: (runId: string) => Promise<void>;
onAfterRead?: (runId: string, metadataVersion: number) => Promise<void>;
Expand Down Expand Up @@ -172,12 +182,20 @@ export class UpdateMetadataService {
operations: BufferedRunMetadataChangeOperation[]
) => {
return Effect.gen(this, function* (_) {
// Fetch current run
// Fetch current run (+ the realtime membership keys, so a flush can publish)
const run = yield* _(
Effect.tryPromise(() =>
this._prisma.taskRun.findFirst({
where: { id: runId },
select: { id: true, metadata: true, metadataType: true, metadataVersion: true },
select: {
id: true,
metadata: true,
metadataType: true,
metadataVersion: true,
runtimeEnvironmentId: true,
runTags: true,
batchId: true,
},
})
)
);
Expand Down Expand Up @@ -237,6 +255,9 @@ export class UpdateMetadataService {
yield* _(Effect.tryPromise(() => this.options.onBeforeUpdate!(runId)));
}

// Stamp updatedAt explicitly so the realtime publish can carry the exact committed
// value without a follow-up read (updateMany can't RETURNING).
const writeTime = new Date();
const result = yield* _(
Effect.tryPromise(() =>
this._prisma.taskRun.updateMany({
Expand All @@ -247,6 +268,7 @@ export class UpdateMetadataService {
data: {
metadata: newMetadataPacket.data,
metadataVersion: { increment: 1 },
updatedAt: writeTime,
},
})
)
Expand All @@ -262,6 +284,16 @@ export class UpdateMetadataService {
return yield* _(Effect.fail(new Error("Optimistic lock failed")));
}

yield* Effect.sync(() => {
this.options.onRunFlushed?.({
runId,
environmentId: run.runtimeEnvironmentId,
tags: run.runTags,
batchId: run.batchId,
updatedAtMs: writeTime.getTime(),
});
});

return result;
});
};
Expand Down Expand Up @@ -346,7 +378,7 @@ export class UpdateMetadataService {
this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations);
}

const newMetadata = await this.#updateRunMetadata({
const result = await this.#updateRunMetadata({
runId: taskRun.id,
body,
existingMetadata: {
Expand All @@ -356,11 +388,14 @@ export class UpdateMetadataService {
});

return {
metadata: newMetadata,
metadata: result?.metadata,
// Internal id + membership keys (batch, tags), so callers can publish a complete realtime change record that the router can route by index.
runId: taskRun.id,
batchId: taskRun.batchId,
runTags: taskRun.runTags,
// The committed row's updatedAt — the realtime watermark. Undefined when nothing was
// written here (no-op, or buffered for the flusher, which publishes itself).
updatedAtMs: result?.updatedAtMs,
};
}

Expand All @@ -372,15 +407,18 @@ export class UpdateMetadataService {
runId: string;
body: UpdateMetadataRequestBody;
existingMetadata: IOPacket;
}) {
}): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
if (Array.isArray(body.operations)) {
return this.#updateRunMetadataWithOperations(runId, body.operations);
} else {
return this.#updateRunMetadataDirectly(runId, body, existingMetadata);
}
}

async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) {
async #updateRunMetadataWithOperations(
runId: string,
operations: RunMetadataChangeOperation[]
): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
const MAX_RETRIES = 3;
let attempts = 0;

Expand Down Expand Up @@ -408,9 +446,9 @@ export class UpdateMetadataService {
// Apply operations to the current metadata
const applyResults = applyMetadataOperations(currentMetadata, operations);

// If no operations were applied, return the current metadata
// If no operations were applied, return the current metadata (nothing written)
if (applyResults.unappliedOperations.length === operations.length) {
return currentMetadata;
return { metadata: currentMetadata };
}

const newMetadataPacket = handleMetadataPacket(
Expand All @@ -428,7 +466,9 @@ export class UpdateMetadataService {
await this.options.onBeforeUpdate(runId);
}

// Update with optimistic locking
// Update with optimistic locking; updatedAt stamped explicitly so the caller can
// publish the exact committed watermark without a follow-up read.
const writeTime = new Date();
const result = await this._prisma.taskRun.updateMany({
where: {
id: runId,
Expand All @@ -440,6 +480,7 @@ export class UpdateMetadataService {
metadataVersion: {
increment: 1,
},
updatedAt: writeTime,
},
});

Expand All @@ -454,9 +495,10 @@ export class UpdateMetadataService {
}

// If this was our last attempt, buffer the operations and return optimistically
// (no watermark — the flusher writes later and publishes itself).
if (attempts === MAX_RETRIES) {
this.#ingestRunOperations(runId, operations);
return applyResults.newMetadata;
return { metadata: applyResults.newMetadata };
}

// Otherwise sleep and try again
Expand All @@ -474,8 +516,10 @@ export class UpdateMetadataService {
}

// Success! Return the new metadata
return applyResults.newMetadata;
return { metadata: applyResults.newMetadata, updatedAtMs: writeTime.getTime() };
}

return { metadata: undefined };
}

// Checks to see if a run is updatable
Expand All @@ -493,17 +537,19 @@ export class UpdateMetadataService {
runId: string,
body: UpdateMetadataRequestBody,
existingMetadata: IOPacket
) {
): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
const metadataPacket = handleMetadataPacket(
body.metadata,
"application/json",
this.maximumSize
);

if (!metadataPacket) {
return {};
return { metadata: {} };
}

let updatedAtMs: number | undefined;

if (
metadataPacket.data !== "{}" ||
(existingMetadata.data && metadataPacket.data !== existingMetadata.data)
Expand All @@ -515,7 +561,9 @@ export class UpdateMetadataService {
});
}

// Update the metadata without version check
// Update the metadata without version check; updatedAt stamped explicitly so the
// caller can publish the exact committed watermark.
const writeTime = new Date();
await this._prisma.taskRun.update({
where: {
id: runId,
Expand All @@ -526,12 +574,14 @@ export class UpdateMetadataService {
metadataVersion: {
increment: 1,
},
updatedAt: writeTime,
},
});
updatedAtMs = writeTime.getTime();
}

const newMetadata = await parsePacket(metadataPacket);
return newMetadata;
return { metadata: newMetadata, updatedAtMs };
}

#ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) {
Expand Down
12 changes: 12 additions & 0 deletions apps/webapp/app/services/metadata/updateMetadataInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton";
import { env } from "~/env.server";
import { UpdateMetadataService } from "./updateMetadata.server";
import { prisma } from "~/db.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";

export const updateMetadataService = singleton(
"update-metadata-service",
Expand All @@ -13,5 +14,16 @@ export const updateMetadataService = singleton(
flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,
logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info",
// The flusher — not the originating request — is what lands buffered (parent/root)
// operations, so the realtime change record is published from this hook to wake live
// feeds for those runs as well (a no-op when the backend is off).
onRunFlushed: ({ runId, environmentId, tags, batchId, updatedAtMs }) => {
  publishChangeRecord({ runId, envId: environmentId, tags, batchId, updatedAtMs });
},
})
);
Loading
Loading