2 changes: 2 additions & 0 deletions docs/changelog.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- **`upsert_resource()`** method on `ManagementClient` and `AsyncManagementClient` — create or update a resource by `external_id` in a single call. Uses `PUT /folders/:folder/resources/?external_id=<value>`.
- **`batch_upsert_resources()`** method on `ManagementClient` and `AsyncManagementClient` — upsert multiple resources concurrently with configurable `max_concurrency`, `fail_fast` error handling mode, and optional `on_progress` callback.
- **`BatchUpsertItem`**, **`BatchItemError`**, **`BatchUpsertResult`** models for batch upsert input/output.
- **`external_id`** optional parameter on `create_resource()` — assign an external identifier when creating a resource via `POST`.
- **`external_id`** field on `ResourceSummary` model — populated in API responses for resources that have an external identifier.
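The upsert endpoint named in the first entry resolves to a URL of the following shape (a sketch of the URL construction only; `base_url` and the folder/identifier values are placeholders):

```python
from urllib.parse import urlencode

def upsert_url(base_url: str, folder: str, external_id: str) -> str:
    # PUT /folders/:folder/resources/?external_id=<value>
    query = urlencode({"external_id": external_id})
    return f"{base_url}/folders/{folder}/resources/?{query}"

print(upsert_url("https://api.example.com", "blog-posts", "article-1"))
# https://api.example.com/folders/blog-posts/resources/?external_id=article-1
```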

27 changes: 27 additions & 0 deletions docs/examples.md
@@ -146,6 +146,33 @@ resource = client.create_resource(
)
```

### Batch Upsert

Use `batch_upsert_resources` to upsert many resources in parallel. Because requests run concurrently, this is typically much faster than calling `upsert_resource` in a sequential loop:

```python
from foxnose_sdk import ManagementClient, BatchUpsertItem

items = [
BatchUpsertItem(
external_id=f"article-{i}",
payload={"title": f"Article {i}", "body": "..."},
)
for i in range(1000)
]

result = client.batch_upsert_resources(
"blog-posts",
items,
max_concurrency=10,
on_progress=lambda done, total: print(f"\r{done}/{total}", end=""),
)

print(f"\nSucceeded: {result.success_count}, Failed: {result.failure_count}")
for error in result.failed:
print(f" Item {error.index} ({error.external_id}): {error.exception}")
```

### Folder Schema

**File:** `examples/folder_schema.py`
76 changes: 76 additions & 0 deletions docs/management-client.md
@@ -213,6 +213,82 @@ resource = client.upsert_resource(
| `external_id` | `str` | Yes | External identifier for the resource |
| `component` | `ComponentRef` | No | Component key for composite folders |

### Batch Upsert Resources

Upsert many resources in parallel. The SDK fans out individual `upsert_resource()` calls using threads (sync client) or async tasks (async client), controlled by `max_concurrency`.

```python
from foxnose_sdk import BatchUpsertItem

items = [
BatchUpsertItem(external_id="ext-1", payload={"title": "Article 1"}),
BatchUpsertItem(external_id="ext-2", payload={"title": "Article 2"}),
BatchUpsertItem(external_id="ext-3", payload={"title": "Article 3"}),
]

result = client.batch_upsert_resources("folder-key", items, max_concurrency=10)

print(f"Succeeded: {result.success_count}, Failed: {result.failure_count}")
for error in result.failed:
print(f" [{error.index}] {error.external_id}: {error.exception}")
```

Async usage:

```python
result = await client.batch_upsert_resources("folder-key", items, max_concurrency=10)
```
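The thread fan-out described above can be exercised in isolation (an illustrative sketch with a dummy worker standing in for the per-item HTTP call; names here are not SDK API):

```python
import concurrent.futures

def fake_upsert(item: str) -> str:
    # Stand-in for the per-item upsert_resource() HTTP call.
    return item.upper()

items = ["ext-1", "ext-2", "ext-3"]
succeeded, failed = [], []

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(fake_upsert, it): it for it in items}
    for future in concurrent.futures.as_completed(futures):
        try:
            succeeded.append(future.result())
        except Exception as exc:
            failed.append((futures[future], exc))

print(sorted(succeeded))  # ['EXT-1', 'EXT-2', 'EXT-3']
```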

**Error handling modes:**

- `fail_fast=False` (default) — process all items, collect successes and failures in the result.
- `fail_fast=True` — stop on the first error and raise it immediately.

```python
# Raises FoxnoseAPIError on first failure
try:
result = client.batch_upsert_resources("folder-key", items, fail_fast=True)
except FoxnoseAPIError as exc:
print(f"Batch stopped: {exc}")
```
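The async client implements these modes with a semaphore plus a cancellation event (visible in the diff below). The pattern can be sketched in isolation (illustrative only, with an in-line failure standing in for an API error):

```python
import asyncio

async def run_batch(items, max_concurrency=2, fail_fast=True):
    semaphore = asyncio.Semaphore(max_concurrency)
    cancel_event = asyncio.Event()
    succeeded, failed = [], []

    async def process(item):
        if cancel_event.is_set():
            return
        async with semaphore:
            if cancel_event.is_set():
                return
            try:
                if item == "bad":
                    raise ValueError(item)  # stand-in for an API error
                succeeded.append(item)
            except Exception as exc:
                if fail_fast:
                    cancel_event.set()  # stop scheduling further work
                    return
                failed.append((item, exc))

    await asyncio.gather(*(process(it) for it in items))
    return succeeded, failed

ok, bad = asyncio.run(run_batch(["a", "bad", "b", "c"], fail_fast=False))
print(ok, [item for item, _ in bad])  # ['a', 'b', 'c'] ['bad']
```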

**Progress tracking:**

```python
result = client.batch_upsert_resources(
"folder-key",
items,
on_progress=lambda done, total: print(f"{done}/{total}"),
)
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `folder_key` | `FolderRef` | Yes | Target folder key or object |
| `items` | `Sequence[BatchUpsertItem]` | Yes | Items to upsert |
| `max_concurrency` | `int` | No | Max parallel workers (default 5) |
| `fail_fast` | `bool` | No | Stop on first error (default `False`) |
| `on_progress` | `Callable[[int, int], None]` | No | Progress callback `(completed, total)` |

**`BatchUpsertItem` fields:**

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `external_id` | `str` | Yes | External identifier for the resource |
| `payload` | `dict` | Yes | JSON payload matching the folder schema |
| `component` | `str` | No | Component key for composite folders |

**`BatchUpsertResult` attributes:**

| Attribute | Type | Description |
|-----------|------|-------------|
| `succeeded` | `list[ResourceSummary]` | Successfully upserted resources |
| `failed` | `list[BatchItemError]` | Failed items with error details |
| `success_count` | `int` | Number of successes |
| `failure_count` | `int` | Number of failures |
| `total` | `int` | Total processed items |
| `has_failures` | `bool` | Whether any items failed |
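The derived attributes are simple functions of the two lists, which can be sketched with a stand-in dataclass (an illustration only; the SDK's actual `BatchUpsertResult` is a pydantic model):

```python
from dataclasses import dataclass, field

@dataclass
class BatchResultSketch:
    succeeded: list = field(default_factory=list)
    failed: list = field(default_factory=list)

    @property
    def success_count(self) -> int:
        return len(self.succeeded)

    @property
    def failure_count(self) -> int:
        return len(self.failed)

    @property
    def total(self) -> int:
        return self.success_count + self.failure_count

    @property
    def has_failures(self) -> bool:
        return bool(self.failed)

r = BatchResultSketch(succeeded=["ext-1", "ext-2"], failed=["ext-3"])
print(r.total, r.has_failures)  # 3 True
```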

### Update Resource

```python
6 changes: 6 additions & 0 deletions src/foxnose_sdk/__init__.py
@@ -38,6 +38,9 @@
SchemaVersionRef,
)
from .management.models import (
BatchItemError,
BatchUpsertItem,
BatchUpsertResult,
ComponentList,
ComponentSummary,
EnvironmentList,
@@ -100,6 +103,9 @@
"ResourceList",
"RevisionSummary",
"RevisionList",
"BatchUpsertItem",
"BatchItemError",
"BatchUpsertResult",
"FolderSummary",
"FolderList",
"ComponentSummary",
13 changes: 12 additions & 1 deletion src/foxnose_sdk/management/__init__.py
@@ -17,7 +17,15 @@
RevisionRef,
SchemaVersionRef,
)
from .models import ResourceList, ResourceSummary, RevisionList, RevisionSummary
from .models import (
BatchItemError,
BatchUpsertItem,
BatchUpsertResult,
ResourceList,
ResourceSummary,
RevisionList,
RevisionSummary,
)

__all__ = [
"ManagementClient",
@@ -26,6 +34,9 @@
"ResourceList",
"RevisionSummary",
"RevisionList",
"BatchUpsertItem",
"BatchItemError",
"BatchUpsertResult",
"FolderRef",
"ResourceRef",
"RevisionRef",
170 changes: 170 additions & 0 deletions src/foxnose_sdk/management/client.py
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
import concurrent.futures
from collections.abc import Callable, Sequence
from typing import Any, Mapping, Union

from pydantic import BaseModel
Expand All @@ -12,6 +15,9 @@
APIFolderSummary,
APIInfo,
APIList,
BatchItemError,
BatchUpsertItem,
BatchUpsertResult,
ComponentList,
ComponentSummary,
EnvironmentList,
@@ -2009,6 +2015,87 @@ def upsert_resource(
)
return ResourceSummary.model_validate(data)

def batch_upsert_resources(
self,
folder_key: FolderRef,
items: Sequence[BatchUpsertItem],
*,
max_concurrency: int = 5,
fail_fast: bool = False,
on_progress: Callable[[int, int], None] | None = None,
) -> BatchUpsertResult:
"""
Upsert multiple resources concurrently using threads.

Each item is processed via :meth:`upsert_resource`. Results are
collected into a :class:`BatchUpsertResult` containing succeeded
and failed lists.

Args:
folder_key: Target folder key (shared across all items).
items: Sequence of :class:`BatchUpsertItem` to upsert.
max_concurrency: Maximum number of parallel workers (default 5).
fail_fast: If ``True``, stop on first error and raise it.
If ``False`` (default), collect all results.
on_progress: Optional callback invoked as
``(completed_count, total_count)`` after each item finishes.
"""
if max_concurrency < 1:
raise ValueError("max_concurrency must be at least 1")
folder_key = _resolve_key(folder_key)
total = len(items)
if total == 0:
return BatchUpsertResult()

succeeded: list[ResourceSummary] = []
failed: list[BatchItemError] = []
completed = 0

with concurrent.futures.ThreadPoolExecutor(
max_workers=max_concurrency,
) as executor:
future_to_item: dict[
concurrent.futures.Future[ResourceSummary],
tuple[int, BatchUpsertItem],
] = {
executor.submit(
self.upsert_resource,
folder_key,
item.payload,
external_id=item.external_id,
component=item.component,
): (idx, item)
for idx, item in enumerate(items)
}

for future in concurrent.futures.as_completed(future_to_item):
idx, item = future_to_item[future]
try:
result = future.result()
succeeded.append(result)
except Exception as exc:
if fail_fast:
# Cancel remaining futures and propagate.
for f in future_to_item:
f.cancel()
raise
failed.append(
BatchItemError(
index=idx,
external_id=item.external_id,
exception=exc,
)
)
finally:
completed += 1
if on_progress is not None:
try:
on_progress(completed, total)
except Exception:
pass

return BatchUpsertResult(succeeded=succeeded, failed=failed)

def update_resource(
self,
folder_key: FolderRef,
@@ -3427,6 +3514,89 @@ async def upsert_resource(
)
return ResourceSummary.model_validate(data)

async def batch_upsert_resources(
self,
folder_key: FolderRef,
items: Sequence[BatchUpsertItem],
*,
max_concurrency: int = 5,
fail_fast: bool = False,
on_progress: Callable[[int, int], None] | None = None,
) -> BatchUpsertResult:
"""
Upsert multiple resources concurrently using async tasks.

Each item is processed via :meth:`upsert_resource`. Results are
collected into a :class:`BatchUpsertResult` containing succeeded
and failed lists.

Args:
folder_key: Target folder key (shared across all items).
items: Sequence of :class:`BatchUpsertItem` to upsert.
max_concurrency: Maximum number of parallel workers (default 5).
fail_fast: If ``True``, stop on first error and raise it.
If ``False`` (default), collect all results.
on_progress: Optional callback invoked as
``(completed_count, total_count)`` after each item finishes.
"""
if max_concurrency < 1:
raise ValueError("max_concurrency must be at least 1")
folder_key = _resolve_key(folder_key)
total = len(items)
if total == 0:
return BatchUpsertResult()

semaphore = asyncio.Semaphore(max_concurrency)
succeeded: list[ResourceSummary] = []
failed: list[BatchItemError] = []
completed = 0
first_error: Exception | None = None
cancel_event = asyncio.Event()

async def _process(index: int, item: BatchUpsertItem) -> None:
nonlocal completed, first_error
if cancel_event.is_set():
return
async with semaphore:
if cancel_event.is_set():
return
try:
result = await self.upsert_resource(
folder_key,
item.payload,
external_id=item.external_id,
component=item.component,
)
succeeded.append(result)
except Exception as exc:
if fail_fast:
if first_error is None:
first_error = exc
cancel_event.set()
return
failed.append(
BatchItemError(
index=index,
external_id=item.external_id,
exception=exc,
)
)
finally:
completed += 1
if on_progress is not None:
try:
on_progress(completed, total)
except Exception:
pass

tasks = [asyncio.create_task(_process(i, item)) for i, item in enumerate(items)]
await asyncio.gather(*tasks)

if fail_fast and first_error is not None:
raise first_error

return BatchUpsertResult(succeeded=succeeded, failed=failed)

async def update_resource(
self,
folder_key: FolderRef,