AW-8: Migrate EventEmitter2 Events to EventBusService
Summary
Incrementally migrate all 6 in-process EventEmitter2 event types to the Redis-backed EventBusService (from AW-7, #443) over 5 migration rounds, then remove @nestjs/event-emitter in a sixth cleanup round. Each round is a separate PR, ordered from the lowest-risk events first to the highest-risk one (the job.created self-trigger) last.
Dependency: Blocked until AW-7 (#443) merges — EventBusService must exist before migration begins.
Files
| File | Action | Rounds | Description |
|---|---|---|---|
| src/task-state/task-state.service.ts | modify | 1 | Replace 4× eventEmitter.emit('task.updated') with eventBus.emit() |
| src/task-state/task-state.module.ts | modify | 1 | Import EventBusModule, remove EventEmitterModule |
| src/ws-gateway/ws-gateway.service.ts | modify | 1-4 | Replace 6× @OnEvent decorators with eventBus.on() in onModuleInit() |
| src/ws-gateway/ws-gateway.module.ts | modify | 1, 6 | Import EventBusModule; remove EventEmitterModule in R6 |
| src/metrics/metrics.cache.ts | modify | 1 | Replace @OnEvent('task.updated') with eventBus.on() |
| src/metrics/metrics.module.ts | modify | 1 | Import EventBusModule, remove EventEmitterModule |
| src/watchdog/watchdog.service.ts | modify | 2 | Replace eventEmitter.emit('task.stuck') with eventBus.emit() |
| src/watchdog/watchdog.module.ts | modify | 2 | Import EventBusModule, remove EventEmitterModule |
| src/phase-router/phase-router.service.ts | modify | 3 | Replace 2× eventEmitter.emit('artefacts.synced') with eventBus.emit() |
| src/phase-router/phase-router.module.ts | modify | 3 | Import EventBusModule |
| src/queue/sqlite-job-queue.ts | modify | 4-5 | R4: migrate job.completed/job.failed. R5: replace job.created self-trigger with direct processNext() call |
| src/queue/queue.module.ts | modify | 4 | Import EventBusModule, remove EventEmitterModule |
| package.json | modify | 6 | npm uninstall @nestjs/event-emitter |
| Unit test files (*.spec.ts) | modify | 1-6 | Update mocks: replace EventEmitter2 with EventBusService |
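The subscriber-side change listed for `ws-gateway.service.ts` and `metrics.cache.ts` can be sketched roughly as follows. This is a minimal illustration, not the actual service: `EventBusLike`, `InMemoryEventBus` and `WsGatewayServiceSketch` are hypothetical names, and the real `EventBusService` from AW-7 may expose different signatures (for example an asynchronous `emit()`). In the real service, `onModuleInit()` would come from NestJS's `OnModuleInit` lifecycle interface.

```typescript
// Hypothetical minimal shape of the AW-7 EventBusService; the real API may differ.
type Handler = (payload: unknown) => void;

interface EventBusLike {
  on(event: string, handler: Handler): void;
  emit(event: string, payload: unknown): void;
}

// In-memory stand-in so the sketch runs without Redis.
class InMemoryEventBus implements EventBusLike {
  private readonly handlers = new Map<string, Handler[]>();

  on(event: string, handler: Handler): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  emit(event: string, payload: unknown): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(payload);
    }
  }
}

class WsGatewayServiceSketch {
  // Stand-in for the real WebSocket broadcast: records what would be sent.
  readonly broadcasts: Array<{ event: string; payload: unknown }> = [];
  private readonly eventBus: EventBusLike;

  constructor(eventBus: EventBusLike) {
    this.eventBus = eventBus;
  }

  // Before: an @OnEvent('task.updated') decorator on the handler method.
  // After: an explicit subscription registered once when the module starts.
  onModuleInit(): void {
    this.eventBus.on('task.updated', (payload) =>
      this.broadcast('task.updated', payload),
    );
    // Later rounds would add their subscriptions here in the same way.
  }

  private broadcast(event: string, payload: unknown): void {
    this.broadcasts.push({ event, payload });
  }
}
```

One consequence of this pattern is that nothing is received until `onModuleInit()` has run, so specs that previously relied on decorator wiring need to call it explicitly.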
Steps
- Round 1 (`task.updated`): Inject `EventBusService` into `TaskStateService`, `WsGatewayService`, `MetricsCache`. Replace 4 emit calls and 2 `@OnEvent` decorators. Wrap all `eventBus.emit()` calls in try/catch. Update modules and specs.
- Round 2 (`task.stuck`): Inject `EventBusService` into `WatchdogService`. Replace 1 emit and 1 `@OnEvent` in `WsGatewayService`. Update module and specs.
- Round 3 (`artefacts.synced`): Inject `EventBusService` into `PhaseRouterService`. Replace 2 emits and 1 `@OnEvent` in `WsGatewayService`. Update module and specs.
- Round 4 (`job.completed` + `job.failed`): Inject `EventBusService` into `SqliteJobQueue`. Replace 2 emits and 2 `@OnEvent` in `WsGatewayService`. Update module and specs.
- Round 5 (`job.created`): Replace 3× `eventEmitter.emit('job.created')` with a direct `void this.processNext()`. Also emit to Redis for future multi-instance use. Remove `@OnEvent('job.created')` from `processNext()`. Remove the remaining `EventEmitter2` import from `SqliteJobQueue`.
- Round 6 (cleanup): Remove `EventEmitterModule.forRoot()` from all modules. Run `npm uninstall @nestjs/event-emitter`. Verify no `EventEmitter2` or `@OnEvent` imports remain. Run build + test.
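The Round 1 instruction to wrap every `eventBus.emit()` in try/catch could be centralised in a small helper along these lines. This is a sketch under assumptions: `EventEmitterLike`, `WarnFn` and `safeEmit` are hypothetical names, `emit()` is assumed to return a promise, and the motivation (keeping a bus failure such as an unreachable Redis from aborting the caller's own work) is inferred rather than stated in the plan.

```typescript
// Hypothetical minimal shape of EventBusService.emit(); the real AW-7 signature may differ.
interface EventEmitterLike {
  emit(event: string, payload: unknown): Promise<void>;
}

type WarnFn = (message: string) => void;

// Publishes an event but never lets a bus failure propagate to the caller.
// Resolves to true if the publish succeeded and false if it was swallowed.
async function safeEmit(
  eventBus: EventEmitterLike,
  event: string,
  payload: unknown,
  warn: WarnFn,
): Promise<boolean> {
  try {
    await eventBus.emit(event, payload);
    return true;
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    warn(`eventBus.emit('${event}') failed: ${reason}`);
    return false;
  }
}
```

A call site in `TaskStateService` would then read something like `await safeEmit(this.eventBus, 'task.updated', task, (m) => this.logger.warn(m))`, where `this.logger` is likewise an assumption about the surrounding class.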
Verification
- `npm run build && npm run test` passes after each round
- WebSocket clients still receive all 6 event types via gateway broadcast
- Job queue self-trigger (`job.created` → `processNext()`) still processes pending jobs end-to-end
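The Round 6 check that no `EventEmitter2` or `@OnEvent` references remain could be automated with a small scanner like the one below. It is only a sketch: `LEFTOVER_PATTERNS`, `Leftover` and `findLeftovers` are hypothetical names, the pattern list is an assumption about what counts as a leftover, and reading the files from disk is left to the caller so the function stays pure.

```typescript
// Patterns that should no longer appear once the cleanup round is done (assumed list).
const LEFTOVER_PATTERNS: RegExp[] = [
  /@nestjs\/event-emitter/,
  /\bEventEmitter2\b/,
  /\bEventEmitterModule\b/,
  /@OnEvent\s*\(/,
];

interface Leftover {
  file: string;
  line: number; // 1-based line number within the file
  text: string; // the offending line, trimmed
}

// Scans in-memory file contents (path -> source text) and reports every line
// that still mentions the old event-emitter API.
function findLeftovers(files: Record<string, string>): Leftover[] {
  const hits: Leftover[] = [];
  for (const file of Object.keys(files)) {
    const source = files[file] ?? '';
    source.split('\n').forEach((text, index) => {
      if (LEFTOVER_PATTERNS.some((pattern) => pattern.test(text))) {
        hits.push({ file, line: index + 1, text: text.trim() });
      }
    });
  }
  return hits;
}
```

An empty result for the contents of `src/` would be the pass condition; any hit names the file and line that still needs migrating.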
Risks
- Round 5 (`job.created`) is pipeline-critical: If the self-trigger migration breaks, no new phases execute. Mitigation: a direct `processNext()` call is simpler and more reliable than pub/sub; test thoroughly in a test environment before merging.
- Dependency on AW-7: Plan is blocked until `EventBusService` exists. If AW-7 scope changes, this plan may need revision.
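The Round 5 mitigation can be illustrated with a simplified queue. This is a sketch under assumptions rather than the real `SqliteJobQueue`: `Job`, `PublishFn` and `JobQueueSketch` are hypothetical names, an array stands in for the SQLite table, `processNext()` is assumed to handle one job per call, and the Redis publish is modelled as a best-effort callback whose failure is ignored.

```typescript
interface Job {
  id: number;
}

// Hypothetical stand-in for eventBus.emit(); the real AW-7 signature may differ.
type PublishFn = (event: string, payload: unknown) => Promise<void>;

// Simplified stand-in for SqliteJobQueue: an array replaces the SQLite table.
class JobQueueSketch {
  readonly processed: number[] = [];
  private readonly pending: Job[] = [];
  private readonly publish: PublishFn;

  constructor(publish: PublishFn) {
    this.publish = publish;
  }

  enqueue(job: Job): void {
    this.pending.push(job);
    // Before: eventEmitter.emit('job.created') reached processNext() via @OnEvent.
    // After: call it directly; `void` marks the promise as intentionally not awaited.
    void this.processNext();
    // Best-effort publish for future multi-instance consumers; a failure here
    // must not affect local processing, so the rejection is swallowed.
    this.publish('job.created', job).catch(() => {});
  }

  // No longer decorated with @OnEvent('job.created').
  async processNext(): Promise<void> {
    const job = this.pending.shift();
    if (job === undefined) {
      return;
    }
    await this.handle(job);
  }

  // Stand-in for real job execution: just records the job id.
  private async handle(job: Job): Promise<void> {
    this.processed.push(job.id);
  }
}
```

With this shape the pipeline no longer depends on event delivery to start a job, so a failing bus degrades only the optional cross-instance notification.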