AW-11: Create BullMQJobQueue implementing JobQueue interface#
Summary#
Replace the SQLite-backed job queue with a BullMQ/Redis-backed implementation behind a JOB_QUEUE_DRIVER config flag. BullMQ dependencies (bullmq, ioredis, @nestjs/bullmq) are already installed from AW-2; the existing BullModule.forRootAsync pattern in InvokeModule (line 13) provides the Redis connection blueprint.
Files#
| File | Action | Description |
|---|---|---|
src/queue/bullmq-job-queue.ts | create | BullMQ implementation of JobQueue interface using Queue + Worker |
src/queue/bullmq-job-queue.spec.ts | create | Unit tests for BullMQJobQueue |
src/queue/queue.module.ts | modify | Conditional provider registration based on JOB_QUEUE_DRIVER |
src/config/config.schema.ts | modify | Add JOB_QUEUE_DRIVER Joi validation (valid: sqlite, bullmq; default: sqlite) |
Steps#
- Add
JOB_QUEUE_DRIVER: Joi.string().valid('sqlite', 'bullmq').default('sqlite')toconfig.schema.ts(line 26). - Create
src/queue/bullmq-job-queue.tsimplementingJobQueueinterface (enqueue(),onJob()). Use BullMQQueuefor enqueuing andWorkerfor consuming. InjectConfigServicefor Redis connection config (REDIS_HOST,REDIS_PORT,REDIS_DB) — follow the sameBullModule.forRootAsyncpattern frominvoke.module.ts:13-23. UseTenantConfigServicefor per-org queue name isolation. BullMQ natively handles stalled job retry — no manualonModuleInitrecovery needed. - Implement per-issue concurrency control: use BullMQ's
job.data.issueNumberas a group key to prevent concurrent processing of the same issue (matchingSqliteJobQueue's atomic claim semantics). - Update
queue.module.tsto use a factory provider: injectConfigService, readJOB_QUEUE_DRIVER, conditionally instantiateBullMQJobQueueorSqliteJobQueuefor theJOB_QUEUEtoken. ImportBullModule.forRootAsyncandBullModule.registerQueueonly when driver isbullmq. KeepSqliteJobQueueimports (TypeORM, EventEmitter) for thesqlitepath. - Create
bullmq-job-queue.spec.tswith tests: enqueue adds job to queue,onJobregisters handler, stalled job auto-retry, per-issue dedup, error handling marks job failed, andOnModuleDestroycleanup.
Verification#
npm run buildcompiles without errorsnpm run testpasses (both new and existing queue tests)npm run lintpasses with zero warnings
Risks#
- BullModule.forRootAsync duplication:
InvokeModulealready registersBullModule.forRootAsync(TODO on line 12). Registering again inQueueModulerequires either hoisting toAppModuleor ensuring NestJS deduplicates the root config. Safest: hoist to a shared import or useBullModule.forRootat app level — but keep scope to this issue, document as follow-up for AW-3. - Per-issue dedup complexity:
SqliteJobQueueuses an atomic SQL subquery for single-issue locking. BullMQ equivalent requires either a custom lock mechanism or a concurrency-1 worker with job grouping. Needs careful implementation to preserve the same semantics.