Skip to content

Commit 52c83be

Browse files
jigs1996RomainLanz
andauthored
feat: add .dedup() method for job deduplication (#12)
* feat: add .id() method for job deduplication * fix: remove unused assert parameter in fake_adapter test * fix: check all job states in dedup guard for fake and memory adapters * refactor: replace .id() with .dedup() API for extensible job deduplication * feat: add .dedup() API for job deduplication * test: cover dedup regressions * fix: harden dedup against concurrent races * test: add regression tests for dedup edge cases * fix: align dedup behavior across adapters and tighten dispatcher validation * test(dedup): cover active-state, concurrent, and orphan-pointer edge cases * fix: release expired active dedup rows --------- Co-authored-by: Romain Lanz <romain.lanz@pm.me>
1 parent ded2312 commit 52c83be

24 files changed

Lines changed: 2650 additions & 165 deletions

.github/workflows/checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353
uses: actions/setup-node@v4
5454
with:
5555
node-version: 24
56-
cache: "yarn"
56+
cache: 'yarn'
5757

5858
- name: Install dependencies
5959
run: yarn

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ coverage
1515
npm-debug.log
1616
yarn-error.log
1717

18+
# OS specific
19+
.DS_Store
20+
1821
# Editors specific
1922
.fleet
2023
.idea

README.md

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ npm install @boringnode/queue
2626
- **Priority Queues**: Process high-priority jobs first
2727
- **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once
2828
- **Job Grouping**: Organize related jobs for monitoring
29+
- **Job Deduplication**: Prevent duplicate jobs with custom IDs
2930
- **Retry with Backoff**: Exponential, linear, or fixed backoff strategies
3031
- **Job Timeout**: Fail or retry jobs that exceed a time limit
3132
- **Job History**: Retain completed/failed jobs for debugging
@@ -131,6 +132,85 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')
131132

132133
The `groupId` is stored with job data and accessible via `job.data.groupId`.
133134

135+
## Job Deduplication
136+
137+
Prevent the same job from being pushed multiple times. Four modes, all via `.dedup()`:
138+
139+
### Simple (skip while job exists)
140+
141+
```typescript
142+
// First dispatch - job is created
143+
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
144+
145+
// Second dispatch with same dedup ID - silently skipped
146+
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
147+
```
148+
149+
### Throttle (skip within TTL window)
150+
151+
```typescript
152+
// Within 5s, duplicates are skipped. After 5s, a new job is created.
153+
await SendEmailJob.dispatch({ to: 'user@example.com' })
154+
.dedup({ id: 'welcome-123', ttl: '5s' })
155+
.run()
156+
```
157+
158+
### Extend (reset TTL on duplicate)
159+
160+
```typescript
161+
// Each duplicate push resets the TTL timer.
162+
await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run()
163+
```
164+
165+
### Debounce (replace payload + reset TTL)
166+
167+
```typescript
168+
// Within the 2s window, the latest payload overwrites the previous pending job.
169+
await SaveDraftJob.dispatch({ content: 'latest draft' })
170+
.dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true })
171+
.run()
172+
```
173+
174+
### Inspecting the outcome
175+
176+
`DispatchResult` tells you what happened:
177+
178+
```typescript
179+
const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' })
180+
.dedup({ id: 'draft-42', ttl: '2s', replace: true })
181+
.run()
182+
183+
// deduped: 'added' | 'skipped' | 'replaced' | 'extended'
184+
// jobId: the UUID of the job (the existing one when deduped)
185+
```
186+
187+
### How it works
188+
189+
- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key.
190+
- The user-supplied `id` must be ≤ 400 characters, and the combined `<jobName>::<id>` key must be ≤ 510 characters (constrained by the Knex storage column). Both limits are validated at `.dedup()` time.
191+
- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds, and must be **positive** when provided. Use `0` or omit `ttl` if you want no expiry — `ttl: 0` is rejected to avoid an ambiguous "expired immediately vs no-expiry" interpretation across engines.
192+
- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws.
193+
- `replace` only applies to jobs in `pending` or `delayed` state. Jobs that are active (executing) or retained in history (`completed`/`failed` with retention) are left alone; the dispatch returns `{ deduped: 'skipped' }`.
194+
- `replace` swaps the **payload only** — priority, queue, delay, groupId, and stored dedup options of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire.
195+
- `extend` resets the TTL clock but never changes the window length. The window length is fixed to the `ttl` from the first dispatch that created the dedup slot. Later dispatches that pass a different `ttl` only reset the clock; their `ttl` value is ignored. To resize the window, let the slot expire and start over with a new dispatch.
196+
- `extend` works in **all states** — even when the existing job is `active` (executing) or retained in history. Unlike `replace` (which is no-op on non-replaceable states), `extend` always refreshes the dedup TTL window. Use this when you want the dedup slot to keep blocking new dispatches for the lifetime of a long-running job.
197+
- `extend` requires the **first** dispatch to have set a `ttl`. If the slot was created without a `ttl`, later `extend` dispatches have no window to refresh and return `{ deduped: 'skipped' }` instead of `'extended'`.
198+
- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped.
199+
- Atomic and race-free:
200+
- **Redis**: a single Lua script per dispatch performs the dedup-key lookup, state check (pending/delayed ZSCORE), payload swap, and TTL refresh atomically.
201+
- **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction. A nested savepoint catches unique-constraint violations under concurrent inserts and returns `{ deduped: 'skipped' }` pointing at the winner.
202+
- **SyncAdapter**: executes inline, no dedup support.
203+
204+
### Caveats
205+
206+
- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated.
207+
- The **Sync adapter** ignores `.dedup()` entirely — every dispatch executes inline and `deduped` is always `undefined` on the result. Use Redis or Knex if you need real deduplication.
208+
- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field.
209+
- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch.
210+
- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned.
211+
- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs.
212+
- Knex MySQL concurrent race: MySQL does not support partial unique indexes, so two `pushOn` calls with the same dedup id firing at the exact same instant can both succeed. Serialize at the app layer if strict guarantees are required, or use Postgres / SQLite / Redis (all of which serialize correctly via the partial unique index or Lua atomicity).
213+
134214
## Job History & Retention
135215

136216
Keep completed and failed jobs for debugging:
@@ -536,7 +616,7 @@ import * as boringqueue from '@boringnode/queue'
536616

537617
const instrumentation = new QueueInstrumentation({
538618
messagingSystem: 'boringqueue', // default
539-
executionSpanLinkMode: 'link', // or 'parent'
619+
executionSpanLinkMode: 'link', // or 'parent'
540620
})
541621

542622
instrumentation.enable()
@@ -549,19 +629,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr
549629

550630
The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes.
551631

552-
| Attribute | Kind | Description |
553-
| ------------------------------- | ------- | ------------------------------------------ |
554-
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
555-
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
556-
| `messaging.destination.name` | Semconv | Queue name |
557-
| `messaging.message.id` | Semconv | Job ID for single-message spans |
558-
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
559-
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
560-
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
561-
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
562-
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
563-
| `messaging.job.priority` | Custom | Queue-specific job priority |
564-
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
632+
| Attribute | Kind | Description |
633+
| ------------------------------- | ------- | --------------------------------------------- |
634+
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
635+
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
636+
| `messaging.destination.name` | Semconv | Queue name |
637+
| `messaging.message.id` | Semconv | Job ID for single-message spans |
638+
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
639+
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
640+
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
641+
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
642+
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
643+
| `messaging.job.priority` | Custom | Queue-specific job priority |
644+
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
565645
| `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |
566646

567647
### Trace Context Propagation

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,4 @@
129129
"engines": {
130130
"node": ">=24.0.0"
131131
}
132-
}
132+
}

src/contracts/adapter.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type {
2+
DedupOutcome,
23
JobData,
34
JobRecord,
45
JobRetention,
@@ -7,6 +8,17 @@ import type {
78
ScheduleListOptions,
89
} from '../types/main.js'
910

11+
/**
12+
* Result of a push operation when dedup was involved.
13+
* `outcome` tells the dispatcher what happened; `jobId` is the ID of the
14+
* existing job when deduped (skipped/replaced/extended).
15+
*/
16+
export interface PushResult {
17+
outcome: DedupOutcome
18+
/** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */
19+
jobId: string
20+
}
21+
1022
/**
1123
* A job that has been acquired by a worker for processing.
1224
* Extends JobData with the timestamp when the job was acquired.
@@ -119,33 +131,37 @@ export interface Adapter {
119131
* Push a job to the default queue for immediate processing.
120132
*
121133
* @param jobData - The job data to push
134+
* @returns PushResult if jobData.dedup is set, otherwise void
122135
*/
123-
push(jobData: JobData): Promise<void>
136+
push(jobData: JobData): Promise<PushResult | void>
124137

125138
/**
126139
* Push a job to a specific queue for immediate processing.
127140
*
128141
* @param queue - The queue name to push to
129142
* @param jobData - The job data to push
143+
* @returns PushResult if jobData.dedup is set, otherwise void
130144
*/
131-
pushOn(queue: string, jobData: JobData): Promise<void>
145+
pushOn(queue: string, jobData: JobData): Promise<PushResult | void>
132146

133147
/**
134148
* Push a job to the default queue with a delay.
135149
*
136150
* @param jobData - The job data to push
137151
* @param delay - Delay in milliseconds before the job becomes available
152+
* @returns PushResult if jobData.dedup is set, otherwise void
138153
*/
139-
pushLater(jobData: JobData, delay: number): Promise<void>
154+
pushLater(jobData: JobData, delay: number): Promise<PushResult | void>
140155

141156
/**
142157
* Push a job to a specific queue with a delay.
143158
*
144159
* @param queue - The queue name to push to
145160
* @param jobData - The job data to push
146161
* @param delay - Delay in milliseconds before the job becomes available
162+
* @returns PushResult if jobData.dedup is set, otherwise void
147163
*/
148-
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
164+
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<PushResult | void>
149165

150166
/**
151167
* Push multiple jobs to the default queue for immediate processing.

0 commit comments

Comments
 (0)