CROSSLINK-287 Add scheduler #596
Pull request overview
This PR introduces a new database-backed scheduler subsystem to the broker that stores scheduled tasks in Postgres, wakes the scheduler loop via LISTEN/NOTIFY, and dispatches due tasks onto the existing event bus under a new scheduler event domain.
Changes:
- Add `scheduled_task` table + SQLC-generated access layer, plus migrations for creating/dropping the scheduler schema.
- Add `SchedulerService` that claims due tasks, dispatches them to the event bus, and reschedules (cron) or stops tasks.
- Wire scheduler startup into `broker/app` initialization and add unit/integration tests + configuration documentation.
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| broker/test/scheduler/db/skdrepo_test.go | New integration tests for scheduler repository behavior against Postgres (testcontainers). |
| broker/sqlc/sqlc.yaml | Adds a new SQLC target for scheduler schema/queries. |
| broker/sqlc/skd_schema.sql | SQLC schema for the scheduled_task table and index. |
| broker/sqlc/skd_query.sql | SQLC queries for upsert, next-run lookup, and claiming due tasks. |
| broker/scheduler/service/scheduler.go | New scheduler loop, LISTEN reconnect logic, cron parsing/rescheduling, event dispatching. |
| broker/scheduler/service/scheduler_test.go | Unit tests for scheduler logic (cron parsing, waiting, dispatch/reschedule/disable). |
| broker/scheduler/db/repo.go | Scheduler repo wrapper around SQLC queries + NOTIFY on save. |
| broker/scheduler/db/models.go | Scheduled task status type and constants. |
| broker/README.md | Documents SCHEDULER_RETRY_DELAY configuration. |
| broker/migrations/038_add_scheduler.up.sql | Adds the scheduled_task table and index. |
| broker/migrations/038_add_scheduler.down.sql | Drops the scheduler index/table. |
| broker/Makefile | Includes scheduler SQLC outputs in generate-sqlc targets. |
| broker/go.sum | Adds robfig/cron checksum entries. |
| broker/go.mod | Adds robfig/cron dependency entry. |
| broker/events/eventmodels.go | Adds EventDomainScheduler. |
| broker/app/app.go | Starts the scheduler during app initialization. |
Comments suppressed due to low confidence (2)
broker/scheduler/service/scheduler.go:68
- Reconnect logic can clobber the existing LISTEN connection: `connectAndListen` assigns to the shared `conn` variable even when `pgx.Connect` fails (`conn` becomes nil) and it doesn't close the previous connection before replacing it. This can leak connections and also sets up a nil dereference later; use a temporary variable and only swap/close after a successful reconnect.
connectAndListen := func() error {
	conn, err = pgx.Connect(ctx, s.connString)
	if err != nil {
		ctx.Logger().Error("scheduler: unable to connect to database", "error", err)
		return err
	}
	_, err = conn.Exec(ctx, "LISTEN "+schedulerChannel)
	if err != nil {
		ctx.Logger().Error("scheduler: unable to listen to channel", "channel", schedulerChannel, "error", err)
		_ = conn.Close(ctx)
		return err
	}
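One way to apply the suggestion above (build the new connection in a temporary variable and swap only after success) is sketched below. It is an assumption-laden illustration rather than the PR's fix: `listenConn`, `listener`, `dial`, `fakeConn` and `errDial` are hypothetical names, and the interface is a simplified stand-in, not the pgx API.

```go
package main

import "errors"

// listenConn is a hypothetical stand-in for the small part of a database
// connection that the listener needs; it is not the pgx API.
type listenConn interface {
	Listen(channel string) error
	Close() error
}

// listener owns the current LISTEN connection.
type listener struct {
	conn listenConn
	dial func() (listenConn, error) // hypothetical dialer, e.g. a wrapper around pgx.Connect
}

// reconnect builds the replacement in a local variable and swaps it in only
// after both dialing and LISTEN succeed. On failure l.conn is left untouched.
func (l *listener) reconnect(channel string) error {
	next, err := l.dial()
	if err != nil {
		return err
	}
	if err := next.Listen(channel); err != nil {
		_ = next.Close()
		return err
	}
	if l.conn != nil {
		_ = l.conn.Close() // release the previous connection before replacing it
	}
	l.conn = next
	return nil
}

// fakeConn and errDial exist only so the sketch can run without a database.
type fakeConn struct {
	listenErr error
	closed    bool
}

func (c *fakeConn) Listen(string) error { return c.listenErr }
func (c *fakeConn) Close() error        { c.closed = true; return nil }

var errDial = errors.New("dial failed")
```

Because the shared field is written only on the success path, a failed attempt can neither set it to nil nor leak the previous connection.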
broker/scheduler/service/scheduler.go:85
- The listener goroutine defers `conn.Close(ctx)` via a closure over the shared `conn` variable. During reconnect attempts, `conn` may be set to nil (e.g., `pgx.Connect` failure), which can cause a panic when the goroutine exits; guard against nil and/or avoid reassigning the outer `conn` until a new connection is successfully established.
go func() {
	defer func() { _ = conn.Close(ctx) }()
	for {
		_, er := conn.WaitForNotification(ctx)
		if er != nil {
			if strings.Contains(er.Error(), "context canceled") {
				ctx.Logger().Info("scheduler: context cancelled, stopping listener")
				return
			}
Force-pushed from 496d630 to d75f83b
Force-pushed from 0e06d65 to fdbadc6
Force-pushed from fdbadc6 to f67c88a
adamdickmeiss left a comment:
Have you tested, perhaps manually, with multiple broker instances?
It is hard to test without an implementation of real tasks, but I added integration tests that cover reconnects and multiple instances.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
broker/test/scheduler/service/scheduler_test.go:245
- `adminPool` is created to run `pg_terminate_backend` but is never closed. This can leak connections in the test process. Please add `defer adminPool.Close()` (or `t.Cleanup`) after successful creation.
// Kill all LISTEN connections to simulate a network interruption.
adminPool, err := app.InitDbPool()
assert.NoError(t, err)
killCtx := common.CreateExtCtxWithArgs(context.Background(), nil)
schedRepoRepo := sched_db.CreateSchedRepo(pool)
if err = StartScheduler(ctx, schedRepoRepo, eventBus); err != nil {
// Listen opens a dedicated Postgres connection and listens on sched_db.SchedulerChannel.
// Each notification wakes the scheduler loop. Reconnects with exponential
// backoff on connection loss. Blocks until ctx is cancelled.
// countingEventBus records dispatched tasks and is safe for concurrent use.
type countingEventBus struct {
	events.EventBus
	mu     sync.Mutex
	claims []string
}

func (b *countingEventBus) CreateTask(_ string, _ events.EventName, _ events.EventData, _ events.EventDomain, _ *string, _ events.SignalTarget) (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.claims = append(b.claims, uuid.NewString())
	return uuid.NewString(), nil
}

func (b *countingEventBus) totalClaims() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.claims)
func startScheduler(t *testing.T, ctx context.Context, bus events.EventBus) {
	t.Helper()
	pool, err := app.InitDbPool()
	assert.NoError(t, err)
	repo := sched_db.CreateSchedRepo(pool)
	svc := sched_service.NewSchedulerService(repo, bus, connString)
	extCtx := common.CreateExtCtxWithArgs(ctx, nil)
	assert.NoError(t, svc.Listen(extCtx))
	go svc.Run(extCtx)
}
@JanisSaldabols can you address the Copilot comments?