From 0afb5ff553dd33095063752f23a58944c49eb26a Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Thu, 14 May 2026 10:54:44 +0300 Subject: [PATCH 1/4] CROSSLINK-287 Add scheduler --- broker/go.mod | 1 + broker/sqlc/skd_query.sql | 35 +++++++++++++++++++++++++++++++++++ broker/sqlc/skd_schema.sql | 15 +++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 broker/sqlc/skd_query.sql create mode 100644 broker/sqlc/skd_schema.sql diff --git a/broker/go.mod b/broker/go.mod index 1bfe0b2a..b8409e8f 100644 --- a/broker/go.mod +++ b/broker/go.mod @@ -120,6 +120,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/riza-io/grpc-go v0.2.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/shirou/gopsutil/v4 v4.26.4 // indirect github.com/sirupsen/logrus v1.9.4 // indirect github.com/speakeasy-api/jsonpath v0.6.3 // indirect diff --git a/broker/sqlc/skd_query.sql b/broker/sqlc/skd_query.sql new file mode 100644 index 00000000..012d9b71 --- /dev/null +++ b/broker/sqlc/skd_query.sql @@ -0,0 +1,35 @@ +-- name: SaveScheduledTask :one +INSERT INTO scheduled_task (id, event_name, cron_expr, payload, run_at, status, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (id) DO UPDATE + SET event_name = EXCLUDED.event_name, + cron_expr = EXCLUDED.cron_expr, + payload = EXCLUDED.payload, + run_at = EXCLUDED.run_at, + status = EXCLUDED.status, + updated_at = now() +RETURNING sqlc.embed(scheduled_task); + +-- name: GetNextRunAt :one +SELECT run_at +FROM scheduled_task +WHERE status = 'pending' + AND run_at IS NOT NULL +ORDER BY run_at +LIMIT 1; + +-- name: ClaimNextScheduledTask :one +UPDATE scheduled_task +SET status = 'running', + updated_at = now() +WHERE id = (SELECT id + FROM scheduled_task + WHERE status = 'pending' + AND run_at <= now() + ORDER BY run_at + LIMIT 1 + FOR +UPDATE SKIP LOCKED + ) + RETURNING sqlc.embed(scheduled_task); + diff --git a/broker/sqlc/skd_schema.sql b/broker/sqlc/skd_schema.sql new file mode 100644 index 00000000..2bef4760 --- /dev/null +++ b/broker/sqlc/skd_schema.sql @@ -0,0 +1,15 @@ +CREATE TABLE scheduled_task +( + id TEXT PRIMARY KEY, + event_name TEXT NOT NULL, + cron_expr TEXT NOT NULL, + payload JSONB, + run_at TIMESTAMPTZ, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ, + FOREIGN KEY (event_name) REFERENCES event_config (event_name) +); + +CREATE INDEX idx_scheduled_task_run_at ON scheduled_task (run_at) WHERE status = 'pending'; + From 2dce0a70a0a01d5f24230d027f2840644f91137b Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Mon, 18 May 2026 16:45:07 +0300 Subject: [PATCH 2/4] CROSSLINK-288 Add batch action endpoints --- broker/Makefile | 16 +- broker/app/app.go | 47 +- .../ModuleDescriptor-template.json | 92 +++- broker/events/eventmodels.go | 8 + .../migrations/039_add_batch_action.down.sql | 3 + broker/migrations/039_add_batch_action.up.sql | 17 + broker/oapi/cfg.yaml | 1 + broker/oapi/open-api.yaml | 183 ++++++++ broker/oapi/sched-cfg.yaml | 8 + broker/scheduler/api/api_handler.go | 243 ++++++++++ broker/scheduler/api/api_handler_test.go | 419 ++++++++++++++++++ broker/scheduler/db/repo.go | 55 +++ broker/scheduler/service/scheduler.go | 6 +- broker/scheduler/service/scheduler_test.go | 7 +- broker/sqlc/sched_query.sql | 33 ++ broker/sqlc/sched_schema.sql | 11 + 16 files changed, 1119 insertions(+), 30 deletions(-) create mode 100644 broker/migrations/039_add_batch_action.down.sql create mode 100644 broker/migrations/039_add_batch_action.up.sql create mode 100644 broker/oapi/sched-cfg.yaml create mode 100644 broker/scheduler/api/api_handler.go create mode 100644 broker/scheduler/api/api_handler_test.go diff --git a/broker/Makefile b/broker/Makefile index 59fc967d..8fa0020d 100644 --- a/broker/Makefile +++ b/broker/Makefile @@ -55,6 +55,12 @@ PS_OAPI_CFG = $(PS_OAPI_DIR)/ps-cfg.yaml PS_OAPI_SPEC = $(PS_OAPI_DIR)/open-api.yaml PS_OAPI_GEN = pullslip/oapi/ps-openapi_gen.go +# Pull slip OpenAPI +SCHED_OAPI_DIR=oapi +SCHED_OAPI_CFG = $(SCHED_OAPI_DIR)/sched-cfg.yaml +SCHED_OAPI_SPEC = $(SCHED_OAPI_DIR)/open-api.yaml +SCHED_OAPI_GEN = scheduler/oapi/sched_openapi_gen.go + .PHONY: all docker generate generate-sqlc generate-api generate-commit-id check run fmt fmt-check clean view-coverage deps-update tools-update lint vulncheck check-coverage all: $(BINARY) archive @@ -68,7 +74,7 @@ generate-commit-id: $(COMMIT_ID) generate-sqlc: $(SQL_GEN_OUT) -generate-api: $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) +generate-api: $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) $(SCHED_OAPI_GEN) $(RETURNABLES_JSON): $(RETURNABLES_YAML) $(GO) run github.com/mikefarah/yq/v4@v4.52.5 eval -o=json $< > $@.tmp @@ -83,6 +89,9 @@ $(PR_OAPI_GEN): $(PR_OAPI_CFG) $(PR_OAPI_SPEC) $(PR_OAPI_OVERLAY) $(PS_OAPI_GEN): $(PS_OAPI_CFG) $(PS_OAPI_SPEC) $(OAPI_CODEGEN) -config ./$(PS_OAPI_CFG) ./$(PS_OAPI_SPEC) +$(SCHED_OAPI_GEN): $(SCHED_OAPI_CFG) $(SCHED_OAPI_SPEC) + $(OAPI_CODEGEN) -config ./$(SCHED_OAPI_CFG) ./$(SCHED_OAPI_SPEC) + $(SQL_GEN_OUT): $(SQL_GEN_IN) $(SQLC_CONFIG) $(SQLC) generate -f $(SQLC_CONFIG) @@ -92,10 +101,10 @@ $(COMMIT_ID): $(GIT_COMMIT_DEPS) printf "%s" "$$commit" > "$@"; \ fi -$(BINARY): $(COMMIT_ID) $(SQL_GEN_OUT) $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) $(BUILD_GOFILES) $(RETURNABLES_JSON) $(PULLSLIP_TEMPLATE) +$(BINARY): $(COMMIT_ID) $(SQL_GEN_OUT) $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) $(SCHED_OAPI_GEN) $(BUILD_GOFILES) $(RETURNABLES_JSON) $(PULLSLIP_TEMPLATE) $(GO) build -v -o $(BINARY) ./$(MAIN_PACKAGE) -archive: $(COMMIT_ID) $(SQL_GEN_OUT) $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) $(BUILD_GOFILES) $(RETURNABLES_JSON) $(PULLSLIP_TEMPLATE) +archive: $(COMMIT_ID) $(SQL_GEN_OUT) $(OAPI_GEN) $(PR_OAPI_GEN) $(PS_OAPI_GEN) $(SCHED_OAPI_GEN) $(BUILD_GOFILES) $(RETURNABLES_JSON) $(PULLSLIP_TEMPLATE) $(GO) build -v -o archive ./cmd/archive check: generate @@ -141,3 +150,4 @@ clean: rm -f $(OAPI_GEN) rm -f $(PR_OAPI_GEN) rm -f $(PS_OAPI_GEN) + rm -f $(SCHED_OAPI_GEN) diff --git a/broker/app/app.go b/broker/app/app.go index 2a8fc181..fd34179f 100644 --- a/broker/app/app.go +++ b/broker/app/app.go @@ -21,7 +21,9 @@ import ( psapi "github.com/indexdata/crosslink/broker/pullslip/api" ps_db "github.com/indexdata/crosslink/broker/pullslip/db" psoapi "github.com/indexdata/crosslink/broker/pullslip/oapi" + schedapi "github.com/indexdata/crosslink/broker/scheduler/api" sched_db "github.com/indexdata/crosslink/broker/scheduler/db" + schedoapi "github.com/indexdata/crosslink/broker/scheduler/oapi" sched_service "github.com/indexdata/crosslink/broker/scheduler/service" "github.com/indexdata/crosslink/broker/tenant" @@ -94,16 +96,17 @@ var ServeMux *http.ServeMux var appCtx = common.CreateExtCtxWithLogArgsAndHandler(context.Background(), nil, configLog()) type Context struct { - EventBus events.EventBus - IllRepo ill_db.IllRepo - EventRepo events.EventRepo - DirAdapter adapter.DirectoryLookupAdapter - PrRepo pr_db.PrRepo - TenantResolver *tenant.TenantResolver - ApiHandler api.ApiHandler - PrApiHandler prapi.PatronRequestApiHandler - SseBroker *api.SseBroker - PsApiHandler psapi.PullSlipApiHandler + EventBus events.EventBus + IllRepo ill_db.IllRepo + EventRepo events.EventRepo + DirAdapter adapter.DirectoryLookupAdapter + PrRepo pr_db.PrRepo + TenantResolver *tenant.TenantResolver + ApiHandler api.ApiHandler + PrApiHandler prapi.PatronRequestApiHandler + SseBroker *api.SseBroker + PsApiHandler psapi.PullSlipApiHandler + SchedApiHandler schedapi.SchedulerApiHandler } func configLog() slog.Handler { @@ -200,21 +203,23 @@ func Init(ctx context.Context) (Context, error) { } schedRepoRepo := sched_db.CreateSchedRepo(pool) + skdApiHandler := schedapi.NewSchedulerApiHandler(API_PAGE_SIZE, schedRepoRepo, tenantResolver) if err = StartScheduler(ctx, schedRepoRepo, eventBus); err != nil { return Context{}, err } return Context{ - EventBus: eventBus, - IllRepo: illRepo, - EventRepo: eventRepo, - DirAdapter: dirAdapter, - PrRepo: prRepo, - TenantResolver: tenantResolver, - ApiHandler: apiHandler, - PrApiHandler: prApiHandler, - SseBroker: sseBroker, - PsApiHandler: psApiHandler, + EventBus: eventBus, + IllRepo: illRepo, + EventRepo: eventRepo, + DirAdapter: dirAdapter, + PrRepo: prRepo, + TenantResolver: tenantResolver, + ApiHandler: apiHandler, + PrApiHandler: prApiHandler, + SseBroker: sseBroker, + PsApiHandler: psApiHandler, + SchedApiHandler: skdApiHandler, }, nil } @@ -241,12 +246,14 @@ func StartServer(ctx Context) error { oapi.HandlerFromMux(&ctx.ApiHandler, ServeMux) proapi.HandlerFromMux(&ctx.PrApiHandler, ServeMux) psoapi.HandlerFromMux(&ctx.PsApiHandler, ServeMux) + schedoapi.HandlerFromMux(&ctx.SchedApiHandler, ServeMux) ServeMux.HandleFunc("GET /sse/events", ctx.SseBroker.ServeHTTP) if ctx.TenantResolver.HasTenantMapping() { basePath := tenant.OKAPI_PATH_PREFIX oapi.HandlerFromMuxWithBaseURL(&ctx.ApiHandler, ServeMux, basePath) proapi.HandlerFromMuxWithBaseURL(&ctx.PrApiHandler, ServeMux, basePath) psoapi.HandlerFromMuxWithBaseURL(&ctx.PsApiHandler, ServeMux, basePath) + schedoapi.HandlerFromMuxWithBaseURL(&ctx.SchedApiHandler, ServeMux, basePath) ServeMux.HandleFunc("GET "+basePath+"/sse/events", ctx.SseBroker.ServeHTTP) } signatureHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/broker/descriptors/ModuleDescriptor-template.json b/broker/descriptors/ModuleDescriptor-template.json index 8ed72ee5..e6115846 100644 --- a/broker/descriptors/ModuleDescriptor-template.json +++ b/broker/descriptors/ModuleDescriptor-template.json @@ -203,6 +203,42 @@ "permissionsRequired": [ "broker.pullslips.post" ] + }, + { + "methods": [ + "GET" + ], + "pathPattern": "/broker/batch_actions", + "permissionsRequired": [ + "broker.batch_actions.get" + ] + }, + { + "methods": [ + "POST" + ], + "pathPattern": "/broker/batch_actions", + "permissionsRequired": [ + "broker.batch_actions.post" + ] + }, + { + "methods": [ + "GET" + ], + "pathPattern": "/broker/batch_actions/{id}", + "permissionsRequired": [ + "broker.batch_actions.item.get" + ] + }, + { + "methods": [ + "DELETE" + ], + "pathPattern": "/broker/batch_actions/{id}", + "permissionsRequired": [ + "broker.batch_actions.item.delete" + ] } ] } @@ -328,6 +364,56 @@ "displayName": "Broker - create pull slip for a patron request", "permissionName": "broker.pullslips.post" }, + { + "description": "List batch actions", + "displayName": "Broker - list batch actions", + "permissionName": "broker.batch_actions.get" + }, + { + "description": "Create a batch action", + "displayName": "Broker - create batch action", + "permissionName": "broker.batch_actions.post" + }, + { + "description": "Read a batch action", + "displayName": "Broker - read batch action", + "permissionName": "broker.batch_actions.item.get" + }, + { + "description": "Delete a batch action", + "displayName": "Broker - delete batch action", + "permissionName": "broker.batch_actions.item.delete" + }, + { + "description": "Read-only access to batch actions", + "displayName": "Broker - batch actions: read", + "permissionName": "broker.batch_actions.read", + "visible": true, + "subPermissions": [ + "broker.batch_actions.get", + "broker.batch_actions.item.get" + ] + }, + { + "description": "Write access to batch actions", + "displayName": "Broker - batch actions: write", + "permissionName": "broker.batch_actions.write", + "visible": true, + "subPermissions": [ + "broker.batch_actions.post", + "broker.batch_actions.item.delete" + ] + }, + { + "description": "Read and write access to batch actions", + "displayName": "Broker - batch actions: all", + "permissionName": "broker.batch_actions.all", + "visible": true, + "subPermissions": [ + "broker.batch_actions.read", + "broker.batch_actions.write" + ] + }, { "description": "Read-only access to patron requests", "displayName": "Broker - patron requests: read", @@ -401,7 +487,11 @@ "broker.patron_requests.item.notifications.item.receipt.put", "broker.pullslips.item.get", "broker.pullslips.item.pdf.get", - "broker.pullslips.post" + "broker.pullslips.post", + "broker.batch_actions.get", + "broker.batch_actions.post", + "broker.batch_actions.item.get", + "broker.batch_actions.item.delete" ] } ] diff --git a/broker/events/eventmodels.go b/broker/events/eventmodels.go index 0c037e27..3ba5007e 100644 --- a/broker/events/eventmodels.go +++ b/broker/events/eventmodels.go @@ -50,6 +50,7 @@ const ( EventNameLmsSupplierMessage EventName = "lms-supplier-message" EventNameSendNotification EventName = "send-notification" EventNameCheckAvailability EventName = "check-availability" + EventNameSendEmail EventName = "send-email" ) type Signal string @@ -94,6 +95,7 @@ type CommonEventData struct { Action *pr_db.PatronRequestAction `json:"action,omitempty"` ActionResult *ActionResult `json:"actionResult,omitempty"` Notification *pr_db.Notification `json:"notification,omitempty"` + BatchActionData *BatchActionData `json:"batchActionData,omitempty"` } type ActionResult struct { @@ -123,6 +125,12 @@ type NotifyData struct { Target SignalTarget `json:"target"` } +type BatchActionData struct { + ActionName string `json:"actionName"` + Selector string `json:"selector"` + TaskId string `json:"taskId"` +} + func NewErrorResult(message string, cause string) (EventStatus, *EventResult) { return EventStatusError, &EventResult{ CommonEventData: CommonEventData{ diff --git a/broker/migrations/039_add_batch_action.down.sql b/broker/migrations/039_add_batch_action.down.sql new file mode 100644 index 00000000..e2118d80 --- /dev/null +++ b/broker/migrations/039_add_batch_action.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS batch_action; + +DELETE FROM event_config WHERE event_name = 'send-email'; diff --git a/broker/migrations/039_add_batch_action.up.sql b/broker/migrations/039_add_batch_action.up.sql new file mode 100644 index 00000000..37dd1df3 --- /dev/null +++ b/broker/migrations/039_add_batch_action.up.sql @@ -0,0 +1,17 @@ +CREATE TABLE batch_action +( + id TEXT PRIMARY KEY, + action_name TEXT NOT NULL, + schedule TEXT NOT NULL, + batch_query TEXT NOT NULL, + owner TEXT NOT NULL, + scheduled_task_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ +); + +CREATE INDEX idx_batch_action_owner ON batch_action (owner); + +INSERT INTO event_config (event_name, event_type, retry_count) +VALUES ('send-email', 'TASK', 1); + diff --git a/broker/oapi/cfg.yaml b/broker/oapi/cfg.yaml index bd99b65c..bcc2e5cb 100644 --- a/broker/oapi/cfg.yaml +++ b/broker/oapi/cfg.yaml @@ -5,6 +5,7 @@ output-options: - patron-requests-api - sse-api - pull-slips-api + - scheduler-api overlay: path: oapi/overlay.yaml generate: diff --git a/broker/oapi/open-api.yaml b/broker/oapi/open-api.yaml index 55829da5..97462c59 100644 --- a/broker/oapi/open-api.yaml +++ b/broker/oapi/open-api.yaml @@ -1011,6 +1011,71 @@ components: - owner - searchCriteria + BatchAction: + type: object + title: Batch Action + description: A scheduled batch action + properties: + id: + type: string + description: Unique identifier of the batch action + schedule: + type: string + description: Cron schedule expression, e.g. "0 6 * * 1" (every Monday at 06:00) + actionName: + type: string + description: Name of the batch action to run + enum: + - email + createdAt: + type: string + format: date-time + description: Creation timestamp + updatedAt: + type: string + format: date-time + description: Last update timestamp + required: + - id + - schedule + - actionName + - createdAt + + BatchActions: + type: object + required: + - items + - about + properties: + about: + $ref: '#/components/schemas/About' + items: + type: array + description: List of batch actions + items: + $ref: '#/components/schemas/BatchAction' + + CreateBatchAction: + type: object + title: Create Batch Action + description: Request body for creating a batch action + properties: + schedule: + type: string + description: Cron schedule expression, e.g. "0 6 * * 1" (every Monday at 06:00) + actionName: + type: string + description: Name of the batch action to run + enum: + - email + batchQuery: + type: string + description: Batch action selection query in CQL format + required: + - schedule + - actionName + - batchQuery + CreatePullSlip: type: object title: Create Pull Slip @@ -1861,6 +1926,124 @@ paths: schema: $ref: '#/components/schemas/Error' + /batch_actions: + get: + summary: List all batch actions + tags: + - scheduler-api + parameters: + - $ref: '#/components/parameters/Tenant' + - $ref: '#/components/parameters/Symbol' + - $ref: '#/components/parameters/Limit' + - $ref: '#/components/parameters/Offset' + responses: + '200': + description: Successful retrieval of batch actions + content: + application/json: + schema: + $ref: '#/components/schemas/BatchActions' + '500': + description: Internal Server Error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + post: + summary: Create a new batch action + tags: + - scheduler-api + parameters: + - $ref: '#/components/parameters/Tenant' + - $ref: '#/components/parameters/Symbol' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateBatchAction' + responses: + '201': + description: Batch action created successfully + content: + application/json: + schema: + $ref: '#/components/schemas/BatchAction' + '400': + description: Bad Request. Invalid input. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal Server Error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /batch_actions/{id}: + get: + summary: Get a batch action by ID + tags: + - scheduler-api + parameters: + - $ref: '#/components/parameters/Tenant' + - $ref: '#/components/parameters/Symbol' + - in: path + name: id + schema: + type: string + required: true + description: ID of the batch action to retrieve + responses: + '200': + description: Batch action retrieved successfully + content: + application/json: + schema: + $ref: '#/components/schemas/BatchAction' + '404': + description: Batch action not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal Server Error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + delete: + summary: Delete a batch action by ID + tags: + - scheduler-api + parameters: + - $ref: '#/components/parameters/Tenant' + - $ref: '#/components/parameters/Symbol' + - in: path + name: id + schema: + type: string + required: true + description: ID of the batch action to delete + responses: + '204': + description: Batch action deleted successfully (No Content) + '404': + description: Batch action not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal Server Error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + /pullslips/{id}: get: summary: Get pull slip by ID diff --git a/broker/oapi/sched-cfg.yaml b/broker/oapi/sched-cfg.yaml new file mode 100644 index 00000000..4bbee1ad --- /dev/null +++ b/broker/oapi/sched-cfg.yaml @@ -0,0 +1,8 @@ +package: schedoapi +output: scheduler/oapi/sched_openapi_gen.go +output-options: + include-tags: + - scheduler-api +generate: + models: true + std-http-server: true diff --git a/broker/scheduler/api/api_handler.go b/broker/scheduler/api/api_handler.go new file mode 100644 index 00000000..4b51b8dc --- /dev/null +++ b/broker/scheduler/api/api_handler.go @@ -0,0 +1,243 @@ +package api + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/google/uuid" + brokerapi "github.com/indexdata/crosslink/broker/api" + "github.com/indexdata/crosslink/broker/common" + "github.com/indexdata/crosslink/broker/events" + sched_db "github.com/indexdata/crosslink/broker/scheduler/db" + schedoapi "github.com/indexdata/crosslink/broker/scheduler/oapi" + sched_service "github.com/indexdata/crosslink/broker/scheduler/service" + "github.com/indexdata/crosslink/broker/tenant" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" +) + +// SchedulerApiHandler implements schedoapi.ServerInterface. +type SchedulerApiHandler struct { + limitDefault int32 + schedRepo sched_db.SchedRepo + tenantResolver *tenant.TenantResolver +} + +// NewSchedulerApiHandler creates a SchedulerApiHandler. +func NewSchedulerApiHandler(limitDefault int32, schedRepo sched_db.SchedRepo, tenantResolver *tenant.TenantResolver) SchedulerApiHandler { + return SchedulerApiHandler{ + limitDefault: limitDefault, + schedRepo: schedRepo, + tenantResolver: tenantResolver, + } +} + +// GetBatchActions lists all batch actions for the resolved owner, with pagination. +func (h SchedulerApiHandler) GetBatchActions(w http.ResponseWriter, r *http.Request, params schedoapi.GetBatchActionsParams) { + logParams := map[string]string{"method": "GetBatchActions"} + ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{Other: logParams}) + + owner, ok := h.resolveOwner(ctx, w, r, params.Symbol) + if !ok { + return + } + + limit := h.limitDefault + if params.Limit != nil && *params.Limit > 0 { + limit = *params.Limit + } + offset := int32(0) + if params.Offset != nil && *params.Offset > 0 { + offset = *params.Offset + } + + items, count, err := h.schedRepo.GetBatchActions(ctx, sched_db.GetBatchActionsParams{ + Owner: owner, + Limit: limit, + Offset: offset, + }) + if err != nil { + brokerapi.AddInternalError(ctx, w, err) + return + } + + resp := schedoapi.BatchActions{ + About: schedoapi.About(brokerapi.CollectAboutData(count, offset, limit, r)), + Items: toBatchActionList(items), + } + brokerapi.WriteJsonResponse(w, resp) +} + +// PostBatchActions creates a new batch action. +func (h SchedulerApiHandler) PostBatchActions(w http.ResponseWriter, r *http.Request, params schedoapi.PostBatchActionsParams) { + logParams := map[string]string{"method": "PostBatchActions"} + ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{Other: logParams}) + + if r.Body == nil || r.Body == http.NoBody { + brokerapi.AddBadRequestError(ctx, w, errors.New("missing body")) + return + } + var create schedoapi.CreateBatchAction + if err := json.NewDecoder(r.Body).Decode(&create); err != nil { + brokerapi.AddBadRequestError(ctx, w, err) + return + } + if !create.ActionName.Valid() { + brokerapi.AddBadRequestError(ctx, w, errors.New("unknown actionName: "+string(create.ActionName))) + return + } + if create.Schedule == "" { + brokerapi.AddBadRequestError(ctx, w, errors.New("schedule must not be empty")) + return + } + + owner, ok := h.resolveOwner(ctx, w, r, params.Symbol) + if !ok { + return + } + + next, err := sched_service.NextCronTime(create.Schedule) + if err != nil { + brokerapi.AddBadRequestError(ctx, w, err) + return + } + var ba sched_db.BatchAction + err = h.schedRepo.WithTxFunc(ctx, func(schedRepo sched_db.SchedRepo) error { + id := uuid.NewString() + now := pgtype.Timestamptz{Time: time.Now(), Valid: true} + taskId := uuid.New().String() + task, inErr := schedRepo.SaveScheduledTask(ctx, sched_db.SaveScheduledTaskParams{ + ID: taskId, + EventName: events.EventNameSendEmail, + CronExpr: create.Schedule, + Status: sched_db.ScheduledTaskStatusPending, + Payload: events.EventData{CommonEventData: events.CommonEventData{BatchActionData: &events.BatchActionData{ + ActionName: string(create.ActionName), + Selector: create.BatchQuery, + TaskId: taskId, + }}}, + RunAt: next, + CreatedAt: now, + }) + if inErr != nil { + return inErr + } + ba, inErr = schedRepo.SaveBatchAction(ctx, sched_db.SaveBatchActionParams{ + ID: id, + Schedule: create.Schedule, + ActionName: string(create.ActionName), + BatchQuery: create.BatchQuery, + Owner: owner, + CreatedAt: now, + ScheduledTaskID: task.ID, + }) + return inErr + }) + if err != nil { + brokerapi.AddInternalError(ctx, w, err) + return + } + + w.Header().Set("Location", brokerapi.Link(r, brokerapi.Path("batch_actions", ba.ID), nil)) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + _ = json.NewEncoder(w).Encode(toBatchAction(ba)) +} + +// GetBatchActionsId returns a single batch action by ID. +func (h SchedulerApiHandler) GetBatchActionsId(w http.ResponseWriter, r *http.Request, id string, params schedoapi.GetBatchActionsIdParams) { + logParams := map[string]string{"method": "GetBatchActionsId", "id": id} + ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{Other: logParams}) + + owner, ok := h.resolveOwner(ctx, w, r, params.Symbol) + if !ok { + return + } + + ba, err := h.schedRepo.GetBatchActionById(ctx, id, owner) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + brokerapi.AddNotFoundError(w) + return + } + brokerapi.AddInternalError(ctx, w, err) + return + } + brokerapi.WriteJsonResponse(w, toBatchAction(ba)) +} + +// DeleteBatchActionsId deletes a batch action by ID. +func (h SchedulerApiHandler) DeleteBatchActionsId(w http.ResponseWriter, r *http.Request, id string, params schedoapi.DeleteBatchActionsIdParams) { + logParams := map[string]string{"method": "DeleteBatchActionsId", "id": id} + ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{Other: logParams}) + + owner, ok := h.resolveOwner(ctx, w, r, params.Symbol) + if !ok { + return + } + + err := h.schedRepo.WithTxFunc(ctx, func(schedRepo sched_db.SchedRepo) error { + ba, inErr := schedRepo.GetBatchActionById(ctx, id, owner) + if inErr != nil { + return inErr + } + task, inErr := schedRepo.GetScheduledTaskById(ctx, ba.ScheduledTaskID) + if inErr != nil { + return inErr + } + task.Status = sched_db.ScheduledTaskStatusStopped + task, inErr = schedRepo.SaveScheduledTask(ctx, sched_db.SaveScheduledTaskParams(task)) + if inErr != nil { + return inErr + } + return schedRepo.DeleteBatchAction(ctx, id, owner) + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + brokerapi.AddNotFoundError(w) + return + } + brokerapi.AddInternalError(ctx, w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +// resolveOwner resolves the tenant and returns the owner symbol. +// Writes an error response and returns false on failure. +func (h SchedulerApiHandler) resolveOwner(ctx common.ExtendedContext, w http.ResponseWriter, r *http.Request, symbol *string) (string, bool) { + t, err := h.tenantResolver.Resolve(ctx, r, symbol) + if err != nil { + brokerapi.AddBadRequestError(ctx, w, err) + return "", false + } + owner, err := t.GetRequestSymbol() + if err != nil { + brokerapi.AddBadRequestError(ctx, w, err) + return "", false + } + return owner, true +} + +func toBatchAction(ba sched_db.BatchAction) schedoapi.BatchAction { + resp := schedoapi.BatchAction{ + Id: ba.ID, + Schedule: ba.Schedule, + ActionName: schedoapi.BatchActionActionName(ba.ActionName), + CreatedAt: ba.CreatedAt.Time, + } + if ba.UpdatedAt.Valid { + resp.UpdatedAt = &ba.UpdatedAt.Time + } + return resp +} + +func toBatchActionList(items []sched_db.BatchAction) []schedoapi.BatchAction { + result := make([]schedoapi.BatchAction, 0, len(items)) + for _, ba := range items { + result = append(result, toBatchAction(ba)) + } + return result +} diff --git a/broker/scheduler/api/api_handler_test.go b/broker/scheduler/api/api_handler_test.go new file mode 100644 index 00000000..c417ee82 --- /dev/null +++ b/broker/scheduler/api/api_handler_test.go @@ -0,0 +1,419 @@ +package api + +import ( + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/indexdata/crosslink/broker/common" + sched_db "github.com/indexdata/crosslink/broker/scheduler/db" + schedoapi "github.com/indexdata/crosslink/broker/scheduler/oapi" + "github.com/indexdata/crosslink/broker/tenant" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// ── mock ────────────────────────────────────────────────────────────────────── + +type MockSchedRepo struct { + mock.Mock + sched_db.SchedRepo // satisfies unimplemented interface methods +} + +// WithTxFunc calls fn(mock) directly, simulating a pass-through transaction. +func (m *MockSchedRepo) WithTxFunc(ctx common.ExtendedContext, fn func(sched_db.SchedRepo) error) error { + return fn(m) +} + +func (m *MockSchedRepo) SaveScheduledTask(ctx common.ExtendedContext, params sched_db.SaveScheduledTaskParams) (sched_db.ScheduledTask, error) { + args := m.Called(params) + return args.Get(0).(sched_db.ScheduledTask), args.Error(1) +} + +func (m *MockSchedRepo) GetScheduledTaskById(ctx common.ExtendedContext, id string) (sched_db.ScheduledTask, error) { + args := m.Called(id) + return args.Get(0).(sched_db.ScheduledTask), args.Error(1) +} + +func (m *MockSchedRepo) SaveBatchAction(ctx common.ExtendedContext, params sched_db.SaveBatchActionParams) (sched_db.BatchAction, error) { + args := m.Called(params) + return args.Get(0).(sched_db.BatchAction), args.Error(1) +} + +func (m *MockSchedRepo) GetBatchActionById(ctx common.ExtendedContext, id, owner string) (sched_db.BatchAction, error) { + args := m.Called(id, owner) + return args.Get(0).(sched_db.BatchAction), args.Error(1) +} + +func (m *MockSchedRepo) DeleteBatchAction(ctx common.ExtendedContext, id, owner string) error { + args := m.Called(id, owner) + return args.Error(0) +} + +func (m *MockSchedRepo) GetBatchActions(ctx common.ExtendedContext, params sched_db.GetBatchActionsParams) ([]sched_db.BatchAction, int64, error) { + args := m.Called(params) + return args.Get(0).([]sched_db.BatchAction), args.Get(1).(int64), args.Error(2) +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +const testSymbol = "ISIL:TEST" +const validCron = "0 6 * * 1" + +func newHandler(repo sched_db.SchedRepo) SchedulerApiHandler { + return NewSchedulerApiHandler(10, repo, tenant.NewResolver()) +} + +func newReq(method, body string) *http.Request { + if body != "" { + return httptest.NewRequest(method, "/batch_actions", strings.NewReader(body)) + } + return httptest.NewRequest(method, "/batch_actions", nil) +} + +func symPtr(s string) *string { return &s } + +func batchActionFixture(id string) sched_db.BatchAction { + return sched_db.BatchAction{ + ID: id, + ActionName: "email", + Schedule: validCron, + BatchQuery: "", + Owner: testSymbol, + ScheduledTaskID: "task-" + id, + CreatedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true}, + } +} + +func scheduledTaskFixture(id string) sched_db.ScheduledTask { + return sched_db.ScheduledTask{ + ID: id, + Status: sched_db.ScheduledTaskStatusPending, + } +} + +// ── GetBatchActions ─────────────────────────────────────────────────────────── + +func TestGetBatchActions_OK(t *testing.T) { + repo := new(MockSchedRepo) + items := []sched_db.BatchAction{batchActionFixture("ba-1"), batchActionFixture("ba-2")} + repo.On("GetBatchActions", mock.MatchedBy(func(p sched_db.GetBatchActionsParams) bool { + return p.Owner == testSymbol + })).Return(items, int64(2), nil) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActions(rr, req, schedoapi.GetBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Contains(t, rr.Header().Get("Content-Type"), "application/json") + var resp schedoapi.BatchActions + assert.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Equal(t, int64(2), resp.About.Count) + assert.Len(t, resp.Items, 2) + assert.Equal(t, "ba-1", resp.Items[0].Id) + repo.AssertExpectations(t) +} + +func TestGetBatchActions_EmptyList(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActions", mock.Anything).Return([]sched_db.BatchAction{}, int64(0), nil) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActions(rr, req, schedoapi.GetBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusOK, rr.Code) + var resp schedoapi.BatchActions + assert.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Empty(t, resp.Items) + repo.AssertExpectations(t) +} + +func TestGetBatchActions_WithPagination(t *testing.T) { + repo := new(MockSchedRepo) + limit := int32(5) + offset := int32(10) + repo.On("GetBatchActions", sched_db.GetBatchActionsParams{ + Owner: testSymbol, + Limit: limit, + Offset: offset, + }).Return([]sched_db.BatchAction{}, int64(20), nil) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActions(rr, req, schedoapi.GetBatchActionsParams{ + Symbol: symPtr(testSymbol), + Limit: &limit, + Offset: &offset, + }) + + assert.Equal(t, http.StatusOK, rr.Code) + repo.AssertExpectations(t) +} + +func TestGetBatchActions_DBError(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActions", mock.Anything).Return([]sched_db.BatchAction{}, int64(0), errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActions(rr, req, schedoapi.GetBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +// ── PostBatchActions ────────────────────────────────────────────────────────── + +func TestPostBatchActions_OK(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-new") + task := scheduledTaskFixture("task-new") + + repo.On("SaveScheduledTask", mock.Anything).Return(task, nil) + repo.On("SaveBatchAction", mock.Anything).Return(ba, nil) + + h := newHandler(repo) + body := `{"actionName":"email","schedule":"` + validCron + `"}` + req := newReq(http.MethodPost, body) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusCreated, rr.Code) + assert.NotEmpty(t, rr.Header().Get("Location")) + assert.Contains(t, rr.Header().Get("Content-Type"), "application/json") + var resp schedoapi.BatchAction + assert.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Equal(t, "ba-new", resp.Id) + repo.AssertExpectations(t) +} + +func TestPostBatchActions_MissingBody(t *testing.T) { + h := newHandler(new(MockSchedRepo)) + req := newReq(http.MethodPost, "") + req.Body = nil + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestPostBatchActions_InvalidJSON(t *testing.T) { + h := newHandler(new(MockSchedRepo)) + req := newReq(http.MethodPost, `{not-json}`) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestPostBatchActions_InvalidActionName(t *testing.T) { + h := newHandler(new(MockSchedRepo)) + req := newReq(http.MethodPost, `{"actionName":"unknown","schedule":"`+validCron+`"}`) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestPostBatchActions_EmptySchedule(t *testing.T) { + h := newHandler(new(MockSchedRepo)) + req := newReq(http.MethodPost, `{"actionName":"email","schedule":""}`) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestPostBatchActions_InvalidCronExpression(t *testing.T) { + h := newHandler(new(MockSchedRepo)) + req := newReq(http.MethodPost, `{"actionName":"email","schedule":"not-a-cron"}`) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestPostBatchActions_SaveScheduledTaskError(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("SaveScheduledTask", mock.Anything).Return(sched_db.ScheduledTask{}, errors.New("db error")) + + h := newHandler(repo) + body := `{"actionName":"email","schedule":"` + validCron + `"}` + req := newReq(http.MethodPost, body) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +func TestPostBatchActions_SaveBatchActionError(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("SaveScheduledTask", mock.Anything).Return(scheduledTaskFixture("task-1"), nil) + repo.On("SaveBatchAction", mock.Anything).Return(sched_db.BatchAction{}, errors.New("db error")) + + h := newHandler(repo) + body := `{"actionName":"email","schedule":"` + validCron + `"}` + req := newReq(http.MethodPost, body) + rr := httptest.NewRecorder() + h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +// ── GetBatchActionsId ───────────────────────────────────────────────────────── + +func TestGetBatchActionsId_OK(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-1") + ba.UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true} + repo.On("GetBatchActionById", "ba-1", testSymbol).Return(ba, nil) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActionsId(rr, req, "ba-1", schedoapi.GetBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Contains(t, rr.Header().Get("Content-Type"), "application/json") + var resp schedoapi.BatchAction + assert.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Equal(t, "ba-1", resp.Id) + assert.NotNil(t, resp.UpdatedAt) + repo.AssertExpectations(t) +} + +func TestGetBatchActionsId_NotFound(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActionById", "missing", testSymbol).Return(sched_db.BatchAction{}, pgx.ErrNoRows) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActionsId(rr, req, "missing", schedoapi.GetBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusNotFound, rr.Code) + repo.AssertExpectations(t) +} + +func TestGetBatchActionsId_DBError(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActionById", "ba-err", testSymbol).Return(sched_db.BatchAction{}, errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodGet, "") + rr := httptest.NewRecorder() + h.GetBatchActionsId(rr, req, "ba-err", schedoapi.GetBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +// ── DeleteBatchActionsId ───────────────────────────────────────────────────────────── + +func TestDeleteBatchActionsId_OK(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-1") + task := scheduledTaskFixture("task-ba-1") + repo.On("GetBatchActionById", "ba-1", testSymbol).Return(ba, nil) + repo.On("GetScheduledTaskById", ba.ScheduledTaskID).Return(task, nil) + repo.On("SaveScheduledTask", mock.Anything).Return(task, nil) + repo.On("DeleteBatchAction", "ba-1", testSymbol).Return(nil) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "ba-1", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusNoContent, rr.Code) + repo.AssertExpectations(t) +} + +func TestDeleteBatchActionsId_NotFound(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActionById", "missing", testSymbol).Return(sched_db.BatchAction{}, pgx.ErrNoRows) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "missing", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusNotFound, rr.Code) + repo.AssertExpectations(t) +} + +func TestDeleteBatchActionsId_GetBatchActionError(t *testing.T) { + repo := new(MockSchedRepo) + repo.On("GetBatchActionById", "ba-err", testSymbol).Return(sched_db.BatchAction{}, errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "ba-err", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +func TestDeleteBatchActionsId_GetScheduledTaskError(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-1") + repo.On("GetBatchActionById", "ba-1", testSymbol).Return(ba, nil) + repo.On("GetScheduledTaskById", ba.ScheduledTaskID).Return(sched_db.ScheduledTask{}, errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "ba-1", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +func TestDeleteBatchActionsId_SaveTaskError(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-1") + task := scheduledTaskFixture("task-ba-1") + repo.On("GetBatchActionById", "ba-1", testSymbol).Return(ba, nil) + repo.On("GetScheduledTaskById", ba.ScheduledTaskID).Return(task, nil) + repo.On("SaveScheduledTask", mock.Anything).Return(sched_db.ScheduledTask{}, errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "ba-1", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} + +func TestDeleteBatchActionsId_DeleteError(t *testing.T) { + repo := new(MockSchedRepo) + ba := batchActionFixture("ba-1") + task := scheduledTaskFixture("task-ba-1") + repo.On("GetBatchActionById", "ba-1", testSymbol).Return(ba, nil) + repo.On("GetScheduledTaskById", ba.ScheduledTaskID).Return(task, nil) + repo.On("SaveScheduledTask", mock.Anything).Return(task, nil) + repo.On("DeleteBatchAction", "ba-1", testSymbol).Return(errors.New("db error")) + + h := newHandler(repo) + req := newReq(http.MethodDelete, "") + rr := httptest.NewRecorder() + h.DeleteBatchActionsId(rr, req, "ba-1", schedoapi.DeleteBatchActionsIdParams{Symbol: symPtr(testSymbol)}) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + repo.AssertExpectations(t) +} diff --git a/broker/scheduler/db/repo.go b/broker/scheduler/db/repo.go index ccdb59ec..a5933c57 100644 --- a/broker/scheduler/db/repo.go +++ b/broker/scheduler/db/repo.go @@ -17,6 +17,13 @@ type SchedRepo interface { ClaimNextScheduledTask(ctx common.ExtendedContext) (ScheduledTask, error) GetNextRunAt(ctx common.ExtendedContext) (pgtype.Timestamptz, error) GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfter time.Duration) ([]ScheduledTask, error) + GetScheduledTaskById(ctx common.ExtendedContext, id string) (ScheduledTask, error) + + // Batch action methods. + SaveBatchAction(ctx common.ExtendedContext, params SaveBatchActionParams) (BatchAction, error) + GetBatchActionById(ctx common.ExtendedContext, id, owner string) (BatchAction, error) + DeleteBatchAction(ctx common.ExtendedContext, id, owner string) error + GetBatchActions(ctx common.ExtendedContext, params GetBatchActionsParams) ([]BatchAction, int64, error) } type PgSchedRepo struct { @@ -74,6 +81,11 @@ func (r *PgSchedRepo) GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfte return tasks, nil } +func (r *PgSchedRepo) GetScheduledTaskById(ctx common.ExtendedContext, id string) (ScheduledTask, error) { + row, err := r.queries.GetScheduledTaskById(ctx, r.GetConnOrTx(), id) + return row.ScheduledTask, err +} + func pgDuration(d time.Duration) pgtype.Interval { return pgtype.Interval{Microseconds: d.Microseconds(), Valid: true} } @@ -84,3 +96,46 @@ func (r *PgSchedRepo) notify(ctx common.ExtendedContext) { ctx.Logger().Error("failed to notify scheduler channel", "channel", SchedulerChannel, "error", err) } } + +func (r *PgSchedRepo) SaveBatchAction(ctx common.ExtendedContext, params SaveBatchActionParams) (BatchAction, error) { + row, err := r.queries.SaveBatchAction(ctx, r.GetConnOrTx(), params) + return row.BatchAction, err +} + +func (r *PgSchedRepo) GetBatchActionById(ctx common.ExtendedContext, id, owner string) (BatchAction, error) { + row, err := r.queries.GetBatchActionById(ctx, r.GetConnOrTx(), GetBatchActionByIdParams{ + ID: id, + Owner: owner, + }) + return row.BatchAction, err +} + +func (r *PgSchedRepo) DeleteBatchAction(ctx common.ExtendedContext, id, owner string) error { + return r.queries.DeleteBatchAction(ctx, r.GetConnOrTx(), DeleteBatchActionParams{ + ID: id, + Owner: owner, + }) +} + +func (r *PgSchedRepo) GetBatchActions(ctx common.ExtendedContext, params GetBatchActionsParams) ([]BatchAction, int64, error) { + rows, err := r.queries.GetBatchActions(ctx, r.GetConnOrTx(), params) + var actions []BatchAction + var fullCount int64 + if err == nil { + if len(rows) > 0 { + fullCount = rows[0].FullCount + for _, r := range rows { + fullCount = r.FullCount + actions = append(actions, r.BatchAction) + } + } else { + params.Limit = 1 + params.Offset = 0 + rows, err := r.queries.GetBatchActions(ctx, r.GetConnOrTx(), params) + if err == nil && len(rows) > 0 { + fullCount = rows[0].FullCount + } + } + } + return actions, fullCount, err +} diff --git a/broker/scheduler/service/scheduler.go b/broker/scheduler/service/scheduler.go index 7dfa9e10..06a5e30d 100644 --- a/broker/scheduler/service/scheduler.go +++ b/broker/scheduler/service/scheduler.go @@ -166,7 +166,7 @@ func (s *SchedulerService) runDueTasks(ctx common.ExtendedContext) bool { // Compute and persist the task's next state. if task.CronExpr != "" { - next, cronErr := nextCronTime(task.CronExpr) + next, cronErr := NextCronTime(task.CronExpr) if cronErr != nil { ctx.Logger().Error("invalid cron expression, disabling task", "error", cronErr, "taskId", task.ID) task.Status = sched_db.ScheduledTaskStatusStopped @@ -264,7 +264,7 @@ func waitUntil(ctx common.ExtendedContext, nextRunAt pgtype.Timestamptz, notifyC // nextCronTime parses a standard 5-field cron expression and returns the next // scheduled execution time after now as a pgtype.Timestamptz. // Returns an error if the expression is invalid. -func nextCronTime(cronExpr string) (pgtype.Timestamptz, error) { +func NextCronTime(cronExpr string) (pgtype.Timestamptz, error) { parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) schedule, err := parser.Parse(cronExpr) if err != nil { @@ -289,7 +289,7 @@ func (s *SchedulerService) rescheduleLongRunningTasks(ctx common.ExtendedContext for _, task := range tasks { ctx.Logger().Info("rescheduling stuck task", "taskId", task.ID, "eventName", task.EventName) if task.CronExpr != "" { - next, err := nextCronTime(task.CronExpr) + next, err := NextCronTime(task.CronExpr) if err != nil { ctx.Logger().Error("invalid cron expression, disabling task", "error", err, "taskId", task.ID) s.disableTask(ctx, task) diff --git a/broker/scheduler/service/scheduler_test.go b/broker/scheduler/service/scheduler_test.go index 80fc2371..3a1338fb 100644 --- a/broker/scheduler/service/scheduler_test.go +++ b/broker/scheduler/service/scheduler_test.go @@ -33,6 +33,7 @@ func invalidTstz() pgtype.Timestamptz { // --------------------------------------------------------------------------- type mockSchedRepo struct { + sched_db.PgSchedRepo claimResults []sched_db.ScheduledTask claimErrors []error claimIndex int @@ -94,21 +95,21 @@ func (m *mockEventBus) CreateTask(_ string, name events.EventName, _ events.Even // --------------------------------------------------------------------------- func TestNextCronTime_ValidExpr(t *testing.T) { - ts, err := nextCronTime("* * * * *") // every minute + ts, err := NextCronTime("* * * * *") // every minute assert.NoError(t, err) assert.True(t, ts.Valid) assert.True(t, ts.Time.After(time.Now())) } func TestNextCronTime_InvalidExpr(t *testing.T) { - _, err := nextCronTime("not-a-cron") + _, err := NextCronTime("not-a-cron") assert.Error(t, err) assert.Contains(t, err.Error(), "invalid cron expression") } func TestNextCronTime_SpecificSchedule(t *testing.T) { // "0 9 * * 1" = every Monday at 09:00 — just verify it's in the future - ts, err := nextCronTime("0 9 * * 1") + ts, err := NextCronTime("0 9 * * 1") assert.NoError(t, err) assert.True(t, ts.Valid) assert.True(t, ts.Time.After(time.Now())) diff --git a/broker/sqlc/sched_query.sql b/broker/sqlc/sched_query.sql index eacce93e..4e71bdb8 100644 --- a/broker/sqlc/sched_query.sql +++ b/broker/sqlc/sched_query.sql @@ -39,3 +39,36 @@ UPDATE SKIP LOCKED ) RETURNING sqlc.embed(scheduled_task); +-- name: GetScheduledTaskById :one +SELECT sqlc.embed(scheduled_task) +FROM scheduled_task +WHERE id = $1; + +-- name: SaveBatchAction :one +INSERT INTO batch_action (id, action_name, schedule, batch_query, owner, scheduled_task_id, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (id) DO UPDATE + SET action_name = EXCLUDED.action_name, + schedule = EXCLUDED.schedule, + batch_query = EXCLUDED.batch_query, + owner = EXCLUDED.owner, + scheduled_task_id = EXCLUDED.scheduled_task_id, + updated_at = now() +RETURNING sqlc.embed(batch_action); + +-- name: GetBatchActionById :one +SELECT sqlc.embed(batch_action) +FROM batch_action +WHERE id = $1 AND owner = $2; + +-- name: GetBatchActions :many +SELECT sqlc.embed(batch_action), COUNT(*) OVER () as full_count +FROM batch_action +WHERE owner = $3 +ORDER BY created_at +LIMIT $1 OFFSET $2; + +-- name: DeleteBatchAction :exec +DELETE FROM batch_action +WHERE id = $1 AND owner = $2; + diff --git a/broker/sqlc/sched_schema.sql b/broker/sqlc/sched_schema.sql index 59600175..e91b5345 100644 --- a/broker/sqlc/sched_schema.sql +++ b/broker/sqlc/sched_schema.sql @@ -13,3 +13,14 @@ CREATE TABLE scheduled_task CREATE INDEX idx_scheduled_task_run_at ON scheduled_task (run_at) WHERE status = 'pending' AND run_at IS NOT NULL; +CREATE TABLE batch_action +( + id TEXT PRIMARY KEY, + action_name TEXT NOT NULL, + schedule TEXT NOT NULL, + batch_query TEXT NOT NULL, + owner TEXT NOT NULL, + scheduled_task_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ +); \ No newline at end of file From fe0a8b5b4f428d8042069b6c25ed2444da5bb891 Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Wed, 20 May 2026 14:06:08 +0300 Subject: [PATCH 3/4] CROSSLINK-288 Fix copilot comments --- broker/Makefile | 2 +- broker/go.mod | 1 - broker/migrations/039_add_batch_action.up.sql | 3 +- broker/oapi/open-api.yaml | 18 ++++++++++ broker/scheduler/api/api_handler.go | 4 +++ broker/scheduler/db/repo.go | 2 +- broker/scheduler/service/scheduler.go | 2 +- broker/sqlc/sched_schema.sql | 3 +- broker/sqlc/skd_query.sql | 35 ------------------- broker/sqlc/skd_schema.sql | 15 -------- 10 files changed, 29 insertions(+), 56 deletions(-) delete mode 100644 broker/sqlc/skd_query.sql delete mode 100644 broker/sqlc/skd_schema.sql diff --git a/broker/Makefile b/broker/Makefile index 8fa0020d..92cfc9d6 100644 --- a/broker/Makefile +++ b/broker/Makefile @@ -55,7 +55,7 @@ PS_OAPI_CFG = $(PS_OAPI_DIR)/ps-cfg.yaml PS_OAPI_SPEC = $(PS_OAPI_DIR)/open-api.yaml PS_OAPI_GEN = pullslip/oapi/ps-openapi_gen.go -# Pull slip OpenAPI +# Scheduled task OpenAPI SCHED_OAPI_DIR=oapi SCHED_OAPI_CFG = $(SCHED_OAPI_DIR)/sched-cfg.yaml SCHED_OAPI_SPEC = $(SCHED_OAPI_DIR)/open-api.yaml diff --git a/broker/go.mod b/broker/go.mod index b8409e8f..1bfe0b2a 100644 --- a/broker/go.mod +++ b/broker/go.mod @@ -120,7 +120,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/riza-io/grpc-go v0.2.0 // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/shirou/gopsutil/v4 v4.26.4 // indirect github.com/sirupsen/logrus v1.9.4 // indirect github.com/speakeasy-api/jsonpath v0.6.3 // indirect diff --git a/broker/migrations/039_add_batch_action.up.sql b/broker/migrations/039_add_batch_action.up.sql index 37dd1df3..b64a1198 100644 --- a/broker/migrations/039_add_batch_action.up.sql +++ b/broker/migrations/039_add_batch_action.up.sql @@ -7,7 +7,8 @@ CREATE TABLE batch_action owner TEXT NOT NULL, scheduled_task_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ + updated_at TIMESTAMPTZ, + FOREIGN KEY (scheduled_task_id) REFERENCES scheduled_task (id) ); CREATE INDEX idx_batch_action_owner ON batch_action (owner); diff --git a/broker/oapi/open-api.yaml b/broker/oapi/open-api.yaml index 97462c59..5264ed3b 100644 --- a/broker/oapi/open-api.yaml +++ b/broker/oapi/open-api.yaml @@ -1943,6 +1943,12 @@ paths: application/json: schema: $ref: '#/components/schemas/BatchActions' + '400': + description: Bad Request. Invalid query parameters. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' '500': description: Internal Server Error content: @@ -2003,6 +2009,12 @@ paths: application/json: schema: $ref: '#/components/schemas/BatchAction' + '400': + description: Bad Request. Invalid query parameters. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' '404': description: Batch action not found content: @@ -2031,6 +2043,12 @@ paths: responses: '204': description: Batch action deleted successfully (No Content) + '400': + description: Bad Request. Invalid query parameters. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' '404': description: Batch action not found content: diff --git a/broker/scheduler/api/api_handler.go b/broker/scheduler/api/api_handler.go index 4b51b8dc..4765b2fc 100644 --- a/broker/scheduler/api/api_handler.go +++ b/broker/scheduler/api/api_handler.go @@ -92,6 +92,10 @@ func (h SchedulerApiHandler) PostBatchActions(w http.ResponseWriter, r *http.Req brokerapi.AddBadRequestError(ctx, w, errors.New("schedule must not be empty")) return } + if create.BatchQuery == "" { + brokerapi.AddBadRequestError(ctx, w, errors.New("batchQuery must not be empty")) + return + } owner, ok := h.resolveOwner(ctx, w, r, params.Symbol) if !ok { diff --git a/broker/scheduler/db/repo.go b/broker/scheduler/db/repo.go index a5933c57..b454f105 100644 --- a/broker/scheduler/db/repo.go +++ b/broker/scheduler/db/repo.go @@ -131,7 +131,7 @@ func (r *PgSchedRepo) GetBatchActions(ctx common.ExtendedContext, params GetBatc } else { params.Limit = 1 params.Offset = 0 - rows, err := r.queries.GetBatchActions(ctx, r.GetConnOrTx(), params) + rows, err = r.queries.GetBatchActions(ctx, r.GetConnOrTx(), params) if err == nil && len(rows) > 0 { fullCount = rows[0].FullCount } diff --git a/broker/scheduler/service/scheduler.go b/broker/scheduler/service/scheduler.go index 06a5e30d..dcfc9b3b 100644 --- a/broker/scheduler/service/scheduler.go +++ b/broker/scheduler/service/scheduler.go @@ -261,7 +261,7 @@ func waitUntil(ctx common.ExtendedContext, nextRunAt pgtype.Timestamptz, notifyC } } -// nextCronTime parses a standard 5-field cron expression and returns the next +// NextCronTime parses a standard 5-field cron expression and returns the next // scheduled execution time after now as a pgtype.Timestamptz. // Returns an error if the expression is invalid. func NextCronTime(cronExpr string) (pgtype.Timestamptz, error) { diff --git a/broker/sqlc/sched_schema.sql b/broker/sqlc/sched_schema.sql index e91b5345..7c89d675 100644 --- a/broker/sqlc/sched_schema.sql +++ b/broker/sqlc/sched_schema.sql @@ -22,5 +22,6 @@ CREATE TABLE batch_action owner TEXT NOT NULL, scheduled_task_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ + updated_at TIMESTAMPTZ, + FOREIGN KEY (scheduled_task_id) REFERENCES scheduled_task (id) ); \ No newline at end of file diff --git a/broker/sqlc/skd_query.sql b/broker/sqlc/skd_query.sql deleted file mode 100644 index 012d9b71..00000000 --- a/broker/sqlc/skd_query.sql +++ /dev/null @@ -1,35 +0,0 @@ --- name: SaveScheduledTask :one -INSERT INTO scheduled_task (id, event_name, cron_expr, payload, run_at, status, created_at, updated_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) -ON CONFLICT (id) DO UPDATE - SET event_name = EXCLUDED.event_name, - cron_expr = EXCLUDED.cron_expr, - payload = EXCLUDED.payload, - run_at = EXCLUDED.run_at, - status = EXCLUDED.status, - updated_at = now() -RETURNING sqlc.embed(scheduled_task); - --- name: GetNextRunAt :one -SELECT run_at -FROM scheduled_task -WHERE status = 'pending' - AND run_at IS NOT NULL -ORDER BY run_at -LIMIT 1; - --- name: ClaimNextScheduledTask :one -UPDATE scheduled_task -SET status = 'running', - updated_at = now() -WHERE id = (SELECT id - FROM scheduled_task - WHERE status = 'pending' - AND run_at <= now() - ORDER BY run_at - LIMIT 1 - FOR -UPDATE SKIP LOCKED - ) - RETURNING sqlc.embed(scheduled_task); - diff --git a/broker/sqlc/skd_schema.sql b/broker/sqlc/skd_schema.sql deleted file mode 100644 index 2bef4760..00000000 --- a/broker/sqlc/skd_schema.sql +++ /dev/null @@ -1,15 +0,0 @@ -CREATE TABLE scheduled_task -( - id TEXT PRIMARY KEY, - event_name TEXT NOT NULL, - cron_expr TEXT NOT NULL, - payload JSONB, - run_at TIMESTAMPTZ, - status TEXT NOT NULL DEFAULT 'pending', - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ, - FOREIGN KEY (event_name) REFERENCES event_config (event_name) -); - -CREATE INDEX idx_scheduled_task_run_at ON scheduled_task (run_at) WHERE status = 'pending'; - From 9d0ae872859dc617f05d74f1badd960f0fced067 Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Wed, 20 May 2026 15:16:03 +0300 Subject: [PATCH 4/4] CROSSLINK-288 Fix copilot comments --- broker/app/app.go | 4 ++-- broker/migrations/039_add_batch_action.down.sql | 1 + broker/scheduler/api/api_handler_test.go | 6 +++--- broker/scheduler/db/repo.go | 1 - broker/sqlc/sched_schema.sql | 3 ++- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/broker/app/app.go b/broker/app/app.go index fd34179f..fc9b7d1f 100644 --- a/broker/app/app.go +++ b/broker/app/app.go @@ -203,7 +203,7 @@ func Init(ctx context.Context) (Context, error) { } schedRepoRepo := sched_db.CreateSchedRepo(pool) - skdApiHandler := schedapi.NewSchedulerApiHandler(API_PAGE_SIZE, schedRepoRepo, tenantResolver) + schedApiHandler := schedapi.NewSchedulerApiHandler(API_PAGE_SIZE, schedRepoRepo, tenantResolver) if err = StartScheduler(ctx, schedRepoRepo, eventBus); err != nil { return Context{}, err } @@ -219,7 +219,7 @@ func Init(ctx context.Context) (Context, error) { PrApiHandler: prApiHandler, SseBroker: sseBroker, PsApiHandler: psApiHandler, - SchedApiHandler: skdApiHandler, + SchedApiHandler: schedApiHandler, }, nil } diff --git a/broker/migrations/039_add_batch_action.down.sql b/broker/migrations/039_add_batch_action.down.sql index e2118d80..d584c12b 100644 --- a/broker/migrations/039_add_batch_action.down.sql +++ b/broker/migrations/039_add_batch_action.down.sql @@ -1,3 +1,4 @@ DROP TABLE IF EXISTS batch_action; +DELETE FROM scheduled_task WHERE event_name = 'send-email'; DELETE FROM event_config WHERE event_name = 'send-email'; diff --git a/broker/scheduler/api/api_handler_test.go b/broker/scheduler/api/api_handler_test.go index c417ee82..bf62c8c1 100644 --- a/broker/scheduler/api/api_handler_test.go +++ b/broker/scheduler/api/api_handler_test.go @@ -185,7 +185,7 @@ func TestPostBatchActions_OK(t *testing.T) { repo.On("SaveBatchAction", mock.Anything).Return(ba, nil) h := newHandler(repo) - body := `{"actionName":"email","schedule":"` + validCron + `"}` + body := `{"actionName":"email","batchQuery":"title=test","schedule":"` + validCron + `"}` req := newReq(http.MethodPost, body) rr := httptest.NewRecorder() h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) @@ -250,7 +250,7 @@ func TestPostBatchActions_SaveScheduledTaskError(t *testing.T) { repo.On("SaveScheduledTask", mock.Anything).Return(sched_db.ScheduledTask{}, errors.New("db error")) h := newHandler(repo) - body := `{"actionName":"email","schedule":"` + validCron + `"}` + body := `{"actionName":"email","batchQuery":"title=test","schedule":"` + validCron + `"}` req := newReq(http.MethodPost, body) rr := httptest.NewRecorder() h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) @@ -265,7 +265,7 @@ func TestPostBatchActions_SaveBatchActionError(t *testing.T) { repo.On("SaveBatchAction", mock.Anything).Return(sched_db.BatchAction{}, errors.New("db error")) h := newHandler(repo) - body := `{"actionName":"email","schedule":"` + validCron + `"}` + body := `{"actionName":"email","batchQuery":"title=test","schedule":"` + validCron + `"}` req := newReq(http.MethodPost, body) rr := httptest.NewRecorder() h.PostBatchActions(rr, req, schedoapi.PostBatchActionsParams{Symbol: symPtr(testSymbol)}) diff --git a/broker/scheduler/db/repo.go b/broker/scheduler/db/repo.go index b454f105..fa058b06 100644 --- a/broker/scheduler/db/repo.go +++ b/broker/scheduler/db/repo.go @@ -125,7 +125,6 @@ func (r *PgSchedRepo) GetBatchActions(ctx common.ExtendedContext, params GetBatc if len(rows) > 0 { fullCount = rows[0].FullCount for _, r := range rows { - fullCount = r.FullCount actions = append(actions, r.BatchAction) } } else { diff --git a/broker/sqlc/sched_schema.sql b/broker/sqlc/sched_schema.sql index 7c89d675..156035d4 100644 --- a/broker/sqlc/sched_schema.sql +++ b/broker/sqlc/sched_schema.sql @@ -24,4 +24,5 @@ CREATE TABLE batch_action created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ, FOREIGN KEY (scheduled_task_id) REFERENCES scheduled_task (id) -); \ No newline at end of file +); +CREATE INDEX idx_batch_action_owner ON batch_action (owner); \ No newline at end of file