Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Changed
Expand All @@ -18,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255)
- Add a 30-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) [PR #1263](https://github.com/riverqueue/river/pull/1263)
- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262)

## [0.38.0] - 2026-05-22
Expand Down
8 changes: 8 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ type ExecutorTx interface {
//
// API is not stable. DO NOT USE.
Rollback(ctx context.Context) error

// SetLocalStatementTimeout sets a statement timeout local to the current
// transaction if supported by the underlying database. Some databases don't
// support this behavior, so this should be used in addition to context
// timeouts, not instead of them.
//
// API is not stable. DO NOT USE.
SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error
}

type GetListenenerParams struct {
Expand Down
8 changes: 8 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think time.Duration.String will return Go duration values that aren't pg-compatible?

}

type ExecutorSubTx struct {
Executor

Expand Down Expand Up @@ -1103,6 +1107,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return nil
}

func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String())
}

func interpretError(err error) error {
if errors.Is(err, sql.ErrNoRows) {
return rivertype.ErrNotFound
Expand Down
23 changes: 23 additions & 0 deletions riverdriver/riverdrivertest/executor_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package riverdrivertest
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -161,6 +162,28 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
})
})

t.Run("SetLocalStatementTimeout", func(t *testing.T) {
t.Parallel()

exec, driver := executorWithTx(ctx, t)

tx, err := exec.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback(ctx) })

const timeout = 1234 * time.Millisecond

require.NoError(t, tx.SetLocalStatementTimeout(ctx, timeout))

if driver.DatabaseName() == databaseNameSQLite {
return
}

var timeoutMatches bool
require.NoError(t, tx.QueryRow(ctx, "SELECT current_setting('statement_timeout')::interval = $1::interval", timeout.String()).Scan(&timeoutMatches))
require.True(t, timeoutMatches)
})

t.Run("PGAdvisoryXactLock", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", timeout.String())
}

type Listener struct {
afterConnectExec string // should only ever be used in testing
conn *pgx.Conn
Expand Down
8 changes: 8 additions & 0 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

type ExecutorSubTx struct {
Executor

Expand Down Expand Up @@ -1560,6 +1564,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return nil
}

func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

func interpretError(err error) error {
if errors.Is(err, sql.ErrNoRows) {
return rivertype.ErrNotFound
Expand Down
22 changes: 18 additions & 4 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package riverpilot

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/dbutil"
"github.com/riverqueue/river/rivertype"
)

const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second

type StandardPilot struct {
seq atomic.Int64
}
Expand All @@ -23,10 +23,24 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
return nil, nil
}

ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded)
// Set an outer context timeout on locking jobs, and where possible (i.e. in
// Postgres, but not SQLite), set an inner `statement_timeout` inside a
// transaction so the configuration isn't durable. The error from the
// Postgres version will be better, so try to have that trigger first. It
// also minimizes the chances of a successful operation that locks jobs but
// then accidentally errors because it's run time was so close to the Go
// timeout.
const timeout = 30 * time.Second

ctx, cancel := context.WithTimeoutCause(ctx, timeout, context.DeadlineExceeded)
defer cancel()

return exec.JobGetAvailable(ctx, params)
return dbutil.WithTxV(ctx, exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobRow, error) {
if err := execTx.SetLocalStatementTimeout(ctx, timeout-1*time.Second); err != nil {
return nil, fmt.Errorf("error setting statement timeout: %w", err)
}
return execTx.JobGetAvailable(ctx, params)
})
}

func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
Expand Down
15 changes: 15 additions & 0 deletions rivershared/riverpilot/standard_pilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -17,10 +18,24 @@ type standardPilotExecutorMock struct {
jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
}

func (m *standardPilotExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
return &standardPilotExecutorTxMock{standardPilotExecutorMock: m}, nil
}

func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
return m.jobGetAvailableFunc(ctx, params)
}

type standardPilotExecutorTxMock struct {
*standardPilotExecutorMock
}

func (m *standardPilotExecutorTxMock) Commit(ctx context.Context) error { return nil }
func (m *standardPilotExecutorTxMock) Rollback(ctx context.Context) error { return nil }
func (m *standardPilotExecutorTxMock) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

func TestStandardPilot_JobGetAvailable(t *testing.T) {
t.Parallel()

Expand Down
Loading