AI Agents SDE Task Viewer
      • Context
      • Plan
      • Prd
  1. Home
  2. AgentSDE
  3. agent-core
  4. gh-457
  5. plan
  6. plan.md
plan.md(3.1 KB)· Apr 13, 2026· 2 min read
  • Summary
  • Files
  • Steps
  • Verification
  • Risks

AW-11: Create BullMQJobQueue implementing JobQueue interface#

Summary#

Replace the SQLite-backed job queue with a BullMQ/Redis-backed implementation behind a JOB_QUEUE_DRIVER config flag. BullMQ dependencies (bullmq, ioredis, @nestjs/bullmq) are already installed from AW-2; the existing BullModule.forRootAsync pattern in InvokeModule (line 13) provides the Redis connection blueprint.

Files#

FileActionDescription
src/queue/bullmq-job-queue.tscreateBullMQ implementation of JobQueue interface using Queue + Worker
src/queue/bullmq-job-queue.spec.tscreateUnit tests for BullMQJobQueue
src/queue/queue.module.tsmodifyConditional provider registration based on JOB_QUEUE_DRIVER
src/config/config.schema.tsmodifyAdd JOB_QUEUE_DRIVER Joi validation (valid: sqlite, bullmq; default: sqlite)

Steps#

  1. Add JOB_QUEUE_DRIVER: Joi.string().valid('sqlite', 'bullmq').default('sqlite') to config.schema.ts (line 26).
  2. Create src/queue/bullmq-job-queue.ts implementing JobQueue interface (enqueue(), onJob()). Use BullMQ Queue for enqueuing and Worker for consuming. Inject ConfigService for Redis connection config (REDIS_HOST, REDIS_PORT, REDIS_DB) — follow the same BullModule.forRootAsync pattern from invoke.module.ts:13-23. Use TenantConfigService for per-org queue name isolation. BullMQ natively handles stalled job retry — no manual onModuleInit recovery needed.
  3. Implement per-issue concurrency control: use BullMQ's job.data.issueNumber as a group key to prevent concurrent processing of the same issue (matching SqliteJobQueue's atomic claim semantics).
  4. Update queue.module.ts to use a factory provider: inject ConfigService, read JOB_QUEUE_DRIVER, conditionally instantiate BullMQJobQueue or SqliteJobQueue for the JOB_QUEUE token. Import BullModule.forRootAsync and BullModule.registerQueue only when driver is bullmq. Keep SqliteJobQueue imports (TypeORM, EventEmitter) for the sqlite path.
  5. Create bullmq-job-queue.spec.ts with tests: enqueue adds job to queue, onJob registers handler, stalled job auto-retry, per-issue dedup, error handling marks job failed, and OnModuleDestroy cleanup.

Verification#

  • npm run build compiles without errors
  • npm run test passes (both new and existing queue tests)
  • npm run lint passes with zero warnings

Risks#

  • BullModule.forRootAsync duplication: InvokeModule already registers BullModule.forRootAsync (TODO on line 12). Registering again in QueueModule requires either hoisting to AppModule or ensuring NestJS deduplicates the root config. Safest: hoist to a shared import or use BullModule.forRoot at app level — but keep scope to this issue, document as follow-up for AW-3.
  • Per-issue dedup complexity: SqliteJobQueue uses an atomic SQL subquery for single-issue locking. BullMQ equivalent requires either a custom lock mechanism or a concurrency-1 worker with job grouping. Needs careful implementation to preserve the same semantics.
ContextPrd