From 31347394e17233a7c27528296c3fae3f8581369e Mon Sep 17 00:00:00 2001 From: Zhiming Wang Date: Sun, 31 May 2026 08:26:54 +0800 Subject: [PATCH] Honor `Config.Schema` for the running-state transition in `rivertest.Worker` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `rivertest.Worker` works a job in three database steps: it inserts the job through the client, builds an inline completer, and transitions the job to `running` via `JobUpdateFull`. The first two already thread `config.Schema` through — the client uses it internally, and it's passed explicitly to `jobcompleter.NewInlineCompleter` — but the `JobUpdateFull` call omitted it. With a non-default `Schema`, the insert lands the job in `.river_job` correctly, then the running-state update runs unqualified and resolves only through the connection's `search_path`. On a connection that doesn't include the configured schema it fails one step later with: test worker internal error: failed to update job to running state: ERROR: relation "river_job" does not exist (SQLSTATE 42P01) Pass `w.config.Schema` into `JobUpdateFullParams` so all three steps agree on the schema. This finishes custom-schema support for `rivertest.Worker`; the `rivertest.Require*` family received an analogous per-call `Schema` option in #926 (#907). The regression tests migrate River into an isolated named schema and work jobs through a transaction whose `search_path` is empty, so the tables resolve only via schema qualification — the exact condition that fails before this change. They live as `CustomSchema` subtests of `TestWorker_Work` and `TestWorker_WorkJob`, each building its own bundle inline since the schema setup differs from those tests' shared `setup`. --- CHANGELOG.md | 4 +++ rivertest/worker.go | 1 + rivertest/worker_test.go | 62 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01e43314..70ce64b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) +### Fixed + +- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262) + ## [0.38.0] - 2026-05-22 ### Added diff --git a/rivertest/worker.go b/rivertest/worker.go index 45c5fb8b..8da01ae0 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -166,6 +166,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job AttemptedAtDoUpdate: true, AttemptedBy: append(job.AttemptedBy, w.config.ID), AttemptedByDoUpdate: true, + Schema: w.config.Schema, StateDoUpdate: true, State: rivertype.JobStateRunning, }) diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go index 1e0d5680..ced315c5 100644 --- a/rivertest/worker_test.go +++ b/rivertest/worker_test.go @@ -450,6 +450,34 @@ func TestWorker_Work(t *testing.T) { require.True(t, middlewareCalled) require.True(t, middlewareWithBaseServiceCalled) }) + + // Honors config.Schema: River lives in a named schema, worked through a + // transaction with an empty search_path so tables resolve only via schema + // qualification. Needs its own infrastructure, so it skips setup. + t.Run("CustomSchema", func(t *testing.T) { + t.Parallel() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = &river.Config{ID: "rivertest-worker", Schema: schema} + ) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, rivertype.JobStateRunning, job.State) + return nil + }) + + tw := NewWorker(t, driver, config, worker) + res, err := tw.Work(ctx, t, tx, testArgs{Value: "test"}, nil) + require.NoError(t, err) + require.Equal(t, river.EventKindJobCompleted, res.EventKind) + }) } func TestWorker_WorkJob(t *testing.T) { @@ -556,4 +584,38 @@ func TestWorker_WorkJob(t *testing.T) { require.ErrorContains(t, err, "failed to update job to running state") require.Nil(t, res) }) + + // Honors config.Schema: River lives in a named schema, worked through a + // transaction with an empty search_path so tables resolve only via schema + // qualification. Needs its own infrastructure, so it skips setup. + t.Run("CustomSchema", func(t *testing.T) { + t.Parallel() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = &river.Config{ID: "rivertest-workjob", Schema: schema} + ) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + client, err := river.NewClient(driver, config) + require.NoError(t, err) + + insertRes, err := client.InsertTx(ctx, tx, testArgs{Value: "test"}, nil) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, rivertype.JobStateRunning, job.State) + return nil + }) + + tw := NewWorker(t, driver, config, worker) + res, err := tw.WorkJob(ctx, t, tx, insertRes.Job) + require.NoError(t, err) + require.Equal(t, river.EventKindJobCompleted, res.EventKind) + }) }