Skip to content

Commit 3c62677

Browse files
authored
Reindexer: Mock reindex function to reduce parallel contention (#1210)
This one's aimed at fixing an intermittently failing test case: https://github.com/riverqueue/river/actions/runs/24322049420/job/71009998068?pr=1208 --- FAIL: TestReindexer (0.00s) --- FAIL: TestReindexer/ReindexesConfiguredIndexes (10.07s) reindexer_test.go:219: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_04" [user facing: "maintenance_2026_04_13t01_54_14_schema_04"] after cleaning in 26.436456ms [4 generated] [7 reused] test_signal.go:95: timed out waiting on test signal after 10s logger.go:256: time=2026-04-13T01:54:27.581Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_04"; 1 idle schema(s) [4 generated] [10 reused] --- FAIL: TestReindexer/ReindexesMinimalSubsetofIndexes (10.14s) reindexer_test.go:183: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_01" [user facing: "maintenance_2026_04_13t01_54_14_schema_01"] after cleaning in 28.042877ms [4 generated] [10 reused] test_signal.go:95: timed out waiting on test signal after 10s reindexer_test.go:211: Error Trace: /home/runner/work/river/river/internal/maintenance/reindexer_test.go:211 Error: Should be false Test: TestReindexer/ReindexesMinimalSubsetofIndexes logger.go:256: time=2026-04-13T01:54:28.444Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_01"; 1 idle schema(s) [5 generated] [24 reused] FAIL FAIL github.com/riverqueue/river/internal/maintenance 18.764s I'm diagnosing with Claude's help here, but what appears to be happening is that although a reindex operation in Postgres is often fast, it is still a heavy operation, and can slow down even further when there's a lot of concurrent activity hammering a database. Many reindexer test cases run in parallel, and it appears that was happening here is that we got a reindex that exceeded our maximum timeout of 10x. We have some evidence this during the test run from the runtime of 10.07s and the line: Signaled to stop during index build; attempting to clean up concurrent artifacts Here, I'm putting forward a solution proposed by Claude, which is to mock out the reindex operation, especially where we have a number of reindexes going in parallel. The tests `ReindexOneSuccess` and `ReindexSkippedWithReindexArtifact` still put load on the real reindex operation, so we're not going full mock here, and should see our intermittency considerably reduced while still being confident that everything still works.
1 parent 4155660 commit 3c62677

File tree

1 file changed

+36
-14
lines changed

1 file changed

+36
-14
lines changed

internal/maintenance/reindexer_test.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"sync/atomic"
89
"testing"
910
"time"
@@ -24,13 +25,15 @@ type reindexerExecutorMock struct {
2425
indexesExistCalls atomic.Int32
2526
indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
2627
indexesExistSignal chan struct{}
28+
indexReindexFunc func(ctx context.Context, params *riverdriver.IndexReindexParams) error
2729
}
2830

2931
func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock {
3032
return &reindexerExecutorMock{
3133
Executor: exec,
3234
indexesExistFunc: exec.IndexesExist,
3335
indexesExistSignal: make(chan struct{}, 10),
36+
indexReindexFunc: exec.IndexReindex,
3437
}
3538
}
3639

@@ -45,6 +48,10 @@ func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverd
4548
return m.indexesExistFunc(ctx, params)
4649
}
4750

51+
func (m *reindexerExecutorMock) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error {
52+
return m.indexReindexFunc(ctx, params)
53+
}
54+
4855
func TestReindexer(t *testing.T) {
4956
t.Parallel()
5057

@@ -182,6 +189,23 @@ func TestReindexer(t *testing.T) {
182189

183190
svc, bundle := setup(t)
184191

192+
var (
193+
// Mock IndexReindex so the test doesn't depend on the speed of real
194+
// REINDEX CONCURRENTLY operations on the shared CI database. Track
195+
// which indexes got reindexed so we can verify the expected set.
196+
mockExec = newReindexerExecutorMock(bundle.exec)
197+
198+
reindexedNames []string
199+
reindexedMu sync.Mutex
200+
)
201+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
202+
reindexedMu.Lock()
203+
defer reindexedMu.Unlock()
204+
reindexedNames = append(reindexedNames, params.Index)
205+
return nil
206+
}
207+
svc.exec = mockExec
208+
185209
svc.Config.IndexNames = []string{
186210
"river_job_kind",
187211
"river_job_prioritized_fetching_index",
@@ -198,25 +222,23 @@ func TestReindexer(t *testing.T) {
198222
case <-time.After(100 * time.Millisecond):
199223
}
200224

201-
// Make sure that no `CONCURRENTLY` artifacts exist after reindexing is
202-
// supposed to be done. Postgres creates a new index suffixed with
203-
// `_ccnew` before swapping it in as the new index. The existing index
204-
// is renamed `_ccold` before being dropped concurrently.
205-
//
206-
// https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY
207-
for _, indexName := range svc.Config.IndexNames {
208-
for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
209-
indexExists, err := bundle.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: bundle.schema})
210-
require.NoError(t, err)
211-
require.False(t, indexExists)
212-
}
213-
}
225+
reindexedMu.Lock()
226+
require.ElementsMatch(t, svc.Config.IndexNames, reindexedNames)
227+
reindexedMu.Unlock()
214228
})
215229

216230
t.Run("ReindexesConfiguredIndexes", func(t *testing.T) {
217231
t.Parallel()
218232

219-
svc, _ := setup(t)
233+
svc, bundle := setup(t)
234+
235+
// Mock IndexReindex so the test doesn't depend on the speed of real
236+
// REINDEX CONCURRENTLY operations on the shared CI database.
237+
mockExec := newReindexerExecutorMock(bundle.exec)
238+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
239+
return nil
240+
}
241+
svc.exec = mockExec
220242

221243
svc.Config.ScheduleFunc = runImmediatelyThenOnceAnHour()
222244

0 commit comments

Comments
 (0)