plan.md (3.9 KB) · Apr 13, 2026 · 3 min read

AW-8: Migrate EventEmitter2 Events to EventBusService

Summary

Incrementally migrate all 6 in-process EventEmitter2 event types to the Redis-backed EventBusService (from AW-7, #443) over 5 migration rounds, then remove @nestjs/event-emitter in a sixth cleanup round. Each round is a separate PR, ordered from the lowest-risk events to the highest-risk one (the job.created self-trigger).

Dependency: Blocked until AW-7 (#443) merges — EventBusService must exist before migration begins.

Files

| File | Action | Rounds | Description |
| --- | --- | --- | --- |
| src/task-state/task-state.service.ts | modify | 1 | Replace 4× eventEmitter.emit('task.updated') with eventBus.emit() |
| src/task-state/task-state.module.ts | modify | 1 | Import EventBusModule, remove EventEmitterModule |
| src/ws-gateway/ws-gateway.service.ts | modify | 1-4 | Replace 6× @OnEvent decorators with eventBus.on() in onModuleInit() |
| src/ws-gateway/ws-gateway.module.ts | modify | 1, 6 | Import EventBusModule; remove EventEmitterModule in R6 |
| src/metrics/metrics.cache.ts | modify | 1 | Replace @OnEvent('task.updated') with eventBus.on() |
| src/metrics/metrics.module.ts | modify | 1 | Import EventBusModule, remove EventEmitterModule |
| src/watchdog/watchdog.service.ts | modify | 2 | Replace eventEmitter.emit('task.stuck') with eventBus.emit() |
| src/watchdog/watchdog.module.ts | modify | 2 | Import EventBusModule, remove EventEmitterModule |
| src/phase-router/phase-router.service.ts | modify | 3 | Replace 2× eventEmitter.emit('artefacts.synced') with eventBus.emit() |
| src/phase-router/phase-router.module.ts | modify | 3 | Import EventBusModule |
| src/queue/sqlite-job-queue.ts | modify | 4-5 | R4: migrate job.completed/job.failed. R5: replace job.created self-trigger with direct processNext() call |
| src/queue/queue.module.ts | modify | 4 | Import EventBusModule, remove EventEmitterModule |
| package.json | modify | 6 | npm uninstall @nestjs/event-emitter |
| Unit test files (*.spec.ts) | modify | 1-6 | Update mocks: replace EventEmitter2 with EventBusService |

Steps

  1. Round 1 (task.updated): Inject EventBusService into TaskStateService, WsGatewayService, and MetricsCache. Replace the 4 emit calls in TaskStateService and the 2 @OnEvent handlers (one each in WsGatewayService and MetricsCache). Wrap every eventBus.emit() call in try/catch. Update modules and specs.
  2. Round 2 (task.stuck): Inject EventBusService into WatchdogService. Replace 1 emit in WatchdogService and 1 @OnEvent handler in WsGatewayService. Update module and specs.
  3. Round 3 (artefacts.synced): Inject EventBusService into PhaseRouterService. Replace 2 emits in PhaseRouterService and 1 @OnEvent handler in WsGatewayService. Update module and specs.
  4. Round 4 (job.completed + job.failed): Inject EventBusService into SqliteJobQueue. Replace 2 emits in SqliteJobQueue and 2 @OnEvent handlers in WsGatewayService. Update module and specs.
  5. Round 5 (job.created): Replace the 3 eventEmitter.emit('job.created') calls with a direct void this.processNext() call. Also emit job.created to Redis via EventBusService to support future multi-instance deployments. Remove @OnEvent('job.created') from processNext(). Remove the remaining EventEmitter2 import from SqliteJobQueue.
  6. Round 6 (Cleanup): Remove EventEmitterModule.forRoot() from all modules. Run npm uninstall @nestjs/event-emitter. Verify that no EventEmitter2 or @OnEvent imports remain. Run build + test.
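
The producer and consumer changes in Rounds 1 to 4 share one pattern, which might look roughly like the sketch below. The EventBus interface and InMemoryEventBus class are hypothetical stand-ins so the example runs without Redis or NestJS; the real EventBusService from AW-7 may expose a different (for example, asynchronous) API, and the boolean return value of updateTask() is purely illustrative.

```typescript
// Hypothetical stand-in for the EventBusService from AW-7; its real API may differ.
type Handler = (payload: unknown) => void;

interface EventBus {
  emit(event: string, payload: unknown): void;
  on(event: string, handler: Handler): void;
}

// In-memory bus used only to make this sketch runnable without Redis.
class InMemoryEventBus implements EventBus {
  private readonly handlers = new Map<string, Handler[]>();

  emit(event: string, payload: unknown): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(payload);
    }
  }

  on(event: string, handler: Handler): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }
}

// Producer side: emit is wrapped in try/catch so a bus failure
// does not abort the state change that triggered it.
class TaskStateService {
  private readonly eventBus: EventBus;

  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
  }

  updateTask(taskId: string): boolean {
    // ... persist the task state here ...
    try {
      this.eventBus.emit("task.updated", { taskId });
      return true;
    } catch (err) {
      console.error("task.updated emit failed", err);
      return false;
    }
  }
}

// Consumer side: the @OnEvent decorator is replaced by an explicit
// subscription registered in onModuleInit().
class WsGatewayService {
  readonly broadcasts: unknown[] = [];
  private readonly eventBus: EventBus;

  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
  }

  onModuleInit(): void {
    this.eventBus.on("task.updated", (payload) => this.broadcast(payload));
  }

  private broadcast(payload: unknown): void {
    // A real gateway would push to connected WebSocket clients.
    this.broadcasts.push(payload);
  }
}
```

In a NestJS application the framework calls onModuleInit() itself; in a unit spec it has to be called manually before emitting, otherwise no handler is registered.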

Verification

  • npm run build && npm run test passes after each round
  • WebSocket clients still receive all 6 event types via gateway broadcast
  • Job queue self-trigger (job.created → processNext()) still processes pending jobs end-to-end
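
The gateway check above could be automated in a spec along the following lines. This is a sketch under assumptions: only the six event names come from this plan, while FakeEventBus, GatewayUnderTest, and missingBroadcasts() are hypothetical test helpers, and the on/emit shape of EventBusService is assumed.

```typescript
// The six event names come from the plan; everything else here is hypothetical.
const EVENT_TYPES = [
  "task.updated",
  "task.stuck",
  "artefacts.synced",
  "job.completed",
  "job.failed",
  "job.created",
] as const;

type EventType = (typeof EVENT_TYPES)[number];
type Listener = (payload: unknown) => void;

// Recording test double standing in for EventBusService (assumed on/emit shape).
class FakeEventBus {
  readonly listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): void {
    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener]);
  }

  emit(event: string, payload: unknown): void {
    (this.listeners.get(event) ?? []).forEach((listener) => listener(payload));
  }
}

// Simplified gateway: subscribes to every event type and records each broadcast.
class GatewayUnderTest {
  readonly sent: { event: EventType; payload: unknown }[] = [];
  private readonly bus: FakeEventBus;

  constructor(bus: FakeEventBus) {
    this.bus = bus;
  }

  onModuleInit(): void {
    for (const event of EVENT_TYPES) {
      this.bus.on(event, (payload) => {
        this.sent.push({ event, payload });
      });
    }
  }
}

// Emits each event type once and returns the types that never reached the gateway.
function missingBroadcasts(): EventType[] {
  const bus = new FakeEventBus();
  const gateway = new GatewayUnderTest(bus);
  gateway.onModuleInit();
  for (const event of EVENT_TYPES) {
    bus.emit(event, { probe: event });
  }
  const seen = new Set<EventType>(gateway.sent.map((entry) => entry.event));
  return EVENT_TYPES.filter((event) => !seen.has(event));
}
```

An empty result from missingBroadcasts() means every event type was delivered. Running the same probe against the real WsGatewayService after each round would flag a handler that was dropped during migration.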

Risks

  • Round 5 (job.created) is pipeline-critical: If the self-trigger migration breaks, no new phases execute. Mitigation: a direct processNext() call is simpler and more reliable than pub/sub; test thoroughly in a test environment before merging.
  • Dependency on AW-7: Plan is blocked until EventBusService exists. If AW-7 scope changes, this plan may need revision.
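
To illustrate the Round 5 mitigation, the direct self-trigger might look like the sketch below. JobQueueSketch is a hypothetical in-memory stand-in, not the actual SqliteJobQueue: persistence, the additional Redis emit, and real job execution are omitted, and the running flag is an assumed guard against overlapping drain loops.

```typescript
// Hypothetical in-memory stand-in for SqliteJobQueue.
class JobQueueSketch {
  readonly processed: string[] = [];
  private readonly pending: string[] = [];
  private running = false;

  enqueue(jobId: string): void {
    this.pending.push(jobId);
    // Before: eventEmitter.emit('job.created') reached processNext() via @OnEvent.
    // After: call processNext() directly. The `void` operator marks the
    // returned promise as intentionally not awaited.
    void this.processNext();
  }

  async processNext(): Promise<void> {
    if (this.running) return; // a drain loop is already active
    this.running = true;
    try {
      let jobId: string | undefined;
      while ((jobId = this.pending.shift()) !== undefined) {
        await this.handle(jobId);
      }
    } finally {
      this.running = false;
    }
  }

  private async handle(jobId: string): Promise<void> {
    this.processed.push(jobId); // placeholder for real job execution
  }
}
```

Because the call no longer passes through an event bus, a job enqueued while a drain loop is active is simply picked up by that loop, which removes one failure point from the pipeline-critical path.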