Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ Core is intentionally thin: it only defines types and contracts. IO, parsing, an
## Structure

- types/ Contracts for sources, raw inputs, and ingest interfaces
- ingest/ Ingestion strategies (flat-map, future map-reduce)
- ingest/ Markdown parsing and ingestion strategies (flat-map, map-reduce)

## Boundaries

- No IO in core
- No parsing or file formats in core
- Markdown section parsing lives in ingest/; other format adapters belong in higher-level packages
- Keep types small and composable

## Development
Expand Down
76 changes: 75 additions & 1 deletion core/bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/ingest/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
// Public surface of core/ingest: the two ingestion strategies plus the
// markdown section parser (see README: "Markdown parsing and ingestion
// strategies (flat-map, map-reduce)").
export { flatMapSections } from "./flat-map";
export { mapReduceSections } from "./map-reduce";
export { parseMarkdownSections } from "./parse-markdown";
138 changes: 138 additions & 0 deletions core/ingest/map-reduce.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { describe, expect, test } from "bun:test";
import { mapReduceSections } from "./map-reduce";
import type { RawSection, MapFn, ReduceFn } from "../types/ingest";

// Deterministic stand-in for a map call: title is the heading,
// description is a 40-character content preview.
const mockMap: MapFn = (heading, content) => {
  const preview = content.slice(0, 40);
  return { title: heading, description: preview, keypoints: [heading] };
};

// Deterministic stand-in for a reduce call: rolls child keypoints up
// into the parent and reports how many children were combined.
const mockReduce: ReduceFn = (heading, _own, childMeta) => {
  const keypoints: string[] = [];
  for (const meta of childMeta) {
    keypoints.push(...meta.keypoints);
  }
  return {
    title: heading,
    description: `Rolled up ${childMeta.length} children`,
    keypoints,
  };
};

describe("mapReduceSections", () => {
  test("maps flat leaf sections", async () => {
    const input: RawSection[] = [
      { heading: "A", level: 1, content: "Alpha content", children: [] },
      { heading: "B", level: 1, content: "Beta content", children: [] },
    ];

    const out = await mapReduceSections(input, {
      map: mockMap,
      reduce: mockReduce,
    });

    expect(out).toHaveLength(2);
    expect(out[0]!.metadata.title).toBe("A");
    expect(out[1]!.metadata.title).toBe("B");
    expect(out[0]!.children).toHaveLength(0);
  });

  test("reduces parent with children bottom-up", async () => {
    const tree: RawSection[] = [
      {
        heading: "Parent",
        level: 1,
        content: "Intro",
        children: [
          { heading: "Child1", level: 2, content: "C1", children: [] },
          { heading: "Child2", level: 2, content: "C2", children: [] },
        ],
      },
    ];

    const out = await mapReduceSections(tree, {
      map: mockMap,
      reduce: mockReduce,
    });

    expect(out).toHaveLength(1);
    const parent = out[0]!;
    expect(parent.metadata.description).toBe("Rolled up 2 children");
    expect(parent.metadata.keypoints).toEqual(["Child1", "Child2"]);
    expect(parent.children).toHaveLength(2);
    expect(parent.children[0]!.metadata.title).toBe("Child1");
    expect(parent.children[1]!.metadata.title).toBe("Child2");
  });

  test("chunks large leaf sections and reduces chunk results", async () => {
    const parts: string[] = [];
    for (let i = 0; i < 5; i++) {
      parts.push(`Paragraph ${i}: ${"x".repeat(50)}`);
    }
    const largeParagraphs = parts.join("\n\n");

    const input: RawSection[] = [
      { heading: "Big", level: 1, content: largeParagraphs, children: [] },
    ];

    const seenHeadings: string[] = [];
    const trackingMap: MapFn = (heading, content) => {
      seenHeadings.push(heading);
      return mockMap(heading, content);
    };

    const out = await mapReduceSections(input, {
      map: trackingMap,
      reduce: mockReduce,
      maxSectionSize: 100,
    });

    expect(seenHeadings.length).toBeGreaterThan(1);
    expect(seenHeadings[0]).toContain("[chunk ");
    expect(out[0]!.metadata.description).toContain("Rolled up");
  });

  test("respects concurrency bound", async () => {
    let inFlight = 0;
    let highWater = 0;

    const slowMap: MapFn = async (heading) => {
      inFlight++;
      highWater = Math.max(highWater, inFlight);
      await new Promise((resolve) => setTimeout(resolve, 10));
      inFlight--;
      return { title: heading, description: "", keypoints: [] };
    };

    const leaves: RawSection[] = [];
    for (let i = 0; i < 6; i++) {
      leaves.push({
        heading: `S${i}`,
        level: 1,
        content: `Content ${i}`,
        children: [],
      });
    }

    await mapReduceSections(leaves, {
      map: slowMap,
      reduce: mockReduce,
      concurrency: 2,
    });

    expect(highWater).toBeLessThanOrEqual(2);
  });

  test("handles empty content", async () => {
    const input: RawSection[] = [
      { heading: "Empty", level: 1, content: "", children: [] },
    ];

    const out = await mapReduceSections(input, {
      map: mockMap,
      reduce: mockReduce,
    });

    expect(out).toHaveLength(1);
    expect(out[0]!.metadata.title).toBe("Empty");
  });

  test("empty sections array", async () => {
    const out = await mapReduceSections([], {
      map: mockMap,
      reduce: mockReduce,
    });
    expect(out).toHaveLength(0);
  });
});
140 changes: 140 additions & 0 deletions core/ingest/map-reduce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import type { GeneratedMetadata } from "../types/source/base";
import type { DocumentSection } from "../types/source/complex-document";
import type { RawSection, MapReduceOptions } from "../types/ingest";

