diff --git a/CHANGELOG.md b/CHANGELOG.md index 2401f5bb..6851d96e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Fixed - ⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) ### Changed @@ -18,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) +- Add a 30-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) [PR #1263](https://github.com/riverqueue/river/pull/1263) - Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262) ## [0.38.0] - 2026-05-22 diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8727a6e3..8b345edb 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -295,6 +295,14 @@ type ExecutorTx interface { // // API is not stable. DO NOT USE. Rollback(ctx context.Context) error + + // SetLocalStatementTimeout sets a statement timeout local to the current + // transaction if supported by the underlying database. Some databases don't + // support this behavior, so this should be used in addition to context + // timeouts, not instead of them. + // + // API is not stable. DO NOT USE. + SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error } type GetListenenerParams struct { diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 16a3ae35..3467a0ab 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -1050,6 +1050,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback() } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String()) +} + type ExecutorSubTx struct { Executor @@ -1103,6 +1107,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return nil } +func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String()) +} + func interpretError(err error) error { if errors.Is(err, sql.ErrNoRows) { return rivertype.ErrNotFound diff --git a/riverdriver/riverdrivertest/executor_tx.go b/riverdriver/riverdrivertest/executor_tx.go index 451c65c2..a5ef9828 100644 --- a/riverdriver/riverdrivertest/executor_tx.go +++ b/riverdriver/riverdrivertest/executor_tx.go @@ -3,6 +3,7 @@ package riverdrivertest import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -161,6 +162,28 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("SetLocalStatementTimeout", func(t *testing.T) { + t.Parallel() + + exec, driver := executorWithTx(ctx, t) + + tx, err := exec.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + const timeout = 1234 * time.Millisecond + + require.NoError(t, tx.SetLocalStatementTimeout(ctx, timeout)) + + if driver.DatabaseName() == databaseNameSQLite { + return + } + + var timeoutMatches bool + require.NoError(t, tx.QueryRow(ctx, "SELECT current_setting('statement_timeout')::interval = $1::interval", timeout.String()).Scan(&timeoutMatches)) + require.True(t, timeoutMatches) + }) + t.Run("PGAdvisoryXactLock", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index efbddf1f..c435653f 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -1024,6 +1024,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String()) +} + type Listener struct { afterConnectExec string // should only ever be used in testing conn *pgx.Conn diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index b7d729ae..bd948bc3 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1500,6 +1500,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback() } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + type ExecutorSubTx struct { Executor @@ -1560,6 +1564,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return nil } +func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + func interpretError(err error) error { if errors.Is(err, sql.ErrNoRows) { return rivertype.ErrNotFound diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 45e2ff2b..2c3eb6ee 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -2,16 +2,16 @@ package riverpilot import ( "context" + "fmt" "sync/atomic" "time" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivertype" ) -const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second - type StandardPilot struct { seq atomic.Int64 } @@ -23,10 +23,24 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return nil, nil } - ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded) + // Set an outer context timeout on locking jobs, and where possible (i.e. in + // Postgres, but not SQLite), set an inner `statement_timeout` inside a + // transaction so the configuration isn't durable. The error from the + // Postgres version will be better, so try to have that trigger first. It + // also minimizes the chances of a successful operation that locks jobs but + // then accidentally errors because it's run time was so close to the Go + // timeout. + const timeout = 30 * time.Second + + ctx, cancel := context.WithTimeoutCause(ctx, timeout, context.DeadlineExceeded) defer cancel() - return exec.JobGetAvailable(ctx, params) + return dbutil.WithTxV(ctx, exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobRow, error) { + if err := execTx.SetLocalStatementTimeout(ctx, timeout-1*time.Second); err != nil { + return nil, fmt.Errorf("error setting statement timeout: %w", err) + } + return execTx.JobGetAvailable(ctx, params) + }) } func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { diff --git a/rivershared/riverpilot/standard_pilot_test.go b/rivershared/riverpilot/standard_pilot_test.go index ae7d4254..d625f13c 100644 --- a/rivershared/riverpilot/standard_pilot_test.go +++ b/rivershared/riverpilot/standard_pilot_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/require" @@ -17,10 +18,24 @@ type standardPilotExecutorMock struct { jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) } +func (m *standardPilotExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { + return &standardPilotExecutorTxMock{standardPilotExecutorMock: m}, nil +} + func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { return m.jobGetAvailableFunc(ctx, params) } +type standardPilotExecutorTxMock struct { + *standardPilotExecutorMock +} + +func (m *standardPilotExecutorTxMock) Commit(ctx context.Context) error { return nil } +func (m *standardPilotExecutorTxMock) Rollback(ctx context.Context) error { return nil } +func (m *standardPilotExecutorTxMock) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + func TestStandardPilot_JobGetAvailable(t *testing.T) { t.Parallel()