From e323cbd0404844ce55acf6346629f5c0f66a717a Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 26 Jun 2026 10:39:21 -0300 Subject: [PATCH 1/2] feat: add workerConcurrency to queue options --- package-lock.json | 4 ++-- package.json | 2 +- src/types/ConnectionOptions.ts | 12 ++++++++++++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 06c646e..5a712be 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@athenna/queue", - "version": "5.32.0", + "version": "5.33.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/queue", - "version": "5.32.0", + "version": "5.33.0", "license": "MIT", "dependencies": { "@aws-sdk/client-sqs": "^3.1019.0" diff --git a/package.json b/package.json index 23d2d73..7f99d77 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/queue", - "version": "5.32.0", + "version": "5.33.0", "description": "The Athenna queue handler.", "license": "MIT", "author": "João Lenon ", diff --git a/src/types/ConnectionOptions.ts b/src/types/ConnectionOptions.ts index e477e8a..cd29b14 100644 --- a/src/types/ConnectionOptions.ts +++ b/src/types/ConnectionOptions.ts @@ -125,5 +125,17 @@ export type ConnectionOptions = { * @default Parser.timeToMs('5m') */ workerTimeoutMs?: number + + /** + * Define how many independent consumer loops run in parallel for this + * connection. Each loop still pulls and processes ONE job at a time, so a + * value of `N` yields an effective concurrency of `N`. This is honored by + * the `@athenna/event` consumer; raise it to drain a backed-up queue faster + * without spinning up extra processes. When the option is `null`/unset it + * defaults to `0`, which falls back to a single serial loop (one-by-one). + * + * @default Config.get(`queue.connections.${connection}.workerConcurrency`, 0) + */ + workerConcurrency?: number } } From fee80894061fde7727eb2fbbd60d5e6be6492814 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 26 Jun 2026 11:04:56 -0300 Subject: [PATCH 2/2] feat: add base worker class --- src/index.ts | 1 + src/types/WorkerOptions.ts | 7 ++- src/worker/BaseWorker.ts | 65 +++++++++++++++++++++++ src/worker/WorkerTaskBuilder.ts | 25 ++++++++- templates/worker.edge | 4 +- tests/fixtures/config/queue.ts | 7 +++ tests/unit/worker/BaseWorkerTest.ts | 82 +++++++++++++++++++++++++++++ tests/unit/worker/WorkerImplTest.ts | 62 ++++++++++++++++++++++ 8 files changed, 248 insertions(+), 5 deletions(-) create mode 100644 src/worker/BaseWorker.ts create mode 100644 tests/unit/worker/BaseWorkerTest.ts diff --git a/src/index.ts b/src/index.ts index fe5f1b2..710ad11 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,6 +19,7 @@ export * from '#src/drivers/DatabaseDriver' export * from '#src/factories/ConnectionFactory' export * from '#src/facades/Queue' +export * from '#src/worker/BaseWorker' export * from '#src/worker/WorkerImpl' export * from '#src/providers/QueueProvider' export * from '#src/providers/WorkerProvider' diff --git a/src/types/WorkerOptions.ts b/src/types/WorkerOptions.ts index 4afba68..f54fcb7 100644 --- a/src/types/WorkerOptions.ts +++ b/src/types/WorkerOptions.ts @@ -16,9 +16,12 @@ export type WorkerOptions = { name?: string /** - * Define how much instances of the same worker could run in parallel. + * Define how many instances of the same worker run in parallel. Each + * instance still processes one job at a time, so a value of `N` yields an + * effective concurrency of `N`. When omitted, the worker falls back to the + * connection's `workerConcurrency` config and, if that is `0`/unset, to `1`. * - * @default 1 + * @default Config.get(`queue.connections.${connection}.workerConcurrency`, 1) */ concurrency?: number diff --git a/src/worker/BaseWorker.ts b/src/worker/BaseWorker.ts new file mode 100644 index 0000000..3879c08 --- /dev/null +++ b/src/worker/BaseWorker.ts @@ -0,0 +1,65 @@ +/** + * @athenna/queue + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import 'reflect-metadata' + +import { Queue } from '#src/facades/Queue' +import { Annotation } from '@athenna/ioc' +import type { QueueImpl } from '#src/queue/QueueImpl' + +/** + * Base class for workers. Extend it to get a queue instance already bound to + * the worker's own connection, so you don't need to call `Queue.connection()` + * on every operation. + * + * @example + * ```ts + * @Worker() + * export class HelloWorker extends BaseWorker { + * public async handle(ctx: Context) { + * await this.queue.add({ hello: 'world' }) + * } + * } + * ``` + */ +export class BaseWorker { + /** + * Cached queue instance bound to this worker's connection. + */ + private _queue?: QueueImpl + + /** + * The queue connection name of this worker. It is resolved from the + * worker's `@Worker({ connection })` metadata, falling back to the default + * connection (`queue.default`) when the worker is not annotated. + */ + public get connection() { + const meta = Annotation.getMeta(this.constructor) + + return meta?.connection ?? Config.get('queue.default') + } + + /** + * A queue instance already bound to this worker's connection. Use it to + * enqueue or inspect jobs without calling `Queue.connection(...)` on every + * operation. + * + * @example + * ```ts + * await this.queue.add({ email: 'lenon@athenna.io' }) + * ``` + */ + public get queue() { + if (!this._queue) { + this._queue = Queue.connection(this.connection) + } + + return this._queue + } +} diff --git a/src/worker/WorkerTaskBuilder.ts b/src/worker/WorkerTaskBuilder.ts index 551c83a..62bb929 100644 --- a/src/worker/WorkerTaskBuilder.ts +++ b/src/worker/WorkerTaskBuilder.ts @@ -200,13 +200,36 @@ export class WorkerTaskBuilder { return } - const n = this.worker.concurrency ?? 1 + const n = this.resolveConcurrency() for (let i = 0; i < n; i++) { this.spawn() } } + /** + * Resolve how many worker loops to spawn. An explicit `concurrency` set on + * the worker (e.g. via `@Worker({ concurrency })`) wins, then the worker + * `options.workerConcurrency`, then the connection's `workerConcurrency` + * config. When none is a positive number it falls back to a single serial + * loop. + */ + private resolveConcurrency(): number { + const explicit = + this.worker.concurrency ?? this.worker.options?.workerConcurrency + + if (Is.Number(explicit) && explicit > 0) { + return explicit + } + + const configured = Config.get( + `queue.connections.${this.worker.connection}.workerConcurrency`, + 0 + ) + + return configured > 0 ? configured : 1 + } + /** * Use spawn to force a worker instance to run. */ diff --git a/templates/worker.edge b/templates/worker.edge index 1369eba..9fb1033 100644 --- a/templates/worker.edge +++ b/templates/worker.edge @@ -1,7 +1,7 @@ -import { Worker, type Context } from '@athenna/queue' +import { Worker, BaseWorker, type Context } from '@athenna/queue' @Worker() -export class {{ namePascal }} { +export class {{ namePascal }} extends BaseWorker { public async handle(ctx: Context) { // } diff --git a/tests/fixtures/config/queue.ts b/tests/fixtures/config/queue.ts index 8a58b7b..c086724 100644 --- a/tests/fixtures/config/queue.ts +++ b/tests/fixtures/config/queue.ts @@ -61,6 +61,13 @@ export default { workerTimeoutMs: 200 }, + memoryConcurrent: { + driver: 'memory', + queue: 'default', + deadletter: 'deadletter', + workerConcurrency: 3 + }, + aws_sqs: { driver: 'aws_sqs', type: 'standard', diff --git a/tests/unit/worker/BaseWorkerTest.ts b/tests/unit/worker/BaseWorkerTest.ts new file mode 100644 index 0000000..7bb651d --- /dev/null +++ b/tests/unit/worker/BaseWorkerTest.ts @@ -0,0 +1,82 @@ +/** + * @athenna/queue + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import { Path } from '@athenna/common' +import { Worker, BaseWorker, QueueImpl } from '#src' +import { LoggerProvider } from '@athenna/logger' +import { QueueProvider } from '#src/providers/QueueProvider' +import { Test, BeforeEach, AfterEach, type Context } from '@athenna/test' + +@Worker({ connection: 'fake' }) +class FakeConnectionWorker extends BaseWorker { + public async handle() {} +} + +@Worker({ connection: 'memory' }) +class MemoryConnectionWorker extends BaseWorker { + public async handle() {} +} + +@Worker() +class DefaultConnectionWorker extends BaseWorker { + public async handle() {} +} + +export class BaseWorkerTest { + @BeforeEach() + public async beforeEach() { + await Config.loadAll(Path.fixtures('config')) + + new LoggerProvider().register() + new QueueProvider().register() + } + + @AfterEach() + public async afterEach() { + await new QueueProvider().shutdown() + + ioc.reconstruct() + Config.clear() + } + + @Test() + public async shouldResolveTheConnectionFromTheWorkerMetadata({ assert }: Context) { + const worker = new FakeConnectionWorker() + + assert.equal(worker.connection, 'fake') + assert.equal(worker.queue.connectionName, 'fake') + } + + @Test() + public async shouldFallBackToTheDefaultConnectionWhenNotAnnotatedWithOne({ assert }: Context) { + const worker = new DefaultConnectionWorker() + + assert.equal(worker.connection, Config.get('queue.default')) + assert.equal(worker.queue.connectionName, Config.get('queue.default')) + } + + @Test() + public async shouldExposeAReadyToUseQueueInstanceBoundToTheWorkerConnection({ assert }: Context) { + const worker = new MemoryConnectionWorker() + + assert.instanceOf(worker.queue, QueueImpl) + assert.equal(worker.queue.connectionName, 'memory') + assert.isTrue(worker.queue.isConnected()) + } + + @Test() + public async shouldCacheTheQueueInstanceBetweenAccesses({ assert }: Context) { + const worker = new FakeConnectionWorker() + + const first = worker.queue + const second = worker.queue + + assert.isTrue(first === second) + } +} diff --git a/tests/unit/worker/WorkerImplTest.ts b/tests/unit/worker/WorkerImplTest.ts index 49d1c2a..20eb215 100644 --- a/tests/unit/worker/WorkerImplTest.ts +++ b/tests/unit/worker/WorkerImplTest.ts @@ -176,6 +176,68 @@ export class WorkerImplTest { assert.deepEqual(task?.worker.connection, 'fake') } + @Test() + public async shouldSpawnASingleLoopWhenNoConcurrencyIsConfigured({ assert }: Context) { + const builder = Queue.worker() + .task() + .name('default_concurrency') + .connection('memory') + .handler(() => {}) + + builder.start() + + assert.lengthOf((builder as any).timers, 1) + + builder.stop() + } + + @Test() + public async shouldSpawnConcurrentLoopsBasedOnConnectionWorkerConcurrencyConfig({ assert }: Context) { + const builder = Queue.worker() + .task() + .name('config_concurrency') + .connection('memoryConcurrent') + .handler(() => {}) + + builder.start() + + assert.lengthOf((builder as any).timers, 3) + + builder.stop() + } + + @Test() + public async shouldSpawnConcurrentLoopsFromWorkerOptionsWorkerConcurrency({ assert }: Context) { + const builder = Queue.worker() + .task() + .name('options_concurrency') + .connection('memory') + .options({ workerConcurrency: 4 }) + .handler(() => {}) + + builder.start() + + assert.lengthOf((builder as any).timers, 4) + + builder.stop() + } + + @Test() + public async shouldLetExplicitConcurrencyOverrideTheConnectionWorkerConcurrencyConfig({ assert }: Context) { + const builder = Queue.worker() + .task() + .name('explicit_concurrency') + .connection('memoryConcurrent') + .concurrency(5) + .handler(() => {}) + + builder.start() + + assert.lengthOf((builder as any).timers, 5) + + builder.stop() + } + @Test() public async shouldBeAbleToCreateAWorkerTaskWithCustomOptions({ assert }: Context) { Queue.worker()