Skip to content

Commit 76e4b8f

Browse files
y4nderclaude
andauthored
FAC-130.2 refactor coverage stats handling for pipeline status queries (#328) (#330)
Coverage stats (submissionCount, totalEnrolled, commentCount, responseRate) were computed once in CreatePipeline and cached on the AnalysisPipeline entity. GetPipelineStatus read them from the entity and never refreshed, so pipelines created early in data collection reported stale numbers forever — appearing as a hard "limit" to users when more submissions arrived afterwards. Now, while a pipeline is still in AWAITING_CONFIRMATION, GetPipelineStatus recomputes coverage live and persists the fresh values (including warnings) back to the entity so what the user sees matches what will be locked in at confirmation time. After confirmation, the stored snapshot is preserved untouched — it represents the corpus that was actually analyzed and must not drift. Refactors: - Extract BuildScopeFromPipeline helper (entity -> ScopeFilter) - Extract BuildCoverageWarnings helper (reused by CreatePipeline and GetPipelineStatus) Tests: 2 new cases cover the fresh-recompute path and the snapshot-preserved path for confirmed pipelines. https://claude.ai/code/session_01AsGM2DbyriRMyLWHr7Hwdw Co-authored-by: Claude <noreply@anthropic.com>
1 parent 5053fbc commit 76e4b8f

File tree

2 files changed

+177
-55
lines changed

2 files changed

+177
-55
lines changed

src/modules/analysis/services/pipeline-orchestrator.service.spec.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,80 @@ describe('PipelineOrchestratorService', () => {
677677
expect(status.stages.embeddings.status).toBe('failed');
678678
});
679679

680+
it('should recompute coverage stats for AWAITING_CONFIRMATION pipelines', async () => {
681+
// Pipeline was created with a stale snapshot (200 submissions, 205 enrolled).
682+
// By the time GetPipelineStatus is called, more submissions have arrived.
683+
const pipeline = {
684+
...basePipeline,
685+
status: PipelineStatus.AWAITING_CONFIRMATION,
686+
totalEnrolled: 205,
687+
submissionCount: 200,
688+
commentCount: 150,
689+
responseRate: 200 / 205,
690+
warnings: ['Only 200 submissions (minimum recommended: 30).'],
691+
};
692+
693+
mockFork.findOne
694+
.mockResolvedValueOnce(pipeline) // pipeline lookup
695+
.mockResolvedValueOnce(null) // sentiment run
696+
.mockResolvedValueOnce(null) // topic model run
697+
.mockResolvedValueOnce(null) // recommendation run
698+
.mockResolvedValueOnce({ updatedAt: new Date() }); // latest enrollment in ComputeCoverageStats
699+
700+
// ComputeCoverageStats flow: count submissions, count comments, find
701+
// scoped submissions for course ids, count enrollments.
702+
mockFork.count
703+
.mockResolvedValueOnce(520) // fresh submissionCount
704+
.mockResolvedValueOnce(410) // fresh commentCount
705+
.mockResolvedValueOnce(600); // fresh totalEnrolled
706+
mockFork.find.mockResolvedValueOnce([{ course: { id: 'c1' } }]);
707+
708+
const status = await service.GetPipelineStatus('p1');
709+
710+
// Fresh values, not the stale snapshot
711+
expect(status.coverage.submissionCount).toBe(520);
712+
expect(status.coverage.totalEnrolled).toBe(600);
713+
expect(status.coverage.commentCount).toBe(410);
714+
expect(status.coverage.responseRate).toBeCloseTo(520 / 600, 5);
715+
716+
// Pipeline entity was mutated with fresh values and flushed
717+
expect(pipeline.submissionCount).toBe(520);
718+
expect(pipeline.totalEnrolled).toBe(600);
719+
expect(mockFork.flush).toHaveBeenCalled();
720+
721+
// Stale "Only 200 submissions" warning is replaced with fresh warnings
722+
expect(
723+
status.warnings.some((w) => w.includes('Only 200 submissions')),
724+
).toBe(false);
725+
});
726+
727+
it('should use stored coverage snapshot for confirmed pipelines', async () => {
728+
const pipeline = {
729+
...basePipeline,
730+
status: PipelineStatus.SENTIMENT_ANALYSIS,
731+
totalEnrolled: 205,
732+
submissionCount: 200,
733+
commentCount: 150,
734+
responseRate: 200 / 205,
735+
};
736+
737+
mockFork.findOne
738+
.mockResolvedValueOnce(pipeline)
739+
.mockResolvedValueOnce(null)
740+
.mockResolvedValueOnce(null)
741+
.mockResolvedValueOnce(null);
742+
743+
// No scoped submissions → no fresh recompute; stored snapshot wins
744+
mockFork.find.mockResolvedValueOnce([]);
745+
746+
const status = await service.GetPipelineStatus('p1');
747+
748+
expect(status.coverage.submissionCount).toBe(200);
749+
expect(status.coverage.totalEnrolled).toBe(205);
750+
// ComputeCoverageStats should NOT have been invoked (no count calls)
751+
expect(mockFork.count).not.toHaveBeenCalled();
752+
});
753+
680754
it('should return sentiment gate included/excluded with completed status', async () => {
681755
const pipeline = {
682756
...basePipeline,

src/modules/analysis/services/pipeline-orchestrator.service.ts

Lines changed: 103 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -128,34 +128,7 @@ export class PipelineOrchestratorService {
128128
if (input.courseId) scope.course = input.courseId;
129129

130130
const coverage = await this.ComputeCoverageStats(fork, scope);
131-
132-
// Generate warnings
133-
const warnings: string[] = [];
134-
if (coverage.responseRate < COVERAGE_WARNINGS.MIN_RESPONSE_RATE) {
135-
warnings.push(
136-
`Response rate is ${(coverage.responseRate * 100).toFixed(1)}% (below ${COVERAGE_WARNINGS.MIN_RESPONSE_RATE * 100}% threshold).`,
137-
);
138-
}
139-
if (coverage.submissionCount < COVERAGE_WARNINGS.MIN_SUBMISSIONS) {
140-
warnings.push(
141-
`Only ${coverage.submissionCount} submissions (minimum recommended: ${COVERAGE_WARNINGS.MIN_SUBMISSIONS}).`,
142-
);
143-
}
144-
if (coverage.commentCount < COVERAGE_WARNINGS.MIN_COMMENTS) {
145-
warnings.push(
146-
`Only ${coverage.commentCount} qualitative comments (minimum recommended: ${COVERAGE_WARNINGS.MIN_COMMENTS}).`,
147-
);
148-
}
149-
if (coverage.lastEnrollmentSyncAt) {
150-
const hoursSinceSync =
151-
(Date.now() - coverage.lastEnrollmentSyncAt.getTime()) / 3_600_000;
152-
if (hoursSinceSync > COVERAGE_WARNINGS.STALE_SYNC_HOURS) {
153-
const daysStale = Math.floor(hoursSinceSync / 24);
154-
warnings.push(
155-
`Enrollment data may be stale (last synced ${daysStale} day${daysStale !== 1 ? 's' : ''} ago).`,
156-
);
157-
}
158-
}
131+
const warnings = this.BuildCoverageWarnings(coverage);
159132

160133
const pipeline = fork.create(AnalysisPipeline, {
161134
semester: fork.getReference(Semester, input.semesterId),
@@ -478,30 +451,63 @@ export class PipelineOrchestratorService {
478451
});
479452
}
480453

481-
// Compute lastEnrollmentSyncAt by scoping through courses in submission scope
482-
const scope = buildSubmissionScope(pipeline);
454+
// Coverage stats are cached on the pipeline entity at creation time. For
455+
// pipelines still awaiting confirmation, recompute on every status fetch
456+
// so the user sees the latest submission/enrollment counts before they
457+
// lock in the snapshot. After confirmation, the stored values represent
458+
// what was actually analyzed and must not drift.
459+
const scope = this.BuildScopeFromPipeline(pipeline);
460+
let totalEnrolled = pipeline.totalEnrolled;
461+
let submissionCount = pipeline.submissionCount;
462+
let commentCount = pipeline.commentCount;
463+
let responseRate = Number(pipeline.responseRate);
464+
let warnings = pipeline.warnings;
483465
let lastEnrollmentSyncAt: Date | null = null;
484-
const scopedSubs = await fork.find(
485-
QuestionnaireSubmission,
486-
{
487-
...scope,
488-
qualitativeComment: { $ne: null },
489-
},
490-
{ fields: ['course'] },
491-
);
492-
const courseIds = [
493-
...new Set(
494-
scopedSubs.map((s) => s.course?.id).filter((id): id is string => !!id),
495-
),
496-
];
497-
if (courseIds.length > 0) {
498-
const latestEnrollment = await fork.findOne(
499-
Enrollment,
500-
{ isActive: true, course: { $in: courseIds } },
501-
{ orderBy: { updatedAt: 'DESC' } },
466+
467+
if (pipeline.status === PipelineStatus.AWAITING_CONFIRMATION) {
468+
const freshCoverage = await this.ComputeCoverageStats(fork, scope);
469+
totalEnrolled = freshCoverage.totalEnrolled;
470+
submissionCount = freshCoverage.submissionCount;
471+
commentCount = freshCoverage.commentCount;
472+
responseRate = freshCoverage.responseRate;
473+
lastEnrollmentSyncAt = freshCoverage.lastEnrollmentSyncAt;
474+
warnings = this.BuildCoverageWarnings(freshCoverage);
475+
476+
// Persist refreshed snapshot so the values shown here match what will
477+
// be locked in at confirmation time.
478+
pipeline.totalEnrolled = freshCoverage.totalEnrolled;
479+
pipeline.submissionCount = freshCoverage.submissionCount;
480+
pipeline.commentCount = freshCoverage.commentCount;
481+
pipeline.responseRate = freshCoverage.responseRate;
482+
pipeline.warnings = warnings;
483+
await fork.flush();
484+
} else {
485+
// For confirmed/terminal pipelines, derive lastEnrollmentSyncAt from
486+
// courses in the original submission scope (snapshot view).
487+
const scopedSubs = await fork.find(
488+
QuestionnaireSubmission,
489+
{
490+
...scope,
491+
qualitativeComment: { $ne: null },
492+
},
493+
{ fields: ['course'] },
502494
);
503-
if (latestEnrollment) {
504-
lastEnrollmentSyncAt = latestEnrollment.updatedAt;
495+
const courseIds = [
496+
...new Set(
497+
scopedSubs
498+
.map((s) => s.course?.id)
499+
.filter((id): id is string => !!id),
500+
),
501+
];
502+
if (courseIds.length > 0) {
503+
const latestEnrollment = await fork.findOne(
504+
Enrollment,
505+
{ isActive: true, course: { $in: courseIds } },
506+
{ orderBy: { updatedAt: 'DESC' } },
507+
);
508+
if (latestEnrollment) {
509+
lastEnrollmentSyncAt = latestEnrollment.updatedAt;
510+
}
505511
}
506512
}
507513

@@ -554,10 +560,10 @@ export class PipelineOrchestratorService {
554560
course: pipeline.course?.shortname || null,
555561
},
556562
coverage: {
557-
totalEnrolled: pipeline.totalEnrolled,
558-
submissionCount: pipeline.submissionCount,
559-
commentCount: pipeline.commentCount,
560-
responseRate: Number(pipeline.responseRate),
563+
totalEnrolled,
564+
submissionCount,
565+
commentCount,
566+
responseRate,
561567
lastEnrollmentSyncAt: lastEnrollmentSyncAt?.toISOString() || null,
562568
},
563569
stages: {
@@ -585,7 +591,7 @@ export class PipelineOrchestratorService {
585591
recommendationRun,
586592
),
587593
},
588-
warnings: pipeline.warnings,
594+
warnings,
589595
errorMessage: pipeline.errorMessage ?? null,
590596
// Intent signal for future error categorization — currently equivalent to status === FAILED
591597
retryable: pipeline.status === PipelineStatus.FAILED,
@@ -636,6 +642,48 @@ export class PipelineOrchestratorService {
636642

637643
// --- Private Helpers ---
638644

645+
private BuildScopeFromPipeline(pipeline: AnalysisPipeline): ScopeFilter {
646+
const scope: ScopeFilter = { semester: pipeline.semester.id };
647+
if (pipeline.faculty) scope.faculty = pipeline.faculty.id;
648+
if (pipeline.questionnaireVersion)
649+
scope.questionnaireVersion = pipeline.questionnaireVersion.id;
650+
if (pipeline.department) scope.department = pipeline.department.id;
651+
if (pipeline.program) scope.program = pipeline.program.id;
652+
if (pipeline.campus) scope.campus = pipeline.campus.id;
653+
if (pipeline.course) scope.course = pipeline.course.id;
654+
return scope;
655+
}
656+
657+
private BuildCoverageWarnings(coverage: CoverageStats): string[] {
658+
const warnings: string[] = [];
659+
if (coverage.responseRate < COVERAGE_WARNINGS.MIN_RESPONSE_RATE) {
660+
warnings.push(
661+
`Response rate is ${(coverage.responseRate * 100).toFixed(1)}% (below ${COVERAGE_WARNINGS.MIN_RESPONSE_RATE * 100}% threshold).`,
662+
);
663+
}
664+
if (coverage.submissionCount < COVERAGE_WARNINGS.MIN_SUBMISSIONS) {
665+
warnings.push(
666+
`Only ${coverage.submissionCount} submissions (minimum recommended: ${COVERAGE_WARNINGS.MIN_SUBMISSIONS}).`,
667+
);
668+
}
669+
if (coverage.commentCount < COVERAGE_WARNINGS.MIN_COMMENTS) {
670+
warnings.push(
671+
`Only ${coverage.commentCount} qualitative comments (minimum recommended: ${COVERAGE_WARNINGS.MIN_COMMENTS}).`,
672+
);
673+
}
674+
if (coverage.lastEnrollmentSyncAt) {
675+
const hoursSinceSync =
676+
(Date.now() - coverage.lastEnrollmentSyncAt.getTime()) / 3_600_000;
677+
if (hoursSinceSync > COVERAGE_WARNINGS.STALE_SYNC_HOURS) {
678+
const daysStale = Math.floor(hoursSinceSync / 24);
679+
warnings.push(
680+
`Enrollment data may be stale (last synced ${daysStale} day${daysStale !== 1 ? 's' : ''} ago).`,
681+
);
682+
}
683+
}
684+
return warnings;
685+
}
686+
639687
private async ComputeCoverageStats(
640688
em: EntityManager,
641689
scope: ScopeFilter,

0 commit comments

Comments
 (0)