Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/data_designer_nemo/src/data_designer_nemo/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from data_designer_nemo.errors import NDDError
from data_designer_nemo.fileset_file_seed_reader import FilesetFileSeedReader
from data_designer_nemo.fileset_filesystem_provider import FilesetFileSystemProvider
from data_designer_nemo.model_provider import (
make_local_first_model_provider_registry,
make_model_provider_registry,
Expand Down Expand Up @@ -113,6 +114,7 @@ class RemoteDataDesignerContext:
def __init__(self, sdk: AsyncNeMoPlatform | NeMoPlatform, workspace: str):
self._sdk = sdk
self._workspace = workspace
self._validated_filesystem_roots: set[str] = set()

def get_secret_resolver(self) -> SecretResolver:
return NMPSecretResolver(self._sdk, self._workspace)
Expand All @@ -130,7 +132,8 @@ async def validate(self, config: dd.DataDesignerConfig) -> list[NDDError]:
except NDDError as e:
errors.append(e)
try:
await validate_seed(config, self._workspace, sdk)
if validated_root := await validate_seed(config, self._workspace, sdk):
self._validated_filesystem_roots.add(validated_root)
except NDDError as e:
errors.append(e)
try:
Expand All @@ -141,9 +144,16 @@ async def validate(self, config: dd.DataDesignerConfig) -> list[NDDError]:
return errors

def get_seed_readers(self) -> list[SeedReader]:
provider = FilesetFileSystemProvider(
self._sdk,
workspace=self._workspace,
validated_roots=self._validated_filesystem_roots,
)
return [
HuggingFaceSeedReader(),
FilesetFileSeedReader(self._sdk),
DirectorySeedReader(fs_provider=provider),
FileContentsSeedReader(fs_provider=provider),
]

def get_person_reader(self) -> PersonReader:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import PurePosixPath

from data_designer.engine.resources.seed_reader import (
SeedReaderConfigError,
SeedReaderError,
SeedReaderFileSystemContext,
)
from data_designer_nemo.sdk_translation import async_to_sync_sdk
from fsspec.implementations.dirfs import DirFileSystem
from nemo_platform import AsyncNeMoPlatform, NeMoPlatform
from nemo_platform.filesets import FilesetFileSystem, FilesetPathError, build_fileset_ref, parse_fileset_ref


class _FilesetDirFileSystem(DirFileSystem):
"""DirFileSystem that handles FilesetFileSystem's '#' path separator.

FilesetFileSystem returns paths using '#' to separate the fileset name from
the file path (e.g. "ws/fs#data.parquet"). Standard DirFileSystem._relpath
builds its strip-prefix with '/' (e.g. "ws/fs/"), so the startswith check
fails for fileset-root paths. For subdirectory roots (e.g. "ws/fs#subdir"),
files use '/' after '#' and the standard logic already works; the '#' branch
below is a no-op in that case.

All methods besides _relpath are inherited from DirFileSystem unchanged, so
this remains a complete AbstractFileSystem implementation.
"""

def _relpath(self, path: str | list) -> str | list: # type: ignore[override]
if isinstance(path, list):
return [self._relpath(p) for p in path]
if not self.path:
return path
if path == self.path:
return ""
for sep in ("#", "/"):
prefix = self.path + sep
if path.startswith(prefix):
return path[len(prefix) :]
raise AssertionError(f"Path {path!r} does not start with root {self.path!r}")


class FilesetFileSystemProvider:
"""Filesystem provider that roots directory-style seed readers in a fileset."""

def __init__(
self,
sdk: NeMoPlatform | AsyncNeMoPlatform,
*,
workspace: str,
validated_roots: set[str] | None = None,
) -> None:
if isinstance(sdk, AsyncNeMoPlatform):
sdk = async_to_sync_sdk(sdk)
self._sdk = sdk
self._workspace = workspace
self._validated_roots = set() if validated_roots is None else validated_roots

def create_context(self, *, runtime_path: str) -> SeedReaderFileSystemContext:
root = self._canonical_root(runtime_path)
rooted_fs = _FilesetDirFileSystem(path=root, fs=FilesetFileSystem(self._sdk))
return SeedReaderFileSystemContext(fs=rooted_fs, root_path=PurePosixPath(root))

def ensure_root_exists(self, *, runtime_path: str) -> None:
workspace, fileset, fragment = self._parse(runtime_path)
root = build_fileset_ref(fragment, workspace=workspace, fileset=fileset)
if root in self._validated_roots:
return

fs = FilesetFileSystem(self._sdk)
if fs.exists(root):
self._validated_roots.add(root)
return

fileset_root = build_fileset_ref("", workspace=workspace, fileset=fileset)
fully_qualified_fileset_name = f"{workspace}/{fileset}"
if not fs.exists(fileset_root):
raise SeedReaderConfigError(f"🛑 Fileset {fully_qualified_fileset_name!r} not found.")
raise SeedReaderConfigError(f"🛑 Path {fragment!r} not found in fileset {fully_qualified_fileset_name!r}.")

def _canonical_root(self, runtime_path: str) -> str:
workspace, fileset, fragment = self._parse(runtime_path)
return build_fileset_ref(fragment, workspace=workspace, fileset=fileset)

def _parse(self, runtime_path: str) -> tuple[str, str, str]:
try:
return parse_fileset_ref(runtime_path, workspace_fallback=self._workspace)
except FilesetPathError as error:
raise SeedReaderError(f"🛑 Invalid fileset seed source path {runtime_path!r}: {error}") from error
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def make_null_registry() -> ModelProviderRegistry:
# is semantically valid. The library requires a non-empty ModelProviderRegistry, so in this scenario
# we can provide this dummy null registry.
return ModelProviderRegistry(
default=_NO_OP,
providers=[make_noop_provider()],
)

Expand All @@ -74,12 +73,6 @@ async def make_local_first_model_provider_registry(
if len(model_configs) == 0:
return None

missing_providers = [model_config for model_config in model_configs if model_config.provider is None]
if len(missing_providers) > 0:
raise NDDInvalidConfigError(
f"Error: following model configs do not have an explicit provider defined: {missing_providers}"
)

logger.info("Building model provider registry. First checking locally-defined providers.")

local_registry = _make_local_model_provider_registry()
Expand Down
81 changes: 54 additions & 27 deletions packages/data_designer_nemo/src/data_designer_nemo/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from data_designer_nemo.fileset_file_seed_source import FilesetFileSeedSource
from data_designer_nemo.secret_resolver import validate_secret
from nemo_platform import AsyncNeMoPlatform, NotFoundError, PermissionDeniedError
from nemo_platform.filesets import FilesetPathError, build_fileset_ref, parse_fileset_ref

logger = logging.getLogger(__name__)

Expand All @@ -17,37 +18,63 @@ def get_seed_source(dd_config: dd.DataDesignerConfig) -> SeedSource | None:
return dd_config.seed_config.source if dd_config.seed_config else None


async def validate_seed(dd_config: dd.DataDesignerConfig, workspace: str, sdk: AsyncNeMoPlatform) -> None:
async def validate_seed(dd_config: dd.DataDesignerConfig, workspace: str, sdk: AsyncNeMoPlatform) -> str | None:
if (seed_source := get_seed_source(dd_config)) is None:
return None

if isinstance(seed_source, dd.HuggingFaceSeedSource) and (token := seed_source.token) is not None:
await validate_secret(sdk, token, workspace)
return None

if isinstance(seed_source, FilesetFileSeedSource):
workspace, fileset_name = _parse_seed_source_path(seed_source.path, workspace)
try:
await sdk.files.filesets.retrieve(name=fileset_name, workspace=workspace)
except NotFoundError as e:
raise NDDInvalidConfigError(f"Could not find fileset {fileset_name!r} in workspace {workspace!r}") from e
except PermissionDeniedError as e:
raise NDDInvalidConfigError(f"Access denied to workspace {workspace!r}") from e
except Exception as e:
logger.exception("Error retrieving fileset", extra={"fileset_name": fileset_name, "workspace": workspace})
raise NDDInternalError(
f"An unexpected error occurred while retrieving fileset {fileset_name!r} in workspace {workspace!r}: {e}"
) from e


def _parse_seed_source_path(path: str, request_workspace: str) -> tuple[str, str]:
provided_fileset = path.split("#")[0]
match provided_fileset.split("/"):
case [name]:
return request_workspace, name
case [workspace, name]:
return workspace, name
case _:
raise NDDInvalidConfigError(
f"The fileset reference {provided_fileset!r} in seed source path is formatted incorrectly"
)
if isinstance(seed_source, FilesetFileSeedSource | dd.DirectorySeedSource | dd.FileContentsSeedSource):
return await _validate_seed_from_files_service(seed_source, workspace, sdk)


async def _validate_seed_from_files_service(
seed_source: FilesetFileSeedSource | dd.DirectorySeedSource | dd.FileContentsSeedSource,
workspace: str,
sdk: AsyncNeMoPlatform,
) -> str | None:
try:
workspace, fileset_name, fragment = parse_fileset_ref(seed_source.path, workspace_fallback=workspace)
except FilesetPathError as e:
raise NDDInvalidConfigError(
f"The fileset reference in seed source path {seed_source.path!r} is formatted incorrectly"
) from e

try:
await sdk.files.filesets.retrieve(name=fileset_name, workspace=workspace)
except NotFoundError as e:
raise NDDInvalidConfigError(f"Could not find fileset {fileset_name!r} in workspace {workspace!r}") from e
except PermissionDeniedError as e:
raise NDDInvalidConfigError(f"Access denied to workspace {workspace!r}") from e
except Exception as e:
logger.exception("Error retrieving fileset", extra={"fileset_name": fileset_name, "workspace": workspace})
raise NDDInternalError(
f"An unexpected error occurred while retrieving fileset {fileset_name!r} in workspace {workspace!r}: {e}"
) from e

canonical_root = build_fileset_ref(fragment, workspace=workspace, fileset=fileset_name)
if not fragment:
return canonical_root

fully_qualified_fileset_name = f"{workspace}/{fileset_name}"
try:
response = await sdk.files.list(remote_path=fragment, fileset=fileset_name, workspace=workspace)
except NotFoundError as e:
raise NDDInvalidConfigError(f"Path {fragment!r} not found in fileset {fully_qualified_fileset_name!r}") from e
except PermissionDeniedError as e:
raise NDDInvalidConfigError(f"Access denied to workspace {workspace!r}") from e
except Exception as e:
logger.exception(
"Error listing fileset path",
extra={"fileset_name": fileset_name, "workspace": workspace, "fragment": fragment},
)
raise NDDInternalError(
f"An unexpected error occurred while listing path {fragment!r} in fileset {fully_qualified_fileset_name!r}: {e}"
) from e

if not response.data:
raise NDDInvalidConfigError(f"Path {fragment!r} not found in fileset {fully_qualified_fileset_name!r}")

return canonical_root
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import data_designer.config as dd
from data_designer_nemo.errors import NDDInvalidConfigError

_SUPPORTED_SEED_TYPES = {"hf", "nmp"}
_SUPPORTED_SEED_TYPES = {"directory", "file_contents", "hf", "nmp"}
_UNSUPPORTED_SEED_TYPES_MESSAGE = (
"The NeMo Platform Data Designer service only supports seed data from HuggingFace "
"(seed_type=hf) or the Files service (seed_type=nmp)."
"or the NeMo Platform Files service (FilesetFile, Directory, or FileContents seed sources "
"referencing fileset paths). Upload your data to the Files service, adjust your config, and try again."
)
_DATAFRAME_SEED_TYPE = "df"
_DATAFRAME_SEED_TYPE_MESSAGE = (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import Mock, patch

import pytest
from data_designer.engine.resources.seed_reader import SeedReaderConfigError
from data_designer_nemo.fileset_filesystem_provider import FilesetFileSystemProvider


def test_create_context_roots_reader_in_canonical_fileset_ref() -> None:
sdk = Mock()

with patch("data_designer_nemo.fileset_filesystem_provider.FilesetFileSystem") as fs_class:
fs_class.return_value.async_impl = True
fs_class.return_value.asynchronous = False
context = FilesetFileSystemProvider(sdk, workspace="default").create_context(runtime_path="docs#corpus")

fs_class.assert_called_once_with(sdk)
assert str(context.root_path) == "default/docs#corpus"


def test_ensure_root_exists_skips_validated_roots() -> None:
sdk = Mock()

with patch("data_designer_nemo.fileset_filesystem_provider.FilesetFileSystem") as fs_class:
FilesetFileSystemProvider(
sdk,
workspace="default",
validated_roots={"default/docs#corpus"},
).ensure_root_exists(runtime_path="docs#corpus")

fs_class.assert_not_called()


def test_ensure_root_exists_reports_missing_fileset_path() -> None:
sdk = Mock()

with patch("data_designer_nemo.fileset_filesystem_provider.FilesetFileSystem") as fs_class:
fs_class.return_value.exists.side_effect = [False, True]
provider = FilesetFileSystemProvider(sdk, workspace="default")

with pytest.raises(SeedReaderConfigError, match="Path 'corpus' not found in fileset 'default/docs'"):
provider.ensure_root_exists(runtime_path="docs#corpus")

assert fs_class.return_value.exists.call_count == 2
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def _make_model_config(alias: str) -> dd.ModelConfig:
return dd.ModelConfig(
alias=alias,
model="nvidia/nemotron-3",
provider="default/nvidia",
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import Any
from unittest.mock import AsyncMock, Mock

import data_designer.config as dd
import pytest
from data_designer.engine.resources.seed_reader import DirectorySeedReader, FileContentsSeedReader
from data_designer_nemo.context import RemoteDataDesignerContext
from data_designer_nemo.seed import validate_seed
from data_designer_nemo.unsupported_features import validate_seed_config_for_execution_context
from nemo_platform import AsyncNeMoPlatform


def _make_config(source: Any) -> dd.DataDesignerConfig:
builder = dd.DataDesignerConfigBuilder()
builder.with_seed_dataset(source)
return builder.build()


def test_remote_seed_type_validation_allows_filesystem_seed_sources() -> None:
validate_seed_config_for_execution_context(
_make_config(dd.DirectorySeedSource(path="workspace/docs#corpus")),
is_local=False,
)
validate_seed_config_for_execution_context(
_make_config(dd.FileContentsSeedSource(path="workspace/docs#corpus")),
is_local=False,
)


def test_remote_context_includes_filesystem_seed_readers() -> None:
readers = RemoteDataDesignerContext(Mock(), "default").get_seed_readers()

assert any(isinstance(reader, DirectorySeedReader) for reader in readers)
assert any(isinstance(reader, FileContentsSeedReader) for reader in readers)


@pytest.mark.asyncio
async def test_validate_seed_returns_canonical_validated_filesystem_root() -> None:
sdk = AsyncMock(spec=AsyncNeMoPlatform)
sdk.files.filesets.retrieve = AsyncMock()
sdk.files.list = AsyncMock(return_value=Mock(data=[Mock(path="corpus/a.md")]))
config = _make_config(dd.FileContentsSeedSource(path="docs#corpus", file_pattern="*.md"))

validated_root = await validate_seed(config, "default", sdk)

assert validated_root == "default/docs#corpus"
sdk.files.filesets.retrieve.assert_awaited_once_with(name="docs", workspace="default")
sdk.files.list.assert_awaited_once_with(remote_path="corpus", fileset="docs", workspace="default")
Loading
Loading