Plan: AW-4 - Refactor ClaudeInvocationService to enqueue via BullMQ
Summary
Replace the direct `child_process.spawn()` call in `ClaudeInvocationService.invoke()` with a BullMQ enqueue on the `phase-invoke` queue followed by an awaited result from the `phase-result` queue, decoupling agent invocation from the NestJS process. The `SignalResult` interface remains unchanged for all callers.
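
The enqueue-then-await flow can be sketched without BullMQ installed by using small structural types. This is only an illustration under stated assumptions: `QueueLike`, `ResultWaiter`, `enqueueAndAwait`, and the job name `'invoke'` are hypothetical names that do not appear in the plan; in the real service the queue would be a BullMQ `Queue` and the waiter would be the result listener.

```typescript
type InvocationResult = { exitCode: number; stdoutLength: number };

// Hypothetical structural stand-ins for the BullMQ queue and the result listener.
interface QueueLike {
  add(name: string, data: Record<string, unknown>): Promise<unknown>;
}
interface ResultWaiter {
  waitForResult(jobId: string, timeoutMs: number): Promise<InvocationResult>;
}

// Enqueue the job, then wait for the matching result message.
async function enqueueAndAwait(
  queue: QueueLike,
  results: ResultWaiter,
  jobId: string,
  payload: Record<string, unknown>,
  timeoutSecs: number,
): Promise<InvocationResult> {
  // 'invoke' is an assumed job name; jobId is written last so the payload cannot override it.
  await queue.add('invoke', { ...payload, jobId });
  // Same timeout arithmetic as the plan: agent timeout plus a fixed grace period.
  return results.waitForResult(jobId, timeoutSecs * 1000 + 100_000);
}
```

Because both collaborators are passed in, the same function can be exercised in unit tests with plain object fakes.
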
Files
| File | Action | Description |
|---|---|---|
| `src/invoke/claude-invocation.service.ts` | modify | Remove `spawn` import; replace spawn logic with `queue.add()` + `waitForResult()` + file-based signal parsing |
| `src/invoke/invocation-result.listener.ts` | create | BullMQ processor for the `phase-result` queue; resolves pending promises by jobId, warns on unmatched |
| `src/invoke/invoke.module.ts` | modify | Register BullMQ queues (`phase-invoke`, `phase-result`) and `InvocationResultListener` |
| `src/invoke/claude-invocation.service.spec.ts` | modify | Replace `child_process.spawn` mocks with BullMQ queue mocks; add timeout and missing-file tests |
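
The file-based signal parsing and its missing-file case might look roughly like the sketch below. The plan does not specify the line format of `ai-output.jsonl`, so this assumes each line is a JSON object with a string `text` field; that field name and the helper names `concatJsonlText` and `readAiOutputText` are hypothetical.

```typescript
import { existsSync, readFileSync } from 'node:fs';
import { join } from 'node:path';

// Concatenate the `text` field (assumed name) of every valid JSON line.
function concatJsonlText(raw: string): string {
  const chunks: string[] = [];
  for (const line of raw.split('\n')) {
    if (line.trim() === '') continue; // ignore blank lines
    try {
      const event = JSON.parse(line) as { text?: unknown };
      if (typeof event.text === 'string') chunks.push(event.text);
    } catch {
      // Skip malformed lines rather than failing the whole invocation.
    }
  }
  return chunks.join('');
}

// Read {taskDir}/meta/ai-output.jsonl; a missing file yields an empty string,
// which the caller can treat as "no signal detected".
function readAiOutputText(taskDir: string): string {
  const file = join(taskDir, 'meta', 'ai-output.jsonl');
  if (!existsSync(file)) return '';
  return concatJsonlText(readFileSync(file, 'utf8'));
}
```

Splitting the pure string handling from the file read keeps the missing-file test independent of any fixture files.
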
Steps
- Create `InvocationResultListener`: new file `src/invoke/invocation-result.listener.ts`. A BullMQ `@Processor('phase-result')` that maintains a `Map<string, { resolve, reject }>` of pending promises. Expose `waitForResult(jobId, timeoutMs): Promise<{ exitCode, stdoutLength }>`. On unmatched jobId: log a warning and return (no throw). On timeout: reject with a descriptive error.
- Refactor `ClaudeInvocationService.invoke()`: remove the `spawn` import and all child-process logic (stdout/stderr streaming, process event handlers, setTimeout kill chain). Replace with: (a) enqueue a job to the `phase-invoke` queue with payload `{ jobId, taskId, phase, skill, provider, env, taskDir, cwd, timeout, args }`; (b) `await resultListener.waitForResult(jobId, timeoutSecs * 1000 + 100_000)`; (c) read `{TASK_DIR}/meta/ai-output.jsonl`, concatenate the text, and run `signalParser.detectSignal()` on the concatenated text; (d) return a `SignalResult` with identical shape. Keep: task-dir creation (`mkdirSync`), the `PHASE_TO_SKILL` map, `buildArgString()`, and `formatStreamEvent()` (move to a utility or keep for future log replay).
- Update `InvokeModule`: import `BullModule.registerQueue({ name: 'phase-invoke' })` and `BullModule.registerQueue({ name: 'phase-result' })`. Add `InvocationResultListener` to providers.
- Update tests: replace the `child_process` mock with a BullMQ queue mock (`{ add: jest.fn() }`). Mock `InvocationResultListener.waitForResult()` to return controlled results. Add tests for: timeout propagation, missing `ai-output.jsonl` (returns empty signal), and the unmatched jobId warning.
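
The pending-promise bookkeeping in the first step can be sketched independently of NestJS and BullMQ. This is a minimal illustration, not the planned listener: the class name `PendingResults` and the `settle()` method are hypothetical, and the per-entry `timer` is an addition to the `{ resolve, reject }` shape named above so that the timeout can be cancelled. In the real listener, the `phase-result` processor would be the caller of `settle()`.

```typescript
type InvocationResult = { exitCode: number; stdoutLength: number };

type Pending = {
  resolve: (result: InvocationResult) => void;
  reject: (error: Error) => void;
  timer: ReturnType<typeof setTimeout>; // assumption: kept so the timeout can be cleared
};

class PendingResults {
  private readonly pending = new Map<string, Pending>();

  // Register interest in a job's result; rejects if nothing arrives in time.
  waitForResult(jobId: string, timeoutMs: number): Promise<InvocationResult> {
    return new Promise<InvocationResult>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(jobId);
        reject(new Error(`Timed out after ${timeoutMs} ms waiting for result of job ${jobId}`));
      }, timeoutMs);
      this.pending.set(jobId, { resolve, reject, timer });
    });
  }

  // Deliver a result. Returns false for an unmatched jobId so the caller
  // can log a warning and return without throwing.
  settle(jobId: string, result: InvocationResult): boolean {
    const entry = this.pending.get(jobId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(jobId);
    entry.resolve(result);
    return true;
  }
}
```

Deleting the map entry on both the success and timeout paths keeps the map from growing when workers never respond.
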
Verification
- `npm run build` passes with no type errors
- `npm run test` passes: all existing signal/mapping tests still green with mocked queues
- `npm run lint` passes: no new warnings
Risks
- AW-3 dependency (#439): the `BullModule` import requires `@nestjs/bullmq` and a Redis connection, which AW-3 provides. If AW-3 is not merged first, this PR will not compile. Mitigation: branch from the AW-3 branch or use a minimal inline BullMQ stub per AGENTS.md Wave 2+ guidance.
- File-based signal parsing timing: the worker writes `ai-output.jsonl` before enqueuing the result, but a race is possible if the filesystem flush is delayed. Mitigation: the worker must `fsync` before enqueuing the result message.
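
On the worker side, the `fsync` mitigation might be sketched as follows, using Node's synchronous `fs` calls. The helper name `writeOutputDurably` and the `text` field in the sample lines are hypothetical, and the worker's real write path is not described in this plan. Whether `fsync` alone closes the race also depends on the filesystem in use, which is outside this sketch.

```typescript
import { closeSync, fsyncSync, mkdtempSync, openSync, readFileSync, writeSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Append lines and flush them to disk before returning, so the caller can
// enqueue the phase-result message only after the data has been synced.
function writeOutputDurably(file: string, lines: string[]): void {
  const fd = openSync(file, 'a');
  try {
    for (const line of lines) writeSync(fd, line + '\n');
    fsyncSync(fd); // ask the OS to flush this file's data to storage
  } finally {
    closeSync(fd);
  }
}

// Usage: write two sample events to a scratch file, then read them back.
const scratchFile = join(mkdtempSync(join(tmpdir(), 'aw4-')), 'ai-output.jsonl');
writeOutputDurably(scratchFile, ['{"text":"hello "}', '{"text":"world"}']);
const written = readFileSync(scratchFile, 'utf8');
```
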
Open Questions
- Should `formatStreamEvent()` be preserved as a utility for future log replay, or removed entirely since real-time streaming is gone?