AW-5: Consume phase-result queue and handle crash recovery
Summary
Add crash recovery to the invoke/queue layer so that phase results arriving after an agent-core restart are processed and advance the pipeline instead of being silently dropped. This builds on AW-4 (#440), which introduces `InvocationResultListener` and `waitForResult()`.
Files
| File | Action | Description |
|---|---|---|
| `src/invoke/invocation-result.listener.ts` | modify | Add `recoverResult()` for unmatched results; inject `TaskStateService` and `InternalAdapterService` |
| `src/invoke/invocation-result.listener.spec.ts` | modify | Add tests for crash recovery: duplicate delivery, no-task-found, successful recovery |
| `src/queue/sqlite-job-queue.ts` | modify | Smart stale job detection in `onModuleInit()`: check `ai-done.json` and `ai.pid` before marking failed |
| `src/queue/sqlite-job-queue.spec.ts` | modify | Add tests for smart stale detection: live PID, existing done file, truly stale |
| `src/invoke/claude-invocation.service.ts` | modify | Add configurable timeout to `waitForResult()` that rejects and cleans up the pending map entry |
| `src/invoke/claude-invocation.service.spec.ts` | modify | Add timeout expiry test for `waitForResult()` |
Steps
- Add `recoverResult()` to `InvocationResultListener`: When a phase-result arrives with no matching resolver in the pending map, call `recoverResult()` instead of only logging. Inject `TaskStateService` and `InternalAdapterService`. Look up the task by the `jobId` payload, read its current phase and status, and if the task has not already advanced past the incoming phase, call `InternalAdapterService.handlePhaseResult()` to advance the pipeline.
- Add duplicate-delivery guard in `recoverResult()`: Before advancing, check `task.currentPhase` and the phase status column. If the task has already moved past the incoming phase (status is `complete` or `skipped`), log a warning and return early without re-advancing.
- Smart stale job detection in `SqliteJobQueue.onModuleInit()`: Replace the blind "mark all processing as failed" logic. For each stale `processing` job: (a) parse `taskDir` from the job payload; (b) check whether `{taskDir}/meta/ai-done.json` exists, and if so, treat it as a completed result and enqueue recovery; (c) check whether `{taskDir}/meta/ai.pid` exists and `process.kill(pid, 0)` succeeds, and if the process is alive, leave the job as `processing`; (d) otherwise mark the job as `failed`.
- Add timeout to `waitForResult()`: Add a `setTimeout` that rejects the promise after `CLAUDE_TIMEOUT_SECS + 100` seconds (~3,700,000 ms). On timeout, delete the entry from the pending map and reject with a descriptive `TimeoutError`.
- Write unit tests: Cover (a) `recoverResult()` happy path advancing the pipeline, (b) duplicate delivery no-op, (c) no-task-found early return, (d) stale job with live PID stays `processing`, (e) stale job with done file triggers recovery, (f) truly stale job marked `failed`, (g) `waitForResult()` timeout rejection and map cleanup.
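The recovery path and its duplicate-delivery guard could be sketched roughly as follows. This is a minimal illustration, not the actual implementation: `PhaseResult`, `TaskRecord`, `TaskStore`, `PhaseAdvancer`, and the `RecoveryOutcome` strings are hypothetical stand-ins, since the real signatures of `TaskStateService` and `InternalAdapterService` are not shown in this issue.

```typescript
// Hypothetical shapes: the real TaskStateService / InternalAdapterService
// APIs are not shown in this issue and may differ.
type PhaseStatus = "pending" | "running" | "complete" | "skipped";

interface PhaseResult {
  jobId: string;
  phase: string;
}

interface TaskRecord {
  id: string;
  currentPhase: string;
  phaseStatus: Record<string, PhaseStatus | undefined>;
}

// Stand-in for the lookup that TaskStateService would provide.
interface TaskStore {
  findByJobId(jobId: string): Promise<TaskRecord | undefined>;
}

// Stand-in for InternalAdapterService.handlePhaseResult().
interface PhaseAdvancer {
  handlePhaseResult(result: PhaseResult): Promise<void>;
}

type RecoveryOutcome = "advanced" | "duplicate" | "no-task";

async function recoverResult(
  result: PhaseResult,
  tasks: TaskStore,
  adapter: PhaseAdvancer,
  warn: (msg: string) => void = console.warn,
): Promise<RecoveryOutcome> {
  const task = await tasks.findByJobId(result.jobId);
  if (!task) {
    warn(`recoverResult: no task found for job ${result.jobId}`);
    return "no-task";
  }

  // Duplicate-delivery guard: a phase that is already complete or skipped
  // must not advance the pipeline a second time.
  const status = task.phaseStatus[result.phase];
  if (status === "complete" || status === "skipped") {
    warn(
      `recoverResult: task ${task.id} already past phase ${result.phase} ` +
        `(current: ${task.currentPhase}); ignoring duplicate`,
    );
    return "duplicate";
  }

  await adapter.handlePhaseResult(result);
  return "advanced";
}
```

Returning an outcome value rather than `void` is a design choice made here for testability: test cases (a) through (c) can each assert on a distinct return value.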
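The timeout step for `waitForResult()` might look something like the sketch below. The `PendingResults` class, its `deliver()` method, and the `timeoutMsFor()` helper are illustrative names only, and the shape of the real pending map from AW-4 is an assumption. The ~3,700,000 ms figure implies a `CLAUDE_TIMEOUT_SECS` of 3600, which is inferred from the arithmetic rather than stated in this issue.

```typescript
// Hypothetical error type; the real TimeoutError may be defined elsewhere.
class TimeoutError extends Error {
  constructor(jobId: string, timeoutMs: number) {
    super(`Timed out after ${timeoutMs} ms waiting for result of job ${jobId}`);
    this.name = "TimeoutError";
  }
}

interface PendingEntry<T> {
  resolve: (value: T) => void;
  timer: ReturnType<typeof setTimeout>;
}

// Illustrative stand-in for the pending resolver map introduced by AW-4.
class PendingResults<T> {
  private readonly pending = new Map<string, PendingEntry<T>>();

  waitForResult(jobId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        // Clean up first so a late result finds no resolver and can be
        // routed to the recovery path instead.
        this.pending.delete(jobId);
        reject(new TimeoutError(jobId, timeoutMs));
      }, timeoutMs);
      this.pending.set(jobId, { resolve, timer });
    });
  }

  // Returns false when no waiter is registered for the job.
  deliver(jobId: string, value: T): boolean {
    const entry = this.pending.get(jobId);
    if (!entry) return false;
    clearTimeout(entry.timer); // avoid a stray timer firing after resolution
    this.pending.delete(jobId);
    entry.resolve(value);
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}

// (CLAUDE_TIMEOUT_SECS + 100) seconds, expressed in milliseconds.
const timeoutMsFor = (claudeTimeoutSecs: number): number =>
  (claudeTimeoutSecs + 100) * 1000;
```

Clearing the timer on normal delivery matters as much as deleting the map entry on timeout: otherwise every resolved invocation would leave a timer pending for over an hour.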
Verification
- `npm run build` passes with no type errors
- `npm run test` passes: all new and existing tests green
- `npm run lint` passes with zero warnings
Risks
- Dependency on AW-4 (#440): This issue assumes `InvocationResultListener`, `waitForResult()`, and the pending resolver map exist. Must be implemented after AW-4 merges.
- PID check false positive: `process.kill(pid, 0)` may succeed for a recycled PID. Mitigation: combine the PID check with an age heuristic (job `lockedAt` age > timeout → treat as stale regardless).
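One way the stale-job checks and the age-heuristic mitigation could fit together is sketched below. Several details are assumptions rather than facts from this issue: that `ai.pid` holds a plain-text PID, that `lockedAt` is available as epoch milliseconds, and the names `classifyStaleJob`, `StaleJob`, `Probes`, and `StaleDecision`, which are all hypothetical.

```typescript
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";

type StaleDecision = "recover" | "keep-processing" | "fail";

// Assumed job shape: taskDir parsed from the payload, lockedAt as epoch ms.
interface StaleJob {
  taskDir: string;
  lockedAtMs: number;
}

// Injectable probes so unit tests need no real files or processes.
interface Probes {
  exists(path: string): boolean;
  readText(path: string): string;
  isPidAlive(pid: number): boolean;
}

// Signal 0 checks for existence and permission without delivering a signal.
function isPidAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM: process exists but is owned by another user. ESRCH: no such process.
    return (err as { code?: string }).code === "EPERM";
  }
}

const nodeProbes: Probes = {
  exists: existsSync,
  readText: (path) => readFileSync(path, "utf8"),
  isPidAlive,
};

function classifyStaleJob(
  job: StaleJob,
  timeoutMs: number,
  nowMs: number = Date.now(),
  probes: Probes = nodeProbes,
): StaleDecision {
  const metaDir = join(job.taskDir, "meta");

  // (b) A done file means the work finished while agent-core was down.
  if (probes.exists(join(metaDir, "ai-done.json"))) return "recover";

  // (c) Without a PID file there is nothing that could still be running.
  const pidFile = join(metaDir, "ai.pid");
  if (!probes.exists(pidFile)) return "fail";

  // Reject non-positive PIDs: kill(0, ...) and negative values target
  // process groups rather than a single process.
  const pid = Number.parseInt(probes.readText(pidFile).trim(), 10);
  if (!Number.isInteger(pid) || pid <= 0) return "fail";

  // Age heuristic: a lock older than the timeout is stale even if the PID
  // answers, because the PID may have been recycled.
  if (nowMs - job.lockedAtMs > timeoutMs) return "fail";

  return probes.isPidAlive(pid) ? "keep-processing" : "fail";
}
```

In this sketch the done-file check runs before the age check, so a completed result is still recovered from a job whose lock has expired. Whether that ordering is desired is a decision for the implementer.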