/**
 * Split text into chunks on paragraph boundaries (`\n\n`).
 * Each chunk stays at or under `maxSize` characters when possible.
 * A single paragraph larger than `maxSize` becomes its own chunk.
 * Empty text yields an empty array.
 */
function chunkContent(text: string, maxSize: number): string[] {
  // Drop empty fragments produced by leading/trailing blank lines;
  // otherwise a trailing "\n\n" in the text gets re-appended to the
  // last chunk as a stray separator.
  const paragraphs = text.split(/\n\n+/).filter((p) => p.length > 0);
  const chunks: string[] = [];
  let current = "";

  for (const para of paragraphs) {
    if (current.length === 0) {
      // First paragraph of a fresh chunk is always accepted, even if it
      // alone exceeds maxSize (it then becomes an oversized chunk).
      current = para;
    } else if (current.length + 2 + para.length <= maxSize) {
      // +2 accounts for the "\n\n" separator we re-insert.
      current += "\n\n" + para;
    } else {
      chunks.push(current);
      current = para;
    }
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}

/**
 * Walk a section tree bottom-up with map/reduce:
 *
 * - Leaf within size limit → `map(heading, content)`
 * - Leaf exceeding maxSectionSize → chunk on paragraphs, map each chunk, reduce results
 * - Parent with children → recurse children first, then `reduce(heading, ownContent, childMeta)`
 *
 * Each map/reduce call individually acquires a semaphore slot so that
 * `concurrency: 1` doesn't deadlock.
 */
export async function mapReduceSections(
  sections: RawSection[],
  options: MapReduceOptions,
): Promise<DocumentSection[]> {
  const {
    map,
    reduce,
    maxSectionSize = Infinity,
    concurrency = Infinity,
  } = options;

  // Lightweight semaphore with direct slot hand-off. On release, a pending
  // waiter inherits the slot and `running` stays unchanged. The alternative
  // (decrement, resolve, waiter re-increments) opens a microtask window in
  // which a freshly arriving acquire() observes `running < concurrency` and
  // is admitted before the resumed waiter counts itself, pushing the number
  // of in-flight calls above `concurrency`.
  let running = 0;
  const waiters: (() => void)[] = [];

  async function acquire(): Promise<void> {
    if (running < concurrency) {
      running++;
      return;
    }
    // The releasing task hands its slot directly to us; do NOT increment
    // `running` here — it was never decremented on the other side.
    await new Promise<void>((resolve) => waiters.push(resolve));
  }

  function release(): void {
    const next = waiters.shift();
    if (next) {
      next(); // slot handed over; `running` is unchanged
    } else {
      running--;
    }
  }

  /** Run `map` under the concurrency limit. */
  async function guardedMap(
    heading: string,
    content: string,
  ): Promise<GeneratedMetadata> {
    await acquire();
    try {
      return await map(heading, content);
    } finally {
      release();
    }
  }

  /** Run `reduce` under the concurrency limit. */
  async function guardedReduce(
    heading: string,
    ownContent: string,
    childMeta: GeneratedMetadata[],
  ): Promise<GeneratedMetadata> {
    await acquire();
    try {
      return await reduce(heading, ownContent, childMeta);
    } finally {
      release();
    }
  }

  async function walk(raw: RawSection[]): Promise<DocumentSection[]> {
    return Promise.all(
      raw.map(async (section): Promise<DocumentSection> => {
        // Recurse children first (bottom-up) so the parent's reduce sees
        // finalized child metadata.
        const children = await walk(section.children);

        let metadata: GeneratedMetadata;

        if (children.length > 0) {
          // Parent: reduce over child metadata. Note the parent's own
          // content is passed through unchunked even when it exceeds
          // maxSectionSize — only leaves are chunked.
          metadata = await guardedReduce(
            section.heading,
            section.content,
            children.map((c) => c.metadata),
          );
        } else if (section.content.length > maxSectionSize) {
          // Oversized leaf: chunk on paragraph boundaries, map the chunks
          // in parallel, then reduce the per-chunk metadata.
          const chunks = chunkContent(section.content, maxSectionSize);
          const chunkMeta = await Promise.all(
            chunks.map((chunk, i) =>
              guardedMap(`${section.heading} [chunk ${i + 1}]`, chunk),
            ),
          );
          metadata = await guardedReduce(
            section.heading,
            section.content,
            chunkMeta,
          );
        } else {
          // Leaf within size limit: map directly.
          metadata = await guardedMap(section.heading, section.content);
        }

        return {
          heading: section.heading,
          level: section.level,
          content: section.content,
          metadata,
          children,
        };
      }),
    );
  }

  return walk(sections);
}
Loading