Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/ingest-github/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "./strategies/flat-folder/prompts/file-analysis.ts";
import type { PullFactory, SourceFactory } from "./types/pipeline.ts";
import type { ProgressContextFactory } from "./progress/types.ts";
import { nullProgressContextFactory } from "./progress/NullProgressReporter.ts";
import { dbProgressContextFactory } from "./progress/DbProgressReporter.ts";

/**
* Optional dependencies for the GitHub workers. Factories are documented in
Expand Down Expand Up @@ -47,7 +47,7 @@ function buildRunner(
}

export function registerGithubWorkers(deps: RegisterGithubWorkersDeps = {}): void {
const progressContextFactory = deps.progressContextFactory ?? nullProgressContextFactory;
const progressContextFactory = deps.progressContextFactory ?? dbProgressContextFactory;
const runner = buildRunner(deps.sourceFactory, progressContextFactory);
// `registerWorker` expects `Promise<void>`; the handler now returns
// `Promise<PipelineSummary>` so the enterprise queue bridge can mirror
Expand All @@ -65,7 +65,7 @@ export function registerGithubWorkers(deps: RegisterGithubWorkersDeps = {}): voi
}

export function registerLocalIngestWorker(): void {
const runner = buildRunner(undefined, nullProgressContextFactory);
const runner = buildRunner(undefined, dbProgressContextFactory);
const localHandler = createLocalIngestHandler({ runner });
registerWorker(JobType.LocalIngest, async (msg) => {
await localHandler(msg);
Expand Down Expand Up @@ -133,3 +133,4 @@ export type {
ProgressTotalMode,
} from "./progress/types.ts";
export { nullProgressContextFactory } from "./progress/NullProgressReporter.ts";
export { dbProgressContextFactory } from "./progress/DbProgressReporter.ts";
65 changes: 65 additions & 0 deletions packages/ingest-github/src/progress/DbProgressReporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { knowledgeDb } from "@bb/db";
import type {
ProgressContext,
ProgressContextFactory,
ProgressPhase,
ProgressReporter,
ProgressReporterInput,
} from "./types.ts";

class DbProgressContext implements ProgressContext {
private total = 0;
private processed = 0;
private lastUpdate = 0;

constructor(private knowledgeId: string) {}

reporter(input: ProgressReporterInput): ProgressReporter {
const isFileAnalysis =
input.phase === "file_analysis" &&
(input.subPhase === "analyse_small" || input.subPhase === "big_files_condense");

return {
start: async () => {
if (isFileAnalysis && input.total.kind === "fixed") {
this.total += input.total.total;
await knowledgeDb.updateKnowledgeProgress(this.knowledgeId, this.processed, this.total);
}
},
increment: (delta = 1) => {
if (isFileAnalysis) {
this.processed += delta;
const now = Date.now();
if (now - this.lastUpdate > 250 || this.processed >= this.total) {
this.lastUpdate = now;
knowledgeDb.updateKnowledgeProgress(this.knowledgeId, this.processed, this.total).catch(() => {});
}
}
},
incrementSeen: () => {},
setTotal: (total) => {
if (isFileAnalysis) {
this.total = total;
knowledgeDb.updateKnowledgeProgress(this.knowledgeId, this.processed, this.total).catch(() => {});
}
},
stop: () => {},
};
}

phaseChanged(phase: ProgressPhase) {
if (phase === "clone" || phase === "scan") {
knowledgeDb.updateKnowledgeProgress(this.knowledgeId, 0, undefined).catch(() => {});
}
}

completed() {
knowledgeDb.updateKnowledgeProgress(this.knowledgeId, this.total, this.total).catch(() => {});
}

failed() {}
}

export const dbProgressContextFactory: ProgressContextFactory = (knowledgeId: string) => {
return new DbProgressContext(knowledgeId);
};
4 changes: 2 additions & 2 deletions packages/server/src/reposRoute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function buildReposRoute(): Router {
state: e.status.state,
createdAt: e.createdAt instanceof Date ? e.createdAt.toISOString() : new Date(e.createdAt).toISOString(),
updatedAt: e.updatedAt instanceof Date ? e.updatedAt.toISOString() : new Date(e.updatedAt).toISOString(),
fileCount: e.fileCount,
fileCount: e.status.totalFiles ?? e.fileCount,
}));
res.status(200).json({ repos });
});
Expand Down Expand Up @@ -50,7 +50,7 @@ export function buildReposRoute(): Router {
entry.createdAt instanceof Date ? entry.createdAt.toISOString() : new Date(entry.createdAt).toISOString(),
updatedAt:
entry.updatedAt instanceof Date ? entry.updatedAt.toISOString() : new Date(entry.updatedAt).toISOString(),
fileCount: entry.fileCount,
fileCount: entry.status.totalFiles ?? entry.fileCount,
totalFiles: entry.status.totalFiles,
processedFiles: entry.status.processedFiles,
});
Expand Down
Loading