
[NEM-363] Add built-context indexing flow #114

Open

Mateus-Cordeiro wants to merge 8 commits into main from NEM-363

Conversation

@Mateus-Cordeiro (Collaborator):

This PR introduces a full "index built contexts" workflow. After the datasource context is built, users can now run `dce index` to read the generated context files from `output/`, generate embeddings, and persist them into DuckDB.

Changes

  • New dce index command
    • Indexes built context files into DuckDB.
    • Supports optional filtering by datasource.
  • Added DatabaoContextProjectManager.index_built_contexts(...)
  • Added BuildService.index_built_context(...)
    • Parses the YAML stored in the context file
    • Reconstructs the wrapper (BuiltDatasourceContext) and deserializes the context into the plugin's expected context_type
  • Persistence: override support
    • PersistenceService.write_chunks_and_embeddings(..., override=True) now deletes old embeddings and chunks for a datasource before inserting new ones.
    • Deletion happens outside the transaction due to DuckDB foreign key limitations.
  • Repositories: delete by datasource_id
    • ChunkRepository and EmbeddingRepository now support deletion by datasource_id
  • Plugins now expose context_type
    • Indexing reads context files from YAML. After yaml.safe_load(), the payload consists entirely of Python primitives, but the chunkers are intentionally written against typed context objects. Each plugin therefore declares context_type to tell the indexing pipeline which type to reconstruct before calling the chunking operation (see the sketch after this list).
  • New dependency: cattrs
    • cattrs provides structured conversion from unstructured data into Python types, which fits our needs and avoids boilerplate deserialize methods that would be tough to maintain as the project grows.
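A minimal sketch of that reconstruction step, assuming a plugin's context_type is a plain dataclass (TableContext and the sample YAML are illustrative, not from the PR):

```python
from dataclasses import dataclass

import cattrs
import yaml

@dataclass
class TableContext:
    # Illustrative stand-in for a plugin's declared context_type.
    name: str
    columns: list[str]

converter = cattrs.Converter()
# yaml.safe_load yields only Python primitives (dicts, lists, strings)...
raw = yaml.safe_load("name: users\ncolumns: [id, email]")
# ...and cattrs structures them back into the typed context object.
typed_context = converter.structure(raw, TableContext)
assert typed_context == TableContext(name="users", columns=["id", "email"])
```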

@Mateus-Cordeiro changed the title from "Add built-context indexing flow" to "[NEM-363] Add built-context indexing flow" on Feb 5, 2026
@JulienArzul (Collaborator) left a comment:

I think it'd be nice to use Pydantic to create the context type from the plugin rather than adding a new library (cattrs) that does the same thing.

Looks good otherwise 🚀
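For reference, a minimal sketch of the Pydantic alternative: in Pydantic v2, TypeAdapter can validate and structure plain dataclasses, so plugins wouldn't need to subclass BaseModel (TableContext is an illustrative stand-in):

```python
from dataclasses import dataclass

from pydantic import TypeAdapter

@dataclass
class TableContext:
    # Illustrative stand-in for a plugin's context_type.
    name: str
    columns: list[str]

# TypeAdapter structures plain dataclasses, not just BaseModel subclasses.
typed = TypeAdapter(TableContext).validate_python({"name": "users", "columns": ["id", "email"]})
```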

"""Summary of an indexing run over built contexts."""

total: int
indexed: int
@JulienArzul (Collaborator):

Instead of an int for each of these properties, should we return a list of DatasourceId?
We can still keep indexed as a calculated property for quick access if we want:

```python
@dataclass
class IndexSummary:
    """Summary of an indexing run over built contexts."""

    indexed: set[DatasourceId]
    skipped: set[DatasourceId]
    failed: set[DatasourceId]

    @property
    def number_indexed(self) -> int:
        return len(self.indexed)

    ...
```

At the very least, I think we should be able to know which datasource failed

@Mateus-Cordeiro (Author):

That is already being logged in the exception handler.

```python
import logging
from datetime import datetime

import cattrs
```
@JulienArzul (Collaborator):

We're already using Pydantic in the project, which in my understanding does the same thing. It would be better IMO not to bring in another library and end up with two different ways of creating classes from YAML.

@Mateus-Cordeiro (Author):

Interesting. I wasn't aware that Pydantic could do this work for non-Pydantic models, and I wanted to avoid forcing users to use Pydantic. I will test without the new library and, if it fits, I'll remove the added dependency.


```python
converter = cattrs.Converter()
# Identity hook: yaml.safe_load already parses ISO timestamps into datetime
# objects, so cattrs should pass them through rather than re-parse them.
converter.register_structure_hook(datetime, lambda v, _: v)
build_datasource_context = converter.structure(raw_context, BuiltDatasourceContext)
```
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ What is used for the `context: Any` attribute inside this class? A dictionary?

We could potentially skip this step to avoid the awkward build_datasource_context object holding the wrong content for `context`.

That would mean reading the attributes of BuiltDatasourceContext directly from the raw dictionary:

```python
typed_context = converter.structure(raw_context.get("context", {}), context_type)
```

@Mateus-Cordeiro (Author):

I'm not sure what you mean here. I'm doing this deliberately so that I can use the other items from the BuiltDatasourceContext object (datasource_type, datasource_id, and the context itself).

```diff
-if Path(context_file_name).suffix not in DatasourceId.ALLOWED_YAML_SUFFIXES:
+if (
+    Path(context_file_name).suffix not in DatasourceId.ALLOWED_YAML_SUFFIXES
+    or context_file_name == "all_results.yaml"
+):
```
@JulienArzul (Collaborator):

👍 well spotted

```python
if not chunk_embeddings:
    raise ValueError("chunk_embeddings must be a non-empty list")

# Outside the transaction due to duckdb limitations.
```
@JulienArzul (Collaborator):

That's annoying...

But I guess since we're in a local context, there shouldn't be any concurrency on the DB, so we can probably live with it. The only potential problem is if something fails within the following transaction: we've deleted all previously existing contexts but didn't add new ones. That's not great, but it's something we can deal with as long as we notify the user that the datasource failed to be indexed (see the sketch below).
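A sketch of the failure window being described, with hypothetical helper names (delete_existing_chunks, insert_chunks_in_transaction) standing in for the real persistence internals:

```python
import logging

logger = logging.getLogger(__name__)

def write_with_override(delete_existing_chunks, insert_chunks_in_transaction, chunk_embeddings):
    # The delete runs outside the transaction (DuckDB foreign-key limitation).
    delete_existing_chunks(chunk_embeddings)
    try:
        insert_chunks_in_transaction(chunk_embeddings)
    except Exception:
        # The old rows are already gone at this point, so the failure must be
        # surfaced to the user so they know to re-run indexing for this datasource.
        logger.exception("Insert failed after deleting existing chunks")
        raise
```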

```python
try:
    logger.info(f"Indexing datasource {context.datasource_id}")

    datasource_type = read_datasource_type_from_context_file(
```
@JulienArzul (Collaborator):

Nit: since you receive a DatasourceContext as input, you have already read the context as a string, so we don't really need to re-read it from the file system (which is what this function does).

We could either:

  • replicate what that function does (find the line with the type attribute and parse only that one)
  • or simply parse the full YAML string, since you'll do that afterwards anyway (sketched below)
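A sketch of the second option, assuming the datasource type sits under a top-level `type` key (which is what read_datasource_type_from_context_file scans for):

```python
import yaml

# The context string is already in memory, so no second file-system read is needed.
raw_context = yaml.safe_load(context.context)
datasource_type = raw_context["type"]
```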

@Mateus-Cordeiro (Author):

Good call!

```python
    datasource_ids: list[DatasourceId] | None = None,
    chunk_embedding_mode: ChunkEmbeddingMode = ChunkEmbeddingMode.EMBEDDABLE_TEXT_ONLY,
) -> IndexSummary:
    """Index built datasource contexts into duckdb.
```
@JulienArzul (Collaborator):

Nit: I'm not sure we should mention DuckDB in externally facing docs?

```python
    The summary of the index operation.
    """
    engine: DatabaoContextEngine = self.get_engine_for_project()
    contexts: list[DatasourceContext] = engine.get_all_contexts()
```
@JulienArzul (Collaborator):

Improvement for another PR: we should probably have an API in the engine to get only the datasources from a list.

Right now, we only have:

  • get one datasource context
  • get all datasource contexts

We should add:

  • get multiple datasource contexts (sketched below)
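A hypothetical shape for that API (get_context is assumed to be the existing single-context accessor):

```python
def get_contexts(self, datasource_ids: list[DatasourceId]) -> list[DatasourceContext]:
    """Return contexts for the requested datasources only."""
    return [self.get_context(datasource_id) for datasource_id in datasource_ids]
```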

@Mateus-Cordeiro (Author):

Indeed, we could add that. I considered implementing it for this PR, but I'm not sure it actually saves much IO. It might, though, especially if some contexts are very large.


```python
if datasource_ids is not None:
    wanted_paths = {d.datasource_path for d in datasource_ids}
    contexts = [c for c in contexts if c.datasource_id.datasource_path in wanted_paths]
```
@JulienArzul (Collaborator):

Wouldn't you be able to simply check if `c.datasource_id in datasource_ids`?
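Assuming DatasourceId implements value equality, the whole filter would collapse to:

```python
if datasource_ids is not None:
    contexts = [c for c in contexts if c.datasource_id in datasource_ids]
```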

```python
c2 = DatasourceContext(DatasourceId.from_string_repr("other/b.yaml"), context="B")

engine = mocker.Mock()
engine.get_all_contexts.return_value = [c1, c2]
```
@JulienArzul (Collaborator):

IMO it would be interesting to make this a full end-to-end test rather than testing the very small amount of code within the ProjectManager, which only filters which datasource contexts to use. (I think all the other tests in this class are end-to-end tests, since this is the entry point.)

There is already a helper function called given_output_dir_with_built_contexts that can create the contexts for you in the output folder, so it shouldn't be hard code-wise. A rough sketch is below.
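The helper name comes from the comment above, but its signature and the assertion are assumptions:

```python
def test_index_built_contexts_filters_by_datasource_id(tmp_path):
    # Hypothetical usage: create two built contexts in the output folder.
    given_output_dir_with_built_contexts(tmp_path, ["some/a.yaml", "other/b.yaml"])

    manager = DatabaoContextProjectManager(tmp_path)
    summary = manager.index_built_contexts(
        datasource_ids=[DatasourceId.from_string_repr("other/b.yaml")]
    )

    # Only the requested datasource should be indexed.
    assert summary.indexed == 1
```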
