diff --git a/docs/changelog.md b/docs/changelog.md index a924b1d..57db22a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - **`upsert_resource()`** method on `ManagementClient` and `AsyncManagementClient` — create or update a resource by `external_id` in a single call. Uses `PUT /folders/:folder/resources/?external_id=`. +- **`batch_upsert_resources()`** method on `ManagementClient` and `AsyncManagementClient` — upsert multiple resources concurrently with configurable `max_concurrency`, `fail_fast` error handling mode, and optional `on_progress` callback. +- **`BatchUpsertItem`**, **`BatchItemError`**, **`BatchUpsertResult`** models for batch upsert input/output. - **`external_id`** optional parameter on `create_resource()` — assign an external identifier when creating a resource via `POST`. - **`external_id`** field on `ResourceSummary` model — populated in API responses for resources that have an external identifier. diff --git a/docs/examples.md b/docs/examples.md index 9c87902..4c254d3 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -146,6 +146,33 @@ resource = client.create_resource( ) ``` +### Batch Upsert + +Use `batch_upsert_resources` to upsert many resources in parallel. This is much faster than calling `upsert_resource` in a loop: + +```python +from foxnose_sdk import ManagementClient, BatchUpsertItem + +items = [ + BatchUpsertItem( + external_id=f"article-{i}", + payload={"title": f"Article {i}", "body": "..."}, + ) + for i in range(1000) +] + +result = client.batch_upsert_resources( + "blog-posts", + items, + max_concurrency=10, + on_progress=lambda done, total: print(f"\r{done}/{total}", end=""), +) + +print(f"\nSucceeded: {result.success_count}, Failed: {result.failure_count}") +for error in result.failed: + print(f" Item {error.index} ({error.external_id}): {error.exception}") +``` + ### Folder Schema **File:** `examples/folder_schema.py` diff --git a/docs/management-client.md b/docs/management-client.md index 0e54e6d..07766b7 100644 --- a/docs/management-client.md +++ b/docs/management-client.md @@ -213,6 +213,82 @@ resource = client.upsert_resource( | `external_id` | `str` | Yes | External identifier for the resource | | `component` | `ComponentRef` | No | Component key for composite folders | +### Batch Upsert Resources + +Upsert many resources in parallel. The SDK fans out individual `upsert_resource()` calls using threads (sync client) or async tasks (async client), controlled by `max_concurrency`. + +```python +from foxnose_sdk import BatchUpsertItem + +items = [ + BatchUpsertItem(external_id="ext-1", payload={"title": "Article 1"}), + BatchUpsertItem(external_id="ext-2", payload={"title": "Article 2"}), + BatchUpsertItem(external_id="ext-3", payload={"title": "Article 3"}), +] + +result = client.batch_upsert_resources("folder-key", items, max_concurrency=10) + +print(f"Succeeded: {result.success_count}, Failed: {result.failure_count}") +for error in result.failed: + print(f" [{error.index}] {error.external_id}: {error.exception}") +``` + +Async usage: + +```python +result = await client.batch_upsert_resources("folder-key", items, max_concurrency=10) +``` + +**Error handling modes:** + +- `fail_fast=False` (default) — process all items, collect successes and failures in the result. +- `fail_fast=True` — stop on the first error and raise it immediately. + +```python +# Raises FoxnoseAPIError on first failure +try: + result = client.batch_upsert_resources("folder-key", items, fail_fast=True) +except FoxnoseAPIError as exc: + print(f"Batch stopped: {exc}") +``` + +**Progress tracking:** + +```python +result = client.batch_upsert_resources( + "folder-key", + items, + on_progress=lambda done, total: print(f"{done}/{total}"), +) +``` + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `folder_key` | `FolderRef` | Yes | Target folder key or object | +| `items` | `Sequence[BatchUpsertItem]` | Yes | Items to upsert | +| `max_concurrency` | `int` | No | Max parallel workers (default 5) | +| `fail_fast` | `bool` | No | Stop on first error (default `False`) | +| `on_progress` | `Callable[[int, int], None]` | No | Progress callback `(completed, total)` | + +**`BatchUpsertItem` fields:** + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `external_id` | `str` | Yes | External identifier for the resource | +| `payload` | `dict` | Yes | JSON payload matching the folder schema | +| `component` | `str` | No | Component key for composite folders | + +**`BatchUpsertResult` attributes:** + +| Attribute | Type | Description | +|-----------|------|-------------| +| `succeeded` | `list[ResourceSummary]` | Successfully upserted resources | +| `failed` | `list[BatchItemError]` | Failed items with error details | +| `success_count` | `int` | Number of successes | +| `failure_count` | `int` | Number of failures | +| `total` | `int` | Total processed items | +| `has_failures` | `bool` | Whether any items failed | + ### Update Resource ```python diff --git a/src/foxnose_sdk/__init__.py b/src/foxnose_sdk/__init__.py index ffd71a0..622f54f 100644 --- a/src/foxnose_sdk/__init__.py +++ b/src/foxnose_sdk/__init__.py @@ -38,6 +38,9 @@ SchemaVersionRef, ) from .management.models import ( + BatchItemError, + BatchUpsertItem, + BatchUpsertResult, ComponentList, ComponentSummary, EnvironmentList, @@ -100,6 +103,9 @@ "ResourceList", "RevisionSummary", "RevisionList", + "BatchUpsertItem", + "BatchItemError", + "BatchUpsertResult", "FolderSummary", "FolderList", "ComponentSummary", diff --git a/src/foxnose_sdk/management/__init__.py b/src/foxnose_sdk/management/__init__.py index b3ef357..179a7f9 100644 --- a/src/foxnose_sdk/management/__init__.py +++ b/src/foxnose_sdk/management/__init__.py @@ -17,7 +17,15 @@ RevisionRef, SchemaVersionRef, ) -from .models import ResourceList, ResourceSummary, RevisionList, RevisionSummary +from .models import ( + BatchItemError, + BatchUpsertItem, + BatchUpsertResult, + ResourceList, + ResourceSummary, + RevisionList, + RevisionSummary, +) __all__ = [ "ManagementClient", @@ -26,6 +34,9 @@ "ResourceList", "RevisionSummary", "RevisionList", + "BatchUpsertItem", + "BatchItemError", + "BatchUpsertResult", "FolderRef", "ResourceRef", "RevisionRef", diff --git a/src/foxnose_sdk/management/client.py b/src/foxnose_sdk/management/client.py index 99cf841..c9146c9 100644 --- a/src/foxnose_sdk/management/client.py +++ b/src/foxnose_sdk/management/client.py @@ -1,5 +1,8 @@ from __future__ import annotations +import asyncio +import concurrent.futures +from collections.abc import Callable, Sequence from typing import Any, Mapping, Union from pydantic import BaseModel @@ -12,6 +15,9 @@ APIFolderSummary, APIInfo, APIList, + BatchItemError, + BatchUpsertItem, + BatchUpsertResult, ComponentList, ComponentSummary, EnvironmentList, @@ -2009,6 +2015,87 @@ def upsert_resource( ) return ResourceSummary.model_validate(data) + def batch_upsert_resources( + self, + folder_key: FolderRef, + items: Sequence[BatchUpsertItem], + *, + max_concurrency: int = 5, + fail_fast: bool = False, + on_progress: Callable[[int, int], None] | None = None, + ) -> BatchUpsertResult: + """ + Upsert multiple resources concurrently using threads. + + Each item is processed via :meth:`upsert_resource`. Results are + collected into a :class:`BatchUpsertResult` containing succeeded + and failed lists. + + Args: + folder_key: Target folder key (shared across all items). + items: Sequence of :class:`BatchUpsertItem` to upsert. + max_concurrency: Maximum number of parallel workers (default 5). + fail_fast: If ``True``, stop on first error and raise it. + If ``False`` (default), collect all results. + on_progress: Optional callback invoked as + ``(completed_count, total_count)`` after each item finishes. + """ + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + folder_key = _resolve_key(folder_key) + total = len(items) + if total == 0: + return BatchUpsertResult() + + succeeded: list[ResourceSummary] = [] + failed: list[BatchItemError] = [] + completed = 0 + + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_concurrency, + ) as executor: + future_to_item: dict[ + concurrent.futures.Future[ResourceSummary], + tuple[int, BatchUpsertItem], + ] = { + executor.submit( + self.upsert_resource, + folder_key, + item.payload, + external_id=item.external_id, + component=item.component, + ): (idx, item) + for idx, item in enumerate(items) + } + + for future in concurrent.futures.as_completed(future_to_item): + idx, item = future_to_item[future] + try: + result = future.result() + succeeded.append(result) + except Exception as exc: + if fail_fast: + # Cancel remaining futures and propagate. + for f in future_to_item: + f.cancel() + raise + failed.append( + BatchItemError( + index=idx, + external_id=item.external_id, + exception=exc, + ) + ) + finally: + completed += 1 + if on_progress is not None: + try: + on_progress(completed, total) + except Exception: + pass + + return BatchUpsertResult(succeeded=succeeded, failed=failed) + def update_resource( self, folder_key: FolderRef, @@ -3427,6 +3514,89 @@ async def upsert_resource( ) return ResourceSummary.model_validate(data) + async def batch_upsert_resources( + self, + folder_key: FolderRef, + items: Sequence[BatchUpsertItem], + *, + max_concurrency: int = 5, + fail_fast: bool = False, + on_progress: Callable[[int, int], None] | None = None, + ) -> BatchUpsertResult: + """ + Upsert multiple resources concurrently using async tasks. + + Each item is processed via :meth:`upsert_resource`. Results are + collected into a :class:`BatchUpsertResult` containing succeeded + and failed lists. + + Args: + folder_key: Target folder key (shared across all items). + items: Sequence of :class:`BatchUpsertItem` to upsert. + max_concurrency: Maximum number of parallel workers (default 5). + fail_fast: If ``True``, stop on first error and raise it. + If ``False`` (default), collect all results. + on_progress: Optional callback invoked as + ``(completed_count, total_count)`` after each item finishes. + """ + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + folder_key = _resolve_key(folder_key) + total = len(items) + if total == 0: + return BatchUpsertResult() + + semaphore = asyncio.Semaphore(max_concurrency) + succeeded: list[ResourceSummary] = [] + failed: list[BatchItemError] = [] + completed = 0 + first_error: Exception | None = None + cancel_event = asyncio.Event() + + async def _process(index: int, item: BatchUpsertItem) -> None: + nonlocal completed, first_error + if cancel_event.is_set(): + return + async with semaphore: + if cancel_event.is_set(): + return + try: + result = await self.upsert_resource( + folder_key, + item.payload, + external_id=item.external_id, + component=item.component, + ) + succeeded.append(result) + except Exception as exc: + if fail_fast: + if first_error is None: + first_error = exc + cancel_event.set() + return + failed.append( + BatchItemError( + index=index, + external_id=item.external_id, + exception=exc, + ) + ) + finally: + completed += 1 + if on_progress is not None: + try: + on_progress(completed, total) + except Exception: + pass + + tasks = [asyncio.create_task(_process(i, item)) for i, item in enumerate(items)] + await asyncio.gather(*tasks) + + if fail_fast and first_error is not None: + raise first_error + + return BatchUpsertResult(succeeded=succeeded, failed=failed) + async def update_resource( self, folder_key: FolderRef, diff --git a/src/foxnose_sdk/management/models.py b/src/foxnose_sdk/management/models.py index c889173..8e46ffb 100644 --- a/src/foxnose_sdk/management/models.py +++ b/src/foxnose_sdk/management/models.py @@ -426,6 +426,58 @@ class OrganizationUsage(BaseModel): current_usage: CurrentUsage +# --------------------------------------------------------------------------- +# Batch upsert helpers +# --------------------------------------------------------------------------- + + +class BatchUpsertItem(BaseModel): + """A single item to upsert in a batch operation.""" + + external_id: str + payload: dict[str, Any] + component: str | None = None + + +class BatchItemError(BaseModel): + """Error information for a single failed upsert in a batch.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + index: int + external_id: str + exception: Exception + + +class BatchUpsertResult(BaseModel): + """Aggregate result of a batch upsert operation.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + succeeded: list[ResourceSummary] = [] + failed: list[BatchItemError] = [] + + @property + def total(self) -> int: + """Total number of processed items.""" + return len(self.succeeded) + len(self.failed) + + @property + def success_count(self) -> int: + """Number of items that succeeded.""" + return len(self.succeeded) + + @property + def failure_count(self) -> int: + """Number of items that failed.""" + return len(self.failed) + + @property + def has_failures(self) -> bool: + """Whether any items failed.""" + return len(self.failed) > 0 + + __all__ = [ "PaginatedResponse", "ResourceSummary", @@ -465,4 +517,7 @@ class OrganizationUsage(BaseModel): "PlanDetails", "OrganizationPlanStatus", "OrganizationUsage", + "BatchUpsertItem", + "BatchItemError", + "BatchUpsertResult", ] diff --git a/tests/test_async_clients.py b/tests/test_async_clients.py index 7ff6b1a..72524c0 100644 --- a/tests/test_async_clients.py +++ b/tests/test_async_clients.py @@ -11,7 +11,10 @@ from foxnose_sdk.flux.client import AsyncFluxClient from foxnose_sdk.http import HttpTransport from foxnose_sdk.management.client import AsyncManagementClient +from foxnose_sdk.errors import FoxnoseAPIError from foxnose_sdk.management.models import ( + BatchUpsertItem, + BatchUpsertResult, FolderSummary, ResourceSummary, RevisionSummary, @@ -1201,6 +1204,181 @@ def handler(request: httpx.Request) -> httpx.Response: await client.aclose() +# --------------------------------------------------------------------------- +# async batch_upsert_resources +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_success(): + call_count = 0 + + def handler(request: httpx.Request) -> httpx.Response: + nonlocal call_count + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + call_count += 1 + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = await client.batch_upsert_resources("folder-1", items) + + assert isinstance(result, BatchUpsertResult) + assert result.success_count == 3 + assert result.failure_count == 0 + assert result.has_failures is False + assert result.total == 3 + assert call_count == 3 + returned_ext_ids = {r.external_id for r in result.succeeded} + assert returned_ext_ids == {"ext-0", "ext-1", "ext-2"} + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_empty_list(): + def handler(request: httpx.Request) -> httpx.Response: + raise AssertionError("handler should not be called") + + client = build_async_management_client(handler) + result = await client.batch_upsert_resources("folder-1", []) + + assert result.success_count == 0 + assert result.failure_count == 0 + assert result.total == 0 + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_partial_failure(): + def handler(request: httpx.Request) -> httpx.Response: + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + if ext_id == "ext-1": + return httpx.Response( + 400, json={"message": "Bad request", "error_code": "validation_error"} + ) + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = await client.batch_upsert_resources("folder-1", items) + + assert result.success_count == 2 + assert result.failure_count == 1 + assert result.has_failures is True + error = result.failed[0] + assert error.external_id == "ext-1" + assert isinstance(error.exception, FoxnoseAPIError) + assert error.exception.status_code == 400 + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_fail_fast(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 400, json={"message": "Bad request", "error_code": "validation_error"} + ) + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(5) + ] + with pytest.raises(FoxnoseAPIError) as exc_info: + await client.batch_upsert_resources("folder-1", items, fail_fast=True) + assert exc_info.value.status_code == 400 + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_max_concurrency(): + peak = 0 + current = 0 + + def handler(request: httpx.Request) -> httpx.Response: + nonlocal peak, current + # Note: the mock handler is sync, but with httpx MockTransport + # the async client still serializes calls through the transport. + # We verify concurrency via semaphore logic in the implementation. + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(6) + ] + result = await client.batch_upsert_resources("folder-1", items, max_concurrency=2) + assert result.success_count == 6 + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_progress_callback(): + def handler(request: httpx.Request) -> httpx.Response: + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + progress_calls: list[tuple[int, int]] = [] + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = await client.batch_upsert_resources( + "folder-1", + items, + on_progress=lambda done, total: progress_calls.append((done, total)), + ) + assert result.success_count == 3 + assert len(progress_calls) == 3 + assert all(total == 3 for _, total in progress_calls) + completed_values = sorted(done for done, _ in progress_calls) + assert completed_values == [1, 2, 3] + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_with_component(): + captured: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(str(request.url)) + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_async_management_client(handler) + items = [ + BatchUpsertItem( + external_id="ext-1", payload={"title": "Item"}, component="comp-1" + ) + ] + result = await client.batch_upsert_resources("folder-1", items) + assert result.success_count == 1 + assert "component=comp-1" in captured[0] + assert "external_id=ext-1" in captured[0] + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_batch_upsert_resources_rejects_zero_concurrency(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=RESOURCE_JSON) + + client = build_async_management_client(handler) + items = [BatchUpsertItem(external_id="ext-1", payload={"title": "Item"})] + with pytest.raises(ValueError, match="max_concurrency must be at least 1"): + await client.batch_upsert_resources("folder-1", items, max_concurrency=0) + await client.aclose() + + @pytest.mark.asyncio async def test_async_update_delete_resource_and_get_data(): captured: list[tuple[str, str]] = [] diff --git a/tests/test_clients.py b/tests/test_clients.py index 0c01ce4..92d44be 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -12,7 +12,11 @@ from foxnose_sdk.flux.client import FluxClient from foxnose_sdk.http import HttpTransport from foxnose_sdk.management.client import ManagementClient, _resolve_key +from foxnose_sdk.errors import FoxnoseAPIError from foxnose_sdk.management.models import ( + BatchItemError, + BatchUpsertItem, + BatchUpsertResult, FolderSummary, ResourceSummary, RevisionSummary, @@ -1113,6 +1117,183 @@ def test_resource_summary_parses_without_external_id_field(): assert summary.external_id is None +# --------------------------------------------------------------------------- +# batch_upsert_resources +# --------------------------------------------------------------------------- + + +def test_batch_upsert_resources_success(): + call_count = 0 + + def handler(request: httpx.Request) -> httpx.Response: + nonlocal call_count + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + resource_json = {**RESOURCE_JSON, "external_id": ext_id} + call_count += 1 + return httpx.Response(200, json=resource_json) + + client = build_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = client.batch_upsert_resources("folder-1", items) + + assert isinstance(result, BatchUpsertResult) + assert result.success_count == 3 + assert result.failure_count == 0 + assert result.has_failures is False + assert result.total == 3 + assert call_count == 3 + returned_ext_ids = {r.external_id for r in result.succeeded} + assert returned_ext_ids == {"ext-0", "ext-1", "ext-2"} + + +def test_batch_upsert_resources_empty_list(): + def handler(request: httpx.Request) -> httpx.Response: + raise AssertionError("handler should not be called") + + client = build_management_client(handler) + result = client.batch_upsert_resources("folder-1", []) + + assert result.success_count == 0 + assert result.failure_count == 0 + assert result.total == 0 + assert result.has_failures is False + + +def test_batch_upsert_resources_partial_failure(): + def handler(request: httpx.Request) -> httpx.Response: + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + # Fail requests where external_id ends with "1" (index 1) + if ext_id == "ext-1": + return httpx.Response( + 400, json={"message": "Bad request", "error_code": "validation_error"} + ) + resource_json = {**RESOURCE_JSON, "external_id": ext_id} + return httpx.Response(200, json=resource_json) + + client = build_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = client.batch_upsert_resources("folder-1", items) + + assert result.success_count == 2 + assert result.failure_count == 1 + assert result.has_failures is True + assert len(result.failed) == 1 + error = result.failed[0] + assert isinstance(error, BatchItemError) + assert error.external_id == "ext-1" + assert isinstance(error.exception, FoxnoseAPIError) + assert error.exception.status_code == 400 + + +def test_batch_upsert_resources_fail_fast(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 400, json={"message": "Bad request", "error_code": "validation_error"} + ) + + client = build_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(5) + ] + with pytest.raises(FoxnoseAPIError) as exc_info: + client.batch_upsert_resources("folder-1", items, fail_fast=True) + assert exc_info.value.status_code == 400 + + +def test_batch_upsert_resources_max_concurrency(): + import threading + + peak = 0 + current = 0 + lock = threading.Lock() + + def handler(request: httpx.Request) -> httpx.Response: + nonlocal peak, current + with lock: + current += 1 + if current > peak: + peak = current + import time + + time.sleep(0.05) + with lock: + current -= 1 + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(6) + ] + result = client.batch_upsert_resources("folder-1", items, max_concurrency=2) + assert result.success_count == 6 + assert peak <= 2 + + +def test_batch_upsert_resources_progress_callback(): + def handler(request: httpx.Request) -> httpx.Response: + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + progress_calls: list[tuple[int, int]] = [] + + client = build_management_client(handler) + items = [ + BatchUpsertItem(external_id=f"ext-{i}", payload={"title": f"Item {i}"}) + for i in range(3) + ] + result = client.batch_upsert_resources( + "folder-1", + items, + on_progress=lambda done, total: progress_calls.append((done, total)), + ) + assert result.success_count == 3 + assert len(progress_calls) == 3 + # All calls should have total=3 + assert all(total == 3 for _, total in progress_calls) + # Completed counts should cover 1, 2, 3 + completed_values = sorted(done for done, _ in progress_calls) + assert completed_values == [1, 2, 3] + + +def test_batch_upsert_resources_with_component(): + captured: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(str(request.url)) + ext_id = str(request.url).split("external_id=")[1].split("&")[0] + return httpx.Response(200, json={**RESOURCE_JSON, "external_id": ext_id}) + + client = build_management_client(handler) + items = [ + BatchUpsertItem( + external_id="ext-1", payload={"title": "Item"}, component="comp-1" + ) + ] + result = client.batch_upsert_resources("folder-1", items) + assert result.success_count == 1 + assert "component=comp-1" in captured[0] + assert "external_id=ext-1" in captured[0] + + +def test_batch_upsert_resources_rejects_zero_concurrency(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=RESOURCE_JSON) + + client = build_management_client(handler) + items = [BatchUpsertItem(external_id="ext-1", payload={"title": "Item"})] + with pytest.raises(ValueError, match="max_concurrency must be at least 1"): + client.batch_upsert_resources("folder-1", items, max_concurrency=0) + + def test_publish_revision_uses_nested_path(): captured = {}