From e3daf6514895180ed89648eca7ca4435f31b6c29 Mon Sep 17 00:00:00 2001 From: Matt Harris Date: Thu, 7 May 2026 22:16:09 -0400 Subject: [PATCH] feat: adopt parallel-web 0.6.0 monitor + non-beta task group Upgrades the SDK floor from 0.4.2 (lagged) to 0.6.0 to take advantage of two changes that ship in that release: - Task Groups graduated out of beta. Replaces client.beta.task_group.* calls with client.task_group.* in core/batch.py, integrations/spark/ streaming.py, and the test fixtures, and switches the typed input from BetaRunInputParam to RunInputParam. - Monitor moved from a hand-rolled httpx client against /v1alpha to the SDK's client.monitor resource against /v1. The wrappers in core/monitor.py now thin-wrap the SDK and return dict payloads via model_dump(mode="json") so the CLI and existing callers continue to work. Surface changes mirror the SDK: - cadence (hourly/daily/...) -> frequency ("1h", "1d", "2w"; aliases preserved via MONITOR_FREQUENCY_PRESETS) - delete_monitor -> cancel_monitor (cancellation is irreversible) - simulate_monitor_event -> trigger_monitor (real one-off run) - get_monitor_event_group folded into list_monitor_events with event_group_id filter - new fields: type (event_stream/snapshot), task_run_id (snapshot), processor (lite/base), include_backfill - events listing is cursor-paginated with include_completions / limit CLI subcommands updated accordingly: monitor cancel/trigger replace delete/simulate; monitor event-group is removed; --frequency replaces --cadence; new --type, --task-run-id, --processor, --include-backfill, --cursor, --status, --event-group-id flags. Note that query / task_run_id are immutable in the new update endpoint, so monitor update --query is no longer offered (create a new monitor to change the tracked target). README and npm/README snippets refreshed; tests/test_monitor.py rewritten against the SDK-mocked client; tests/test_enrichment.py mocks moved off client.beta.task_group. --- README.md | 23 +- npm/README.md | 2 +- parallel_web_tools/__init__.py | 12 + parallel_web_tools/cli/commands.py | 362 +++++--- parallel_web_tools/core/__init__.py | 22 +- parallel_web_tools/core/batch.py | 40 +- parallel_web_tools/core/monitor.py | 371 ++++---- .../integrations/spark/streaming.py | 15 +- pyproject.toml | 2 +- tests/test_enrichment.py | 122 +-- tests/test_monitor.py | 871 ++++++------------ uv.lock | 8 +- 12 files changed, 836 insertions(+), 1014 deletions(-) diff --git a/README.md b/README.md index a8aef6b..48af277 100644 --- a/README.md +++ b/README.md @@ -97,14 +97,13 @@ parallel-cli │ ├── schema # Get the schema for a FindAll run │ └── cancel # Cancel a running FindAll └── monitor # Continuous web change tracking - ├── create # Create a new web monitor - ├── list # List all monitors + ├── create # Create a new web monitor (event_stream or snapshot) + ├── list # List monitors (cursor paginated) ├── get # Get monitor details - ├── update # Update monitor configuration - ├── delete # Delete a monitor + ├── update # Update frequency, webhook, metadata + ├── cancel # Cancel a monitor (irreversible) ├── events # List events for a monitor - ├── event-group # Get event group details - └── simulate # Simulate webhook event for testing + └── trigger # Trigger an immediate one-off run ``` ## Quick Start @@ -260,7 +259,7 @@ parallel-cli enrich suggest "Find CEO" --json parallel-cli findall run "AI startups in healthcare" --json # Monitor: track web changes -parallel-cli monitor create "Track Tesla SEC filings" --cadence daily --json +parallel-cli monitor create "Track Tesla SEC filings" --frequency 1d --json # Plan without prompts (provide all args) parallel-cli enrich plan -o config.yaml \ @@ -389,14 +388,14 @@ Track web changes programmatically: ```python from parallel_web_tools import create_monitor, list_monitors, get_monitor -# Create a monitor -monitor = create_monitor(query="Track Tesla SEC filings", cadence="daily") +# Create an event_stream monitor (the default) +monitor = create_monitor(query="Track Tesla SEC filings", frequency="1d") -# List all monitors +# List all monitors (cursor paginated) monitors = list_monitors() -# Get monitor details and events -details = get_monitor(monitor.monitor_id) +# Get monitor details +details = get_monitor(monitor["monitor_id"]) ``` ## YAML Configuration Format diff --git a/npm/README.md b/npm/README.md index 6eab2b4..c9b8424 100644 --- a/npm/README.md +++ b/npm/README.md @@ -14,7 +14,7 @@ npm install -g parallel-cli parallel-cli --help parallel-cli search "your query" parallel-cli enrich input.csv --recipe company-info -parallel-cli monitor watch https://example.com +parallel-cli monitor create "Track price changes for iPhone 16" --frequency 1d ``` ## How it works diff --git a/parallel_web_tools/__init__.py b/parallel_web_tools/__init__.py index 47411e3..5810bae 100644 --- a/parallel_web_tools/__init__.py +++ b/parallel_web_tools/__init__.py @@ -9,6 +9,7 @@ ParseError, ProcessorType, SourceType, + cancel_monitor, create_monitor, enrich_batch, enrich_single, @@ -16,6 +17,9 @@ get_async_client, get_auth_status, get_client, + get_monitor, + list_monitor_events, + list_monitors, load_schema, logout, parse_input_and_output_models, @@ -27,6 +31,8 @@ run_findall, run_research, run_tasks, + trigger_monitor, + update_monitor, ) __version__ = "0.3.0" @@ -61,7 +67,13 @@ # FindAll "run_findall", # Monitor + "cancel_monitor", "create_monitor", + "get_monitor", + "list_monitor_events", + "list_monitors", + "trigger_monitor", + "update_monitor", # Research "run_research", ] diff --git a/parallel_web_tools/cli/commands.py b/parallel_web_tools/cli/commands.py index 90ee0e5..0422028 100644 --- a/parallel_web_tools/cli/commands.py +++ b/parallel_web_tools/cli/commands.py @@ -21,14 +21,14 @@ AVAILABLE_PROCESSORS, FINDALL_GENERATORS, JSON_SCHEMA_TYPE_MAP, - MONITOR_CADENCES, - MONITOR_EVENT_TYPES, + MONITOR_PROCESSORS, + MONITOR_TYPES, RESEARCH_PROCESSORS, cancel_findall_run, + cancel_monitor, create_findall_run, create_monitor, create_research_task, - delete_monitor, enrich_findall, extend_findall, get_api_key, @@ -37,7 +37,6 @@ get_findall_schema, get_findall_status, get_monitor, - get_monitor_event_group, get_research_status, get_task_group_status, get_user_agent, @@ -51,7 +50,7 @@ run_enrichment_from_dict, run_findall, run_research, - simulate_monitor_event, + trigger_monitor, update_monitor, ) @@ -2893,41 +2892,84 @@ def monitor(): @monitor.command(name="create") -@click.argument("query") +@click.argument("query", required=False) +@click.option( + "--frequency", + "-f", + default="1d", + show_default=True, + help=( + "How often to run the monitor. SDK format '' with unit h/d/w " + "(e.g. 1h, 6h, 1d, 2w). Aliases also accepted: hourly, daily, weekly, every_two_weeks." + ), +) @click.option( - "--cadence", - "-c", - type=click.Choice(list(MONITOR_CADENCES.keys())), - default="daily", + "--type", + "monitor_type", + type=click.Choice(list(MONITOR_TYPES)), + default="event_stream", show_default=True, - help="How often to check for changes", + help="Monitor type: 'event_stream' tracks a search query; 'snapshot' tracks a Task Run output.", +) +@click.option("--task-run-id", help="Required for type=snapshot: the Task Run whose output to track.") +@click.option( + "--processor", + type=click.Choice(list(MONITOR_PROCESSORS)), + help="Monitor processor (default: lite). 'base' is more thorough at higher cost.", ) @click.option("--webhook", help="Webhook URL for event delivery") @click.option("--metadata", "metadata_json", help="Metadata as JSON string") -@click.option("--output-schema", "output_schema_json", help="Output schema as JSON string") +@click.option("--output-schema", "output_schema_json", help="Output schema as JSON string (event_stream only)") +@click.option( + "--include-backfill", + is_flag=True, + help="event_stream only: include a sample of historical events on first run.", +) @click.option("-o", "--output", "output_file", type=click.Path(), help="Save result to JSON file") @click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") def monitor_create( - query: str, - cadence: str, + query: str | None, + frequency: str, + monitor_type: str, + task_run_id: str | None, + processor: str | None, webhook: str | None, metadata_json: str | None, output_schema_json: str | None, + include_backfill: bool, output_file: str | None, output_json: bool, ): """Create a new monitor to track the web for changes. - QUERY is a natural language description of what to track. + QUERY is the search query for type=event_stream (the default). For type=snapshot, + omit QUERY and pass --task-run-id instead. Examples: parallel-cli monitor create "Track price changes for iPhone 16" - parallel-cli monitor create "New AI funding announcements" --cadence hourly + parallel-cli monitor create "New AI funding announcements" --frequency 1h parallel-cli monitor create "SEC filings from Tesla" --webhook https://example.com/hook + + parallel-cli monitor create --type snapshot --task-run-id trun_abc --frequency 1d """ + if monitor_type == "event_stream" and not query: + _handle_error( + click.UsageError("QUERY is required when --type=event_stream"), + output_json=output_json, + exit_code=EXIT_BAD_INPUT, + ) + return + if monitor_type == "snapshot" and not task_run_id: + _handle_error( + click.UsageError("--task-run-id is required when --type=snapshot"), + output_json=output_json, + exit_code=EXIT_BAD_INPUT, + ) + return + try: metadata = json.loads(metadata_json) if metadata_json else None output_schema = json.loads(output_schema_json) if output_schema_json else None @@ -2937,14 +2979,18 @@ def monitor_create( try: if not output_json: - console.print(f"[dim]Creating monitor with cadence={cadence}...[/dim]") + console.print(f"[dim]Creating {monitor_type} monitor (frequency={frequency})...[/dim]") result = create_monitor( query=query, - cadence=cadence, + frequency=frequency, + type=monitor_type, + task_run_id=task_run_id, webhook=webhook, metadata=metadata, output_schema=output_schema, + include_backfill=include_backfill or None, + processor=processor, source="cli", ) @@ -2953,8 +2999,12 @@ def monitor_create( if not output_json: monitor_id = result.get("monitor_id", "unknown") console.print(f"\n[bold green]Monitor created: {monitor_id}[/bold green]") - console.print(f"[dim]Query: {query}[/dim]") - console.print(f"[dim]Cadence: {cadence} ({MONITOR_CADENCES[cadence]})[/dim]") + console.print(f"[dim]Type: {result.get('type', monitor_type)}[/dim]") + console.print(f"[dim]Frequency: {result.get('frequency', frequency)}[/dim]") + if query: + console.print(f"[dim]Query: {query}[/dim]") + if task_run_id: + console.print(f"[dim]Task run: {task_run_id}[/dim]") if webhook: console.print(f"[dim]Webhook: {webhook}[/dim]") @@ -2963,44 +3013,79 @@ def monitor_create( @monitor.command(name="list") -@click.option("--limit", "-n", type=int, help="Maximum number of monitors to return") +@click.option("--limit", "-n", type=int, help="Maximum number of monitors to return (1-10000)") +@click.option("--cursor", help="Pagination token from a previous response") +@click.option( + "--status", + type=click.Choice(["active", "cancelled"]), + multiple=True, + help="Filter by status (repeatable). Defaults to active only.", +) +@click.option( + "--type", + "monitor_type", + type=click.Choice(list(MONITOR_TYPES)), + multiple=True, + help="Filter by monitor type (repeatable).", +) @click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") -def monitor_list(limit: int | None, output_json: bool): - """List all monitors. +def monitor_list( + limit: int | None, + cursor: str | None, + status: tuple[str, ...], + monitor_type: tuple[str, ...], + output_json: bool, +): + """List monitors (newest first). Examples: parallel-cli monitor list parallel-cli monitor list --limit 10 --json + + parallel-cli monitor list --status active --status cancelled """ try: - result = list_monitors(limit=limit, source="cli") + result = list_monitors( + cursor=cursor, + limit=limit, + status=list(status) if status else None, + type=list(monitor_type) if monitor_type else None, + source="cli", + ) + monitors = result.get("monitors", []) if isinstance(result, dict) else [] if output_json: print(json.dumps(result, indent=2, default=str)) else: - if not result: + if not monitors: console.print("[yellow]No monitors found.[/yellow]") return from rich.table import Table - table = Table(title=f"Monitors ({len(result)})") + table = Table(title=f"Monitors ({len(monitors)})") table.add_column("ID", style="cyan", no_wrap=True) - table.add_column("Query", max_width=50) - table.add_column("Cadence", style="green") + table.add_column("Type", style="magenta") + table.add_column("Query / Task Run", max_width=50) + table.add_column("Frequency", style="green") table.add_column("Status", style="yellow") - for m in result: + for m in monitors: + settings = m.get("settings", {}) or {} + tracked = settings.get("query") or settings.get("task_run_id") or "" table.add_row( m.get("monitor_id", ""), - (m.get("query", "") or "")[:50], - m.get("cadence", ""), + m.get("type", ""), + str(tracked)[:50], + m.get("frequency", ""), m.get("status", ""), ) console.print(table) + if next_cursor := result.get("next_cursor"): + console.print(f"[dim]Next cursor: {next_cursor}[/dim]") except Exception as e: _handle_error(e, output_json=output_json) @@ -3020,14 +3105,24 @@ def monitor_get(monitor_id: str, output_json: bool): if output_json: print(json.dumps(result, indent=2, default=str)) else: - console.print(f"[bold]Monitor:[/bold] {result.get('monitor_id', monitor_id)}") - console.print(f"[bold]Query:[/bold] {result.get('query', '')}") - console.print(f"[bold]Cadence:[/bold] {result.get('cadence', '')}") - console.print(f"[bold]Status:[/bold] {result.get('status', '')}") - if result.get("webhook"): - console.print(f"[bold]Webhook:[/bold] {result['webhook']}") - if result.get("created_at"): - console.print(f"[bold]Created:[/bold] {result['created_at']}") + settings = result.get("settings", {}) or {} + console.print(f"[bold]Monitor:[/bold] {result.get('monitor_id', monitor_id)}") + console.print(f"[bold]Type:[/bold] {result.get('type', '')}") + console.print(f"[bold]Frequency:[/bold] {result.get('frequency', '')}") + console.print(f"[bold]Status:[/bold] {result.get('status', '')}") + console.print(f"[bold]Processor:[/bold] {result.get('processor', '')}") + if query := settings.get("query"): + console.print(f"[bold]Query:[/bold] {query}") + if task_run_id := settings.get("task_run_id"): + console.print(f"[bold]Task run:[/bold] {task_run_id}") + if webhook := result.get("webhook"): + console.print( + f"[bold]Webhook:[/bold] {webhook.get('url') if isinstance(webhook, dict) else webhook}" + ) + if created_at := result.get("created_at"): + console.print(f"[bold]Created:[/bold] {created_at}") + if last_run := result.get("last_run_at"): + console.print(f"[bold]Last run:[/bold] {last_run}") except Exception as e: _handle_error(e, output_json=output_json) @@ -3035,32 +3130,43 @@ def monitor_get(monitor_id: str, output_json: bool): @monitor.command(name="update") @click.argument("monitor_id") -@click.option("--query", "-q", help="Updated query text") -@click.option("--cadence", "-c", type=click.Choice(list(MONITOR_CADENCES.keys())), help="Updated cadence") +@click.option( + "--frequency", + "-f", + help="Updated frequency (e.g. 1h, 6h, 1d, 2w; or aliases hourly/daily/weekly/every_two_weeks).", +) @click.option("--webhook", help="Updated webhook URL") @click.option("--metadata", "metadata_json", help="Updated metadata as JSON string") +@click.option( + "--advanced-settings", + "advanced_settings_json", + help="event_stream advanced_settings as JSON string (sets type=event_stream automatically).", +) @click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") def monitor_update( monitor_id: str, - query: str | None, - cadence: str | None, + frequency: str | None, webhook: str | None, metadata_json: str | None, + advanced_settings_json: str | None, output_json: bool, ): """Update an existing monitor. - MONITOR_ID is the monitor identifier. + MONITOR_ID is the monitor identifier. Note: query and task_run_id are + immutable — create a new monitor to change them. Examples: - parallel-cli monitor update mon_abc --cadence hourly + parallel-cli monitor update mon_abc --frequency 1h - parallel-cli monitor update mon_abc --query "Updated tracking query" + parallel-cli monitor update mon_abc --webhook https://example.com/hook """ - if not any([query, cadence, webhook, metadata_json]): + if not any([frequency, webhook, metadata_json, advanced_settings_json]): _handle_error( - click.UsageError("Provide at least one field to update (--query, --cadence, --webhook, --metadata)"), + click.UsageError( + "Provide at least one field to update (--frequency, --webhook, --metadata, --advanced-settings)" + ), output_json=output_json, exit_code=EXIT_BAD_INPUT, ) @@ -3068,17 +3174,18 @@ def monitor_update( try: metadata = json.loads(metadata_json) if metadata_json else None + advanced_settings = json.loads(advanced_settings_json) if advanced_settings_json else None except json.JSONDecodeError as e: _handle_error(e, output_json=output_json, exit_code=EXIT_BAD_INPUT, prefix="Invalid JSON") return try: result = update_monitor( - monitor_id=monitor_id, - query=query, - cadence=cadence, + monitor_id, + frequency=frequency, webhook=webhook, metadata=metadata, + advanced_settings=advanced_settings, source="cli", ) @@ -3086,30 +3193,31 @@ def monitor_update( print(json.dumps(result, indent=2, default=str)) else: console.print(f"[bold green]Monitor updated: {monitor_id}[/bold green]") - if query: - console.print(f"[dim]Query: {query}[/dim]") - if cadence: - console.print(f"[dim]Cadence: {cadence}[/dim]") + if frequency: + console.print(f"[dim]Frequency: {frequency}[/dim]") + if webhook: + console.print(f"[dim]Webhook: {webhook}[/dim]") except Exception as e: _handle_error(e, output_json=output_json) -@monitor.command(name="delete") +@monitor.command(name="cancel") @click.argument("monitor_id") @click.option("--json", "output_json", is_flag=True, help="Output as JSON") -def monitor_delete(monitor_id: str, output_json: bool): - """Delete a monitor. +def monitor_cancel(monitor_id: str, output_json: bool): + """Cancel a monitor (irreversible). - MONITOR_ID is the monitor identifier. + MONITOR_ID is the monitor identifier. Cancellation permanently stops the + monitor from running. Create a new monitor to resume monitoring. """ try: - result = delete_monitor(monitor_id, source="cli") + result = cancel_monitor(monitor_id, source="cli") if output_json: print(json.dumps(result, indent=2, default=str)) else: - console.print(f"[bold green]Deleted:[/bold green] {monitor_id}") + console.print(f"[bold green]Cancelled:[/bold green] {monitor_id}") except Exception as e: _handle_error(e, output_json=output_json) @@ -3117,13 +3225,26 @@ def monitor_delete(monitor_id: str, output_json: bool): @monitor.command(name="events") @click.argument("monitor_id") +@click.option("--cursor", help="Pagination token from a previous response") +@click.option("--event-group-id", help="Restrict results to a single execution.") @click.option( - "--lookback", default="10d", show_default=True, help="Lookback period using d (days) or w (weeks), e.g., 10d, 1w" + "--include-completions", + is_flag=True, + help="Include no-change completion events for audit history.", ) +@click.option("--limit", type=int, help="Maximum number of events to return (1-100, default 20)") @click.option("-o", "--output", "output_file", type=click.Path(), help="Save results to JSON file") @click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") -def monitor_events(monitor_id: str, lookback: str, output_file: str | None, output_json: bool): - """List events for a monitor. +def monitor_events( + monitor_id: str, + cursor: str | None, + event_group_id: str | None, + include_completions: bool, + limit: int | None, + output_file: str | None, + output_json: bool, +): + """List events for a monitor (newest first). MONITOR_ID is the monitor identifier. @@ -3131,17 +3252,26 @@ def monitor_events(monitor_id: str, lookback: str, output_file: str | None, outp parallel-cli monitor events mon_abc - parallel-cli monitor events mon_abc --lookback 3d --json + parallel-cli monitor events mon_abc --include-completions --limit 50 + + parallel-cli monitor events mon_abc --event-group-id egrp_xyz """ try: - result = list_monitor_events(monitor_id, lookback_period=lookback, source="cli") + result = list_monitor_events( + monitor_id, + cursor=cursor, + event_group_id=event_group_id, + include_completions=include_completions, + limit=limit, + source="cli", + ) write_json_output(result, output_file, output_json) if not output_json: events = result.get("events", []) if not events: - console.print(f"[yellow]No events found for {monitor_id} in the last {lookback}.[/yellow]") + console.print(f"[yellow]No events found for {monitor_id}.[/yellow]") return from rich.table import Table @@ -3153,19 +3283,31 @@ def monitor_events(monitor_id: str, lookback: str, output_file: str | None, outp table.add_column("Summary", max_width=50) for ev in events: - ev_type = ev.get("type", "") - if ev_type == "event": - ev_id = ev.get("event_group_id", "") - date = ev.get("event_date", "") - summary = (ev.get("output", "") or "")[:50] + ev_type = ev.get("event_type", "") or ev.get("type", "") + if ev_type == "event_stream": + ev_id = ev.get("event_group_id", "") or ev.get("event_id", "") + date = ev.get("event_date", "") or "" + output = ev.get("output", {}) or {} + if isinstance(output, dict): + summary = (output.get("content", "") or str(output))[:50] + else: + summary = str(output)[:50] + elif ev_type == "snapshot": + ev_id = ev.get("event_group_id", "") or ev.get("event_id", "") + date = ev.get("event_date", "") or "" + changed = ev.get("changed_output", {}) or {} + if isinstance(changed, dict): + summary = (changed.get("content", "") or "fields changed")[:50] + else: + summary = "fields changed" elif ev_type == "completion": - ev_id = ev.get("monitor_ts", "") - date = "" - summary = "Run completed" + ev_id = "" + date = ev.get("timestamp", "") or "" + summary = "Run completed (no change)" elif ev_type == "error": - ev_id = ev.get("id", "") - date = ev.get("date", "") - summary = (ev.get("error", "") or "")[:50] + ev_id = "" + date = ev.get("timestamp", "") or "" + summary = (ev.get("error_message", "") or "")[:50] else: ev_id = "" date = "" @@ -3173,77 +3315,35 @@ def monitor_events(monitor_id: str, lookback: str, output_file: str | None, outp table.add_row(ev_type, ev_id, date, summary) console.print(table) + if next_cursor := result.get("next_cursor"): + console.print(f"[dim]Next cursor: {next_cursor}[/dim]") except Exception as e: _handle_error(e, output_json=output_json) -@monitor.command(name="event-group") +@monitor.command(name="trigger") @click.argument("monitor_id") -@click.argument("event_group_id") -@click.option("-o", "--output", "output_file", type=click.Path(), help="Save result to JSON file") -@click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") -def monitor_event_group(monitor_id: str, event_group_id: str, output_file: str | None, output_json: bool): - """Get details of an event group. - - MONITOR_ID is the monitor identifier. - EVENT_GROUP_ID is the event group identifier. - """ - try: - result = get_monitor_event_group(monitor_id, event_group_id, source="cli") - - write_json_output(result, output_file, output_json) - - if not output_json: - console.print(f"[bold]Monitor:[/bold] {monitor_id}") - console.print(f"[bold]Event Group:[/bold] {event_group_id}") - events = result.get("events", []) - if events: - console.print(f"\n[bold]Events ({len(events)}):[/bold]") - for ev in events: - date = ev.get("event_date", "") - output = ev.get("output", "") - urls = ev.get("source_urls", []) - console.print(f" [dim]{date}[/dim] {output}") - for u in urls: - console.print(f" [cyan]{u}[/cyan]") - else: - console.print("[yellow]No events in this group.[/yellow]") - - except Exception as e: - _handle_error(e, output_json=output_json) - - -@monitor.command(name="simulate") -@click.argument("monitor_id") -@click.option( - "--event-type", - type=click.Choice(MONITOR_EVENT_TYPES), - help="Event type to simulate (default: monitor.event.detected)", -) @click.option("--json", "output_json", is_flag=True, help="Output as JSON") -def monitor_simulate(monitor_id: str, event_type: str | None, output_json: bool): - """Simulate an event for webhook testing. +def monitor_trigger(monitor_id: str, output_json: bool): + """Trigger an immediate one-off run of a monitor. - Requires a webhook to be configured on the monitor. + The monitor's regular schedule is unaffected. An event is only emitted if a + material change is detected. Cancelled monitors cannot be triggered. MONITOR_ID is the monitor identifier. Examples: - parallel-cli monitor simulate mon_abc - - parallel-cli monitor simulate mon_abc --event-type monitor.execution.completed + parallel-cli monitor trigger mon_abc """ try: - simulate_monitor_event(monitor_id, event_type=event_type, source="cli") + trigger_monitor(monitor_id, source="cli") if output_json: - print(json.dumps({"monitor_id": monitor_id, "simulated": True}, indent=2)) + print(json.dumps({"monitor_id": monitor_id, "triggered": True}, indent=2)) else: - console.print(f"[bold green]Event simulated for:[/bold green] {monitor_id}") - if event_type: - console.print(f"[dim]Event type: {event_type}[/dim]") + console.print(f"[bold green]Triggered:[/bold green] {monitor_id}") except Exception as e: _handle_error(e, output_json=output_json) diff --git a/parallel_web_tools/core/__init__.py b/parallel_web_tools/core/__init__.py index 69c4d23..0c11d39 100644 --- a/parallel_web_tools/core/__init__.py +++ b/parallel_web_tools/core/__init__.py @@ -36,15 +36,18 @@ run_findall, ) from parallel_web_tools.core.monitor import ( - MONITOR_CADENCES, MONITOR_EVENT_TYPES, + MONITOR_FREQUENCY_PRESETS, + MONITOR_PROCESSORS, + MONITOR_STATUSES, + MONITOR_TYPES, + cancel_monitor, create_monitor, - delete_monitor, get_monitor, - get_monitor_event_group, list_monitor_events, list_monitors, - simulate_monitor_event, + resolve_frequency, + trigger_monitor, update_monitor, ) from parallel_web_tools.core.research import ( @@ -143,15 +146,18 @@ "poll_findall", "run_findall", # Monitor - "MONITOR_CADENCES", "MONITOR_EVENT_TYPES", + "MONITOR_FREQUENCY_PRESETS", + "MONITOR_PROCESSORS", + "MONITOR_STATUSES", + "MONITOR_TYPES", + "cancel_monitor", "create_monitor", - "delete_monitor", "get_monitor", - "get_monitor_event_group", "list_monitor_events", "list_monitors", - "simulate_monitor_event", + "resolve_frequency", + "trigger_monitor", "update_monitor", # Result "EnrichmentResult", diff --git a/parallel_web_tools/core/batch.py b/parallel_web_tools/core/batch.py index bd1d569..ad51e66 100644 --- a/parallel_web_tools/core/batch.py +++ b/parallel_web_tools/core/batch.py @@ -117,8 +117,7 @@ def enrich_batch( Returns: List of result dictionaries in same order as inputs. """ - from parallel.types import JsonSchemaParam, TaskSpecParam - from parallel.types.beta import BetaRunInputParam + from parallel.types import JsonSchemaParam, RunInputParam, TaskSpecParam if not inputs: return [] @@ -129,18 +128,18 @@ def enrich_batch( task_spec = TaskSpecParam(output_schema=JsonSchemaParam(type="json", json_schema=output_schema)) # Create task group - task_group = client.beta.task_group.create() + task_group = client.task_group.create() taskgroup_id = task_group.task_group_id # Add runs - use SDK type for proper typing - def _make_run_input(inp: dict[str, Any]) -> BetaRunInputParam: - entry: BetaRunInputParam = {"input": inp, "processor": processor} + def _make_run_input(inp: dict[str, Any]) -> RunInputParam: + entry: RunInputParam = {"input": inp, "processor": processor} if previous_interaction_id: entry["previous_interaction_id"] = previous_interaction_id return entry - run_inputs: list[BetaRunInputParam] = [_make_run_input(inp) for inp in inputs] - response = client.beta.task_group.add_runs( + run_inputs: list[RunInputParam] = [_make_run_input(inp) for inp in inputs] + response = client.task_group.add_runs( taskgroup_id, default_task_spec=task_spec, inputs=run_inputs, @@ -154,7 +153,7 @@ def _make_run_input(inp: dict[str, Any]) -> BetaRunInputParam: time.sleep(3) start_time = time.time() while time.time() - start_time < timeout: - status = client.beta.task_group.retrieve(taskgroup_id) + status = client.task_group.retrieve(taskgroup_id) status_counts = status.status.task_run_status_counts or {} completed = status_counts.get("completed", 0) failed = status_counts.get("failed", 0) @@ -166,7 +165,7 @@ def _make_run_input(inp: dict[str, Any]) -> BetaRunInputParam: # Collect results results_by_id: dict[str, dict[str, Any]] = {} - runs_stream = client.beta.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) + runs_stream = client.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) for event in runs_stream: if event.type == "task_run.state": @@ -232,8 +231,7 @@ def create_task_group( Returns: Dict with taskgroup_id, url, and num_runs. """ - from parallel.types import JsonSchemaParam, TaskSpecParam - from parallel.types.beta import BetaRunInputParam + from parallel.types import JsonSchemaParam, RunInputParam, TaskSpecParam logger = logging.getLogger(__name__) @@ -246,13 +244,13 @@ def create_task_group( ) # Create task group - task_group = client.beta.task_group.create() + task_group = client.task_group.create() taskgroup_id = task_group.task_group_id logger.info(f"Created taskgroup id {taskgroup_id}") # Build run input helper - def _make_run_input(row: dict[str, Any]) -> BetaRunInputParam: - entry: BetaRunInputParam = {"input": row, "processor": processor} + def _make_run_input(row: dict[str, Any]) -> RunInputParam: + entry: RunInputParam = {"input": row, "processor": processor} if previous_interaction_id: entry["previous_interaction_id"] = previous_interaction_id return entry @@ -262,8 +260,8 @@ def _make_run_input(row: dict[str, Any]) -> BetaRunInputParam: total_created = 0 for i in range(0, len(input_data), batch_size): batch = input_data[i : i + batch_size] - run_inputs: list[BetaRunInputParam] = [_make_run_input(row) for row in batch] - response = client.beta.task_group.add_runs( + run_inputs: list[RunInputParam] = [_make_run_input(row) for row in batch] + response = client.task_group.add_runs( taskgroup_id, default_task_spec=task_spec, inputs=run_inputs, @@ -294,7 +292,7 @@ def get_task_group_status( Dict with taskgroup_id, status_counts, is_active, num_runs, url. """ client = create_client(api_key, source) - status = client.beta.task_group.retrieve(taskgroup_id) + status = client.task_group.retrieve(taskgroup_id) status_counts = dict(status.status.task_run_status_counts or {}) return { @@ -336,7 +334,7 @@ def poll_task_group( deadline = time.time() + timeout while time.time() < deadline: - status = client.beta.task_group.retrieve(taskgroup_id) + status = client.task_group.retrieve(taskgroup_id) status_counts = status.status.task_run_status_counts or {} completed = status_counts.get("completed", 0) failed = status_counts.get("failed", 0) @@ -355,7 +353,7 @@ def poll_task_group( # Collect results results = [] - runs_stream = client.beta.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) + runs_stream = client.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) for event in runs_stream: if event.type == "task_run.state": @@ -405,7 +403,7 @@ def run_tasks( client = create_client(source=source) poll_start = time.time() while time.time() - poll_start < timeout: - status = client.beta.task_group.retrieve(taskgroup_id) + status = client.task_group.retrieve(taskgroup_id) status_counts = status.status.task_run_status_counts or {} logger.info(f"Status: {status_counts}") @@ -419,7 +417,7 @@ def run_tasks( # Get results using SDK's streaming (handles SSE properly) results = [] - runs_stream = client.beta.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) + runs_stream = client.task_group.get_runs(taskgroup_id, include_input=True, include_output=True) for event in runs_stream: if event.type == "task_run.state" and event.output: diff --git a/parallel_web_tools/core/monitor.py b/parallel_web_tools/core/monitor.py index 58a6481..17ebe76 100644 --- a/parallel_web_tools/core/monitor.py +++ b/parallel_web_tools/core/monitor.py @@ -1,151 +1,167 @@ """Monitor: continuously track the web for changes using the Parallel Monitor API. -Monitors let you define natural-language queries that run on a schedule (cadence). -When changes are detected, events are generated and optionally delivered via webhook. +Monitors run on a fixed frequency and emit events when material changes are +detected. Two monitor types are supported: -This module uses httpx directly since the SDK does not yet have high-level -convenience methods for Monitor endpoints. +- ``event_stream`` (default): tracks a natural-language search query. +- ``snapshot``: tracks the output of a specific Task Run. -The typical workflow is: - 1. Create a monitor with a query and cadence - 2. Optionally configure a webhook for real-time notifications - 3. List events to see detected changes - 4. Update or delete monitors as needed +Results can be polled via the events endpoint or delivered via webhooks. This +module wraps the ``client.monitor.*`` SDK resource and returns plain dicts so +the CLI and other callers don't have to deal with pydantic models. """ from __future__ import annotations from typing import Any -import httpx +from parallel_web_tools.core.auth import create_client +from parallel_web_tools.core.user_agent import ClientSource -from parallel_web_tools.core.auth import resolve_api_key -from parallel_web_tools.core.user_agent import ClientSource, get_default_headers - -BASE_URL = "https://api.parallel.ai" - -# Supported cadences for monitor scheduling -MONITOR_CADENCES = { - "hourly": "Every hour", - "daily": "Once per day", - "weekly": "Once per week", - "every_two_weeks": "Every two weeks", +# Friendly aliases for SDK frequency strings. +# The SDK accepts "" with unit in {h, d, w}, range 1h-30d (inclusive). +MONITOR_FREQUENCY_PRESETS: dict[str, str] = { + "hourly": "1h", + "daily": "1d", + "weekly": "1w", + "every_two_weeks": "2w", } -# Valid webhook event types -MONITOR_EVENT_TYPES = [ +MONITOR_TYPES: tuple[str, ...] = ("event_stream", "snapshot") +MONITOR_PROCESSORS: tuple[str, ...] = ("lite", "base") +MONITOR_STATUSES: tuple[str, ...] = ("active", "cancelled") + +# Webhook event types accepted by the API. +MONITOR_EVENT_TYPES: list[str] = [ "monitor.event.detected", "monitor.execution.completed", "monitor.execution.failed", ] -def _request( - method: str, - path: str, - api_key: str | None = None, - source: ClientSource = "python", - json: Any | None = None, - params: dict[str, Any] | None = None, -) -> httpx.Response: - """Send an authenticated request to the Monitor API. - - Args: - method: HTTP method (GET, POST, DELETE). - path: API path (e.g., "/v1alpha/monitors"). - api_key: Optional API key override. - source: Client source identifier for User-Agent. - json: Optional JSON body. - params: Optional query parameters. - - Returns: - The httpx Response object. +def resolve_frequency(value: str) -> str: + """Translate a friendly preset (e.g. "daily") to an SDK frequency string ("1d").""" + return MONITOR_FREQUENCY_PRESETS.get(value, value) - Raises: - httpx.HTTPStatusError: If the response indicates an error. - """ - key = resolve_api_key(api_key) - headers = { - **get_default_headers(source), - "x-api-key": key, - "Content-Type": "application/json", - } - url = f"{BASE_URL}{path}" - response = httpx.request(method, url, headers=headers, json=json, params=params, timeout=30) - response.raise_for_status() - return response +def _build_webhook(url: str, event_types: list[str] | None = None) -> dict[str, Any]: + return {"url": url, "event_types": event_types or ["monitor.event.detected"]} -def _build_webhook(webhook_url: str, event_types: list[str] | None = None) -> dict[str, Any]: - """Build a webhook config object from a URL.""" - return { - "url": webhook_url, - "event_types": event_types or ["monitor.event.detected"], - } +def _to_dict(model: Any) -> dict[str, Any]: + """Convert an SDK pydantic response to a JSON-safe dict.""" + if model is None: + return {} + if hasattr(model, "model_dump"): + return model.model_dump(mode="json") + if isinstance(model, dict): + return model + return dict(model) def create_monitor( - query: str, - cadence: str, + query: str | None = None, + frequency: str = "1d", + *, + type: str = "event_stream", + task_run_id: str | None = None, webhook: str | None = None, - metadata: dict[str, Any] | None = None, + metadata: dict[str, str] | None = None, output_schema: dict[str, Any] | None = None, + include_backfill: bool | None = None, + processor: str | None = None, api_key: str | None = None, source: ClientSource = "python", ) -> dict[str, Any]: """Create a new monitor. Args: - query: Natural language query describing what to track. - cadence: How often to check (e.g., "daily", "hourly"). + query: Search query to monitor (required for ``type="event_stream"``). + frequency: How often to run the monitor. Format ```` where unit is + ``h``, ``d``, or ``w`` (e.g. ``1d``, ``12h``, ``2w``). Friendly aliases + like ``"daily"`` and ``"hourly"`` are also accepted. + type: ``event_stream`` to track a search query, or ``snapshot`` to track a + specific Task Run output. + task_run_id: Task Run whose output should be tracked (required for + ``type="snapshot"``). webhook: Optional webhook URL for event delivery. - metadata: Optional metadata dict. - output_schema: Optional JSON schema for structured output. + metadata: Optional metadata dict (max 16 keys, max 512 chars per value). + output_schema: Optional JSON schema for structured output (event_stream only). + include_backfill: For event_stream monitors, include a sample of historical + events on the first run. + processor: ``lite`` (default, fast/cheap) or ``base`` (more thorough). api_key: Optional API key override. source: Client source identifier for User-Agent. Returns: - Dict with monitor details including monitor_id. + Dict representation of the created Monitor. """ - body: dict[str, Any] = {"query": query, "cadence": cadence} + client = create_client(api_key, source) + + settings: dict[str, Any] + if type == "event_stream": + if not query: + raise ValueError("query is required when type='event_stream'") + settings = {"query": query} + if include_backfill is not None: + settings["include_backfill"] = include_backfill + if output_schema is not None: + settings["output_schema"] = {"type": "json", "json_schema": output_schema} + elif type == "snapshot": + if not task_run_id: + raise ValueError("task_run_id is required when type='snapshot'") + settings = {"task_run_id": task_run_id} + else: + raise ValueError(f"Unsupported monitor type: {type!r}") + + kwargs: dict[str, Any] = { + "frequency": resolve_frequency(frequency), + "settings": settings, + "type": type, + } if webhook is not None: - body["webhook"] = _build_webhook(webhook) + kwargs["webhook"] = _build_webhook(webhook) if metadata is not None: - body["metadata"] = metadata - if output_schema is not None: - body["output_schema"] = output_schema + kwargs["metadata"] = metadata + if processor is not None: + kwargs["processor"] = processor - resp = _request("POST", "/v1alpha/monitors", api_key=api_key, source=source, json=body) - return resp.json() + return _to_dict(client.monitor.create(**kwargs)) def list_monitors( - monitor_id: str | None = None, + cursor: str | None = None, limit: int | None = None, + *, + status: list[str] | None = None, + type: list[str] | None = None, api_key: str | None = None, source: ClientSource = "python", -) -> list[dict[str, Any]]: - """List monitors with optional pagination. +) -> dict[str, Any]: + """List monitors with cursor-based pagination. Args: - monitor_id: Cursor for pagination (start after this monitor). - limit: Maximum number of monitors to return. + cursor: Pagination token from ``next_cursor`` of a previous response. + limit: Maximum number of monitors to return (1-10000, default 100). + status: Filter by status. Defaults to ``["active"]`` server-side. + type: Filter by monitor type. api_key: Optional API key override. source: Client source identifier for User-Agent. Returns: - List of monitor dicts. + Dict with ``monitors`` (list) and optional ``next_cursor``. """ - params: dict[str, Any] = {} - if monitor_id is not None: - params["monitor_id"] = monitor_id + client = create_client(api_key, source) + kwargs: dict[str, Any] = {} + if cursor is not None: + kwargs["cursor"] = cursor if limit is not None: - params["limit"] = limit - - resp = _request("GET", "/v1alpha/monitors", api_key=api_key, source=source, params=params) - data = resp.json() - return data.get("monitors", data) if isinstance(data, dict) else data + kwargs["limit"] = limit + if status is not None: + kwargs["status"] = status + if type is not None: + kwargs["type"] = type + return _to_dict(client.monitor.list(**kwargs)) def get_monitor( @@ -153,152 +169,121 @@ def get_monitor( api_key: str | None = None, source: ClientSource = "python", ) -> dict[str, Any]: - """Retrieve a single monitor by ID. - - Args: - monitor_id: The monitor ID. - api_key: Optional API key override. - source: Client source identifier for User-Agent. - - Returns: - Dict with monitor details. - """ - resp = _request("GET", f"/v1alpha/monitors/{monitor_id}", api_key=api_key, source=source) - return resp.json() + """Retrieve a monitor by ID.""" + client = create_client(api_key, source) + return _to_dict(client.monitor.retrieve(monitor_id)) def update_monitor( monitor_id: str, - query: str | None = None, - cadence: str | None = None, + *, + frequency: str | None = None, + metadata: dict[str, str] | None = None, + type: str | None = None, webhook: str | None = None, - metadata: dict[str, Any] | None = None, + advanced_settings: dict[str, Any] | None = None, api_key: str | None = None, source: ClientSource = "python", ) -> dict[str, Any]: """Update an existing monitor. + Only ``frequency``, ``metadata``, ``webhook``, and (event_stream-only) + ``advanced_settings`` can be modified after creation. Query / task_run_id + are immutable — create a new monitor to change them. + Args: - monitor_id: The monitor ID. - query: Updated query text. - cadence: Updated cadence. - webhook: Updated webhook URL. - metadata: Updated metadata dict. + monitor_id: The monitor to update. + frequency: New frequency (e.g. ``"6h"``, ``"1w"``); aliases accepted. + metadata: Replacement metadata dict. + type: Required when ``advanced_settings`` is provided; must be ``event_stream``. + webhook: Replacement webhook URL. + advanced_settings: Advanced configuration overrides for event_stream monitors. api_key: Optional API key override. source: Client source identifier for User-Agent. Returns: - Dict with updated monitor details. + Dict representation of the updated Monitor. """ - body: dict[str, Any] = {} - if query is not None: - body["query"] = query - if cadence is not None: - body["cadence"] = cadence - if webhook is not None: - body["webhook"] = _build_webhook(webhook) + client = create_client(api_key, source) + kwargs: dict[str, Any] = {} + if frequency is not None: + kwargs["frequency"] = resolve_frequency(frequency) if metadata is not None: - body["metadata"] = metadata + kwargs["metadata"] = metadata + if webhook is not None: + kwargs["webhook"] = _build_webhook(webhook) + if advanced_settings is not None: + kwargs["settings"] = {"advanced_settings": advanced_settings} + if type is None: + type = "event_stream" + if type is not None: + kwargs["type"] = type + + if not kwargs: + raise ValueError("At least one field must be provided to update_monitor") - resp = _request("POST", f"/v1alpha/monitors/{monitor_id}", api_key=api_key, source=source, json=body) - return resp.json() + return _to_dict(client.monitor.update(monitor_id, **kwargs)) -def delete_monitor( +def cancel_monitor( monitor_id: str, api_key: str | None = None, source: ClientSource = "python", ) -> dict[str, Any]: - """Delete a monitor. - - Args: - monitor_id: The monitor ID. - api_key: Optional API key override. - source: Client source identifier for User-Agent. - - Returns: - Dict with deletion confirmation. - """ - resp = _request("DELETE", f"/v1alpha/monitors/{monitor_id}", api_key=api_key, source=source) - if resp.status_code == 204 or not resp.content: - return {"monitor_id": monitor_id, "deleted": True} - return resp.json() + """Cancel a monitor (irreversible). Replaces ``delete_monitor`` from the alpha API.""" + client = create_client(api_key, source) + return _to_dict(client.monitor.cancel(monitor_id)) def list_monitor_events( monitor_id: str, - lookback_period: str = "10d", + *, + cursor: str | None = None, + event_group_id: str | None = None, + include_completions: bool = False, + limit: int | None = None, api_key: str | None = None, source: ClientSource = "python", ) -> dict[str, Any]: - """List events for a monitor. + """List events for a monitor, newest first. - Args: - monitor_id: The monitor ID. - lookback_period: How far back to look (e.g., "10d", "1w"). Minimum 1d. - api_key: Optional API key override. - source: Client source identifier for User-Agent. - - Returns: - Dict with events list. - """ - params = {"lookback_period": lookback_period} - resp = _request("GET", f"/v1alpha/monitors/{monitor_id}/events", api_key=api_key, source=source, params=params) - return resp.json() - - -def get_monitor_event_group( - monitor_id: str, - event_group_id: str, - api_key: str | None = None, - source: ClientSource = "python", -) -> dict[str, Any]: - """Retrieve a specific event group. + Pass ``event_group_id`` to narrow results to a single execution. Pagination + parameters are ignored when ``event_group_id`` is set. Args: - monitor_id: The monitor ID. - event_group_id: The event group ID. + monitor_id: The monitor whose events to fetch. + cursor: Pagination token from a previous response. + event_group_id: Restrict results to a single execution. + include_completions: Include no-change completion events (audit history). + limit: Maximum number of events to return (1-100, default 20). api_key: Optional API key override. source: Client source identifier for User-Agent. Returns: - Dict with event group details. + Dict with ``events`` (list), optional ``next_cursor``, and optional ``warnings``. """ - resp = _request( - "GET", - f"/v1alpha/monitors/{monitor_id}/event_groups/{event_group_id}", - api_key=api_key, - source=source, - ) - return resp.json() + client = create_client(api_key, source) + kwargs: dict[str, Any] = {} + if cursor is not None: + kwargs["cursor"] = cursor + if event_group_id is not None: + kwargs["event_group_id"] = event_group_id + if include_completions: + kwargs["include_completions"] = True + if limit is not None: + kwargs["limit"] = limit + return _to_dict(client.monitor.events(monitor_id, **kwargs)) -def simulate_monitor_event( +def trigger_monitor( monitor_id: str, - event_type: str | None = None, api_key: str | None = None, source: ClientSource = "python", ) -> None: - """Simulate an event for webhook testing. - - Requires a webhook to be configured on the monitor. + """Trigger an immediate one-off monitor run. - Args: - monitor_id: The monitor ID. - event_type: Optional event type to simulate. Defaults to - "monitor.event.detected". Valid values: monitor.event.detected, - monitor.execution.completed, monitor.execution.failed. - api_key: Optional API key override. - source: Client source identifier for User-Agent. + The monitor's regular schedule is unaffected. An event is only emitted if + a material change is detected. Replaces the alpha ``simulate_event`` flow. """ - params: dict[str, Any] = {} - if event_type is not None: - params["event_type"] = event_type - - _request( - "POST", - f"/v1alpha/monitors/{monitor_id}/simulate_event", - api_key=api_key, - source=source, - params=params if params else None, - ) + client = create_client(api_key, source) + client.monitor.trigger(monitor_id) diff --git a/parallel_web_tools/integrations/spark/streaming.py b/parallel_web_tools/integrations/spark/streaming.py index 244159d..38affe7 100644 --- a/parallel_web_tools/integrations/spark/streaming.py +++ b/parallel_web_tools/integrations/spark/streaming.py @@ -166,8 +166,7 @@ def enrich_streaming_batch( >>> >>> query = stream_df.writeStream.foreachBatch(process_batch).start() """ - from parallel.types import JsonSchemaParam, TaskSpecParam - from parallel.types.beta import BetaRunInputParam + from parallel.types import JsonSchemaParam, RunInputParam, TaskSpecParam # Collect batch data (this is safe in micro-batches, which are already small) rows = batch_df.collect() @@ -195,11 +194,11 @@ def enrich_streaming_batch( ) # Create task group - task_group = client.beta.task_group.create() + task_group = client.task_group.create() taskgroup_id = task_group.task_group_id # Build inputs from batch rows - run_inputs: list[BetaRunInputParam] = [] + run_inputs: list[RunInputParam] = [] for row in rows: # Convert Row to dict for reliable field access row_dict = row.asDict() @@ -210,7 +209,7 @@ def enrich_streaming_batch( if value is not None: input_data[parallel_name] = str(value) - run_input: BetaRunInputParam = { + run_input: RunInputParam = { "input": input_data, "processor": processor, } @@ -222,7 +221,7 @@ def enrich_streaming_batch( run_inputs.append(run_input) # Add all runs to the group - response = client.beta.task_group.add_runs( + response = client.task_group.add_runs( taskgroup_id, default_task_spec=task_spec, inputs=run_inputs, @@ -238,7 +237,7 @@ def enrich_streaming_batch( time.sleep(3) # Initial delay before first poll start_time = time.time() while time.time() - start_time < timeout: - status = client.beta.task_group.retrieve(taskgroup_id) + status = client.task_group.retrieve(taskgroup_id) status_counts = status.status.task_run_status_counts or {} completed = status_counts.get("completed", 0) failed = status_counts.get("failed", 0) @@ -250,7 +249,7 @@ def enrich_streaming_batch( # Collect results results_by_id: dict[str, str] = {} - runs_stream = client.beta.task_group.get_runs( + runs_stream = client.task_group.get_runs( taskgroup_id, include_input=True, include_output=True, diff --git a/pyproject.toml b/pyproject.toml index 8674deb..37c9bc1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ ] dependencies = [ - "parallel-web>=0.4.2", + "parallel-web>=0.6.0", "python-dotenv>=1.0.0", # CLI dependencies (minimal - search, extract, enrich with CLI args) "click>=8.1.0", diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py index 0c9e4a2..e6e3f84 100644 --- a/tests/test_enrichment.py +++ b/tests/test_enrichment.py @@ -283,17 +283,17 @@ def test_successful_enrichment(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1", "run_2"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 2, "failed": 0} mock_status.status.num_task_runs = 2 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Mock the stream of events event_1 = SimpleNamespace( @@ -312,7 +312,7 @@ def test_successful_enrichment(self): basis=[], ), ) - mock_client.beta.task_group.get_runs.return_value = [event_1, event_2] + mock_client.task_group.get_runs.return_value = [event_1, event_2] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): # Skip sleeps @@ -334,17 +334,17 @@ def test_content_as_string_json(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Content as JSON string event = SimpleNamespace( @@ -355,7 +355,7 @@ def test_content_as_string_json(self): basis=[], ), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -372,17 +372,17 @@ def test_content_as_invalid_json_string(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Content as non-JSON string event = SimpleNamespace( @@ -393,7 +393,7 @@ def test_content_as_invalid_json_string(self): basis=[], ), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -410,17 +410,17 @@ def test_content_as_other_type(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Content as number event = SimpleNamespace( @@ -431,7 +431,7 @@ def test_content_as_other_type(self): basis=[], ), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -448,17 +448,17 @@ def test_include_basis_true(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status field_basis = SimpleNamespace(field="ceo_name", reasoning="test") event = SimpleNamespace( @@ -469,7 +469,7 @@ def test_include_basis_true(self): basis=[field_basis], ), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -488,17 +488,17 @@ def test_include_basis_false(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status field_basis = SimpleNamespace(field="ceo_name", reasoning="test") event = SimpleNamespace( @@ -509,7 +509,7 @@ def test_include_basis_false(self): basis=[field_basis], ), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -527,24 +527,24 @@ def test_run_error_handling(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 0, "failed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status event = SimpleNamespace( type="task_run.state", run=SimpleNamespace(run_id="run_1", error="API error occurred"), output=None, ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -562,17 +562,17 @@ def test_missing_result(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1", "run_2"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 2 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Only return event for run_1, not run_2 event = SimpleNamespace( @@ -580,7 +580,7 @@ def test_missing_result(self): run=SimpleNamespace(run_id="run_1", error=None), output=SimpleNamespace(content={"ceo_name": "Test"}, basis=[]), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -598,11 +598,11 @@ def test_no_run_ids_returned(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = [] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -637,24 +637,24 @@ def test_processor_passed_correctly(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status event = SimpleNamespace( type="task_run.state", run=SimpleNamespace(run_id="run_1", error=None), output=SimpleNamespace(content={}, basis=[]), ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -666,7 +666,7 @@ def test_processor_passed_correctly(self): ) # Check that processor was passed correctly - call_args = mock_client.beta.task_group.add_runs.call_args + call_args = mock_client.task_group.add_runs.call_args assert call_args.kwargs["inputs"][0]["processor"] == "pro-fast" def test_ignores_non_task_run_events(self): @@ -674,17 +674,17 @@ def test_ignores_non_task_run_events(self): mock_client = mock.MagicMock() mock_task_group = mock.MagicMock() mock_task_group.task_group_id = "tgrp_123" - mock_client.beta.task_group.create.return_value = mock_task_group + mock_client.task_group.create.return_value = mock_task_group mock_add_response = mock.MagicMock() mock_add_response.run_ids = ["run_1"] - mock_client.beta.task_group.add_runs.return_value = mock_add_response + mock_client.task_group.add_runs.return_value = mock_add_response mock_status = mock.MagicMock() mock_status.status.task_run_status_counts = {"completed": 1} mock_status.status.num_task_runs = 1 mock_status.status.is_active = False - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status # Include various event types event_other = SimpleNamespace(type="other.event") @@ -693,7 +693,7 @@ def test_ignores_non_task_run_events(self): run=SimpleNamespace(run_id="run_1", error=None), output=SimpleNamespace(content={"ceo_name": "Test"}, basis=[]), ) - mock_client.beta.task_group.get_runs.return_value = [event_other, event_valid] + mock_client.task_group.get_runs.return_value = [event_other, event_valid] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -781,13 +781,13 @@ class OutputModel(BaseModel): mock_client = mock.MagicMock() # Mock task group create - mock_client.beta.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_123") + mock_client.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_123") # Mock add_runs - mock_client.beta.task_group.add_runs.return_value = mock.MagicMock(run_ids=["run_1", "run_2"]) + mock_client.task_group.add_runs.return_value = mock.MagicMock(run_ids=["run_1", "run_2"]) # Mock retrieve (status check) - return completed immediately - mock_client.beta.task_group.retrieve.return_value = mock.MagicMock( + mock_client.task_group.retrieve.return_value = mock.MagicMock( status=mock.MagicMock(is_active=False, task_run_status_counts={"completed": 2}) ) @@ -802,7 +802,7 @@ class OutputModel(BaseModel): input=mock.MagicMock(input={"company": "OpenAI"}), output=mock.MagicMock(content={"ceo": "Sam Altman"}), ) - mock_client.beta.task_group.get_runs.return_value = [mock_event1, mock_event2] + mock_client.task_group.get_runs.return_value = [mock_event1, mock_event2] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): # Speed up test @@ -837,8 +837,8 @@ class OutputModel(BaseModel): ceo: str mock_client = mock.MagicMock() - mock_client.beta.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_abc") - mock_client.beta.task_group.add_runs.return_value = mock.MagicMock(run_ids=["run_1", "run_2"]) + mock_client.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_abc") + mock_client.task_group.add_runs.return_value = mock.MagicMock(run_ids=["run_1", "run_2"]) with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): result = create_task_group( @@ -864,8 +864,8 @@ class OutputModel(BaseModel): ceo: str mock_client = mock.MagicMock() - mock_client.beta.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_batch") - mock_client.beta.task_group.add_runs.return_value = mock.MagicMock(run_ids=["r"] * 100) + mock_client.task_group.create.return_value = mock.MagicMock(task_group_id="tgrp_batch") + mock_client.task_group.add_runs.return_value = mock.MagicMock(run_ids=["r"] * 100) input_data = [{"company": f"C{i}"} for i in range(250)] @@ -873,7 +873,7 @@ class OutputModel(BaseModel): result = create_task_group(input_data, InputModel, OutputModel) # 250 items / 100 batch size = 3 calls - assert mock_client.beta.task_group.add_runs.call_count == 3 + assert mock_client.task_group.add_runs.call_count == 3 assert result["num_runs"] == 300 # 3 * 100 from mock @@ -887,7 +887,7 @@ def test_returns_status_info(self): mock_status.status.task_run_status_counts = {"completed": 5, "failed": 1} mock_status.status.is_active = False mock_status.status.num_task_runs = 6 - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): result = get_task_group_status("tgrp_test", api_key="test-key") @@ -905,7 +905,7 @@ def test_active_task_group(self): mock_status.status.task_run_status_counts = {"completed": 2, "running": 3} mock_status.status.is_active = True mock_status.status.num_task_runs = 5 - mock_client.beta.task_group.retrieve.return_value = mock_status + mock_client.task_group.retrieve.return_value = mock_status with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): result = get_task_group_status("tgrp_active") @@ -932,7 +932,7 @@ def test_polls_until_complete_and_returns_results(self): done_status.status.is_active = False done_status.status.num_task_runs = 2 - mock_client.beta.task_group.retrieve.side_effect = [active_status, done_status] + mock_client.task_group.retrieve.side_effect = [active_status, done_status] event_1 = SimpleNamespace( type="task_run.state", @@ -946,7 +946,7 @@ def test_polls_until_complete_and_returns_results(self): input=SimpleNamespace(input={"company": "B"}), output=SimpleNamespace(content={"ceo": "CEO B"}), ) - mock_client.beta.task_group.get_runs.return_value = [event_1, event_2] + mock_client.task_group.get_runs.return_value = [event_1, event_2] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -964,7 +964,7 @@ def test_timeout_raises_error(self): active_status.status.task_run_status_counts = {"running": 2} active_status.status.is_active = True active_status.status.num_task_runs = 2 - mock_client.beta.task_group.retrieve.return_value = active_status + mock_client.task_group.retrieve.return_value = active_status with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): @@ -981,8 +981,8 @@ def test_calls_on_progress(self): done_status.status.task_run_status_counts = {"completed": 3} done_status.status.is_active = False done_status.status.num_task_runs = 3 - mock_client.beta.task_group.retrieve.return_value = done_status - mock_client.beta.task_group.get_runs.return_value = [] + mock_client.task_group.retrieve.return_value = done_status + mock_client.task_group.get_runs.return_value = [] progress_calls = [] @@ -1003,7 +1003,7 @@ def test_handles_failed_runs(self): done_status.status.task_run_status_counts = {"completed": 0, "failed": 1} done_status.status.is_active = False done_status.status.num_task_runs = 1 - mock_client.beta.task_group.retrieve.return_value = done_status + mock_client.task_group.retrieve.return_value = done_status event = SimpleNamespace( type="task_run.state", @@ -1011,7 +1011,7 @@ def test_handles_failed_runs(self): input=SimpleNamespace(input={"company": "X"}), output=None, ) - mock_client.beta.task_group.get_runs.return_value = [event] + mock_client.task_group.get_runs.return_value = [event] with mock.patch("parallel_web_tools.core.batch.create_client", return_value=mock_client): with mock.patch("parallel_web_tools.core.batch.time.sleep"): diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 6d3a97b..d1dd093 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -1,770 +1,493 @@ """Tests for the Monitor web tracking functionality.""" import json +from types import SimpleNamespace from unittest import mock -import httpx import pytest from click.testing import CliRunner from parallel_web_tools.cli.commands import main from parallel_web_tools.core.monitor import ( - BASE_URL, - MONITOR_CADENCES, MONITOR_EVENT_TYPES, - _request, + MONITOR_FREQUENCY_PRESETS, + MONITOR_PROCESSORS, + MONITOR_TYPES, + cancel_monitor, create_monitor, - delete_monitor, get_monitor, - get_monitor_event_group, list_monitor_events, list_monitors, - simulate_monitor_event, + resolve_frequency, + trigger_monitor, update_monitor, ) @pytest.fixture def runner(): - """Create a CLI test runner.""" return CliRunner() -@pytest.fixture -def mock_httpx(): - """Mock httpx.request for monitor API calls.""" - with mock.patch("parallel_web_tools.core.monitor.httpx") as m: - yield m +def _model(**fields): + """Build a stand-in for an SDK pydantic model: model_dump returns the fields verbatim.""" + obj = SimpleNamespace(**fields) + obj.model_dump = lambda mode="json": dict(fields) # type: ignore[attr-defined] + return obj @pytest.fixture -def mock_resolve_api_key(): - """Mock resolve_api_key to return a test key.""" - with mock.patch("parallel_web_tools.core.monitor.resolve_api_key", return_value="test-key"): - yield - - -def _make_response(status_code=200, json_data=None): - """Helper to build a mock httpx Response.""" - resp = mock.MagicMock(spec=httpx.Response) - resp.status_code = status_code - resp.content = json.dumps(json_data).encode() if json_data is not None else b"" - resp.json.return_value = json_data if json_data is not None else {} - resp.raise_for_status.return_value = None - return resp +def mock_client(): + """Patch create_client and yield a fully mocked Parallel client.""" + with mock.patch("parallel_web_tools.core.monitor.create_client") as factory: + client = mock.MagicMock() + factory.return_value = client + yield client # ============================================================================= -# Constants Tests +# Constants # ============================================================================= class TestConstants: - """Tests for module-level constants.""" + def test_frequency_presets(self): + assert MONITOR_FREQUENCY_PRESETS["hourly"] == "1h" + assert MONITOR_FREQUENCY_PRESETS["daily"] == "1d" + assert MONITOR_FREQUENCY_PRESETS["weekly"] == "1w" + assert MONITOR_FREQUENCY_PRESETS["every_two_weeks"] == "2w" - def test_cadences_exist(self): - assert "daily" in MONITOR_CADENCES - assert "hourly" in MONITOR_CADENCES - assert len(MONITOR_CADENCES) > 0 - - def test_event_types_exist(self): + def test_event_types(self): assert "monitor.event.detected" in MONITOR_EVENT_TYPES assert "monitor.execution.completed" in MONITOR_EVENT_TYPES assert "monitor.execution.failed" in MONITOR_EVENT_TYPES - assert len(MONITOR_EVENT_TYPES) > 0 - - def test_base_url(self): - assert BASE_URL == "https://api.parallel.ai" - - -# ============================================================================= -# Core Function Tests -# ============================================================================= - - -class TestRequest: - """Tests for the _request helper.""" - - def test_sends_auth_header(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"ok": True}) - - _request("GET", "/v1alpha/monitors") - - call_kwargs = mock_httpx.request.call_args - headers = call_kwargs.kwargs.get("headers") or call_kwargs[1].get("headers") - assert headers["x-api-key"] == "test-key" - - def test_uses_correct_url(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) - - _request("GET", "/v1alpha/monitors") - - call_args = mock_httpx.request.call_args - assert call_args[0][0] == "GET" - assert call_args[0][1] == f"{BASE_URL}/v1alpha/monitors" - - def test_passes_json_body(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) - body = {"query": "test", "cadence": "daily"} - - _request("POST", "/v1alpha/monitors", json=body) - - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("json") == body - def test_passes_query_params(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) + def test_types_and_processors(self): + assert "event_stream" in MONITOR_TYPES + assert "snapshot" in MONITOR_TYPES + assert "lite" in MONITOR_PROCESSORS + assert "base" in MONITOR_PROCESSORS - _request("GET", "/v1alpha/monitors", params={"limit": 10}) + def test_resolve_frequency_alias(self): + assert resolve_frequency("daily") == "1d" - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("params") == {"limit": 10} + def test_resolve_frequency_passthrough(self): + assert resolve_frequency("6h") == "6h" - def test_raises_on_http_error(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value.raise_for_status.side_effect = httpx.HTTPStatusError( - "Not Found", request=mock.MagicMock(), response=mock.MagicMock() - ) - with pytest.raises(httpx.HTTPStatusError): - _request("GET", "/v1alpha/monitors/bad_id") +# ============================================================================= +# create_monitor +# ============================================================================= class TestCreateMonitor: - """Tests for create_monitor.""" - - def test_basic_create(self, mock_httpx, mock_resolve_api_key): - expected = {"monitor_id": "mon_123", "query": "track AI news", "cadence": "daily"} - mock_httpx.request.return_value = _make_response(200, expected) + def test_event_stream_basic(self, mock_client): + mock_client.monitor.create.return_value = _model(monitor_id="mon_123", type="event_stream") result = create_monitor("track AI news", "daily") assert result["monitor_id"] == "mon_123" - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert body["query"] == "track AI news" - assert body["cadence"] == "daily" + kwargs = mock_client.monitor.create.call_args.kwargs + assert kwargs["frequency"] == "1d" # alias resolved + assert kwargs["type"] == "event_stream" + assert kwargs["settings"] == {"query": "track AI news"} - def test_create_with_all_options(self, mock_httpx, mock_resolve_api_key): - expected = {"monitor_id": "mon_456"} - mock_httpx.request.return_value = _make_response(200, expected) + def test_event_stream_with_all_options(self, mock_client): + mock_client.monitor.create.return_value = _model(monitor_id="mon_456") - result = create_monitor( + create_monitor( "track stuff", - "hourly", + "1h", webhook="https://hook.example.com", metadata={"project": "test"}, - output_schema={"type": "object"}, + output_schema={"type": "object", "properties": {}}, + include_backfill=True, + processor="base", ) - assert result["monitor_id"] == "mon_456" - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert body["webhook"] == {"url": "https://hook.example.com", "event_types": ["monitor.event.detected"]} - assert body["metadata"] == {"project": "test"} - assert body["output_schema"] == {"type": "object"} - - def test_create_omits_none_fields(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitor_id": "mon_x"}) + kwargs = mock_client.monitor.create.call_args.kwargs + assert kwargs["frequency"] == "1h" + assert kwargs["webhook"] == { + "url": "https://hook.example.com", + "event_types": ["monitor.event.detected"], + } + assert kwargs["metadata"] == {"project": "test"} + assert kwargs["processor"] == "base" + assert kwargs["settings"]["include_backfill"] is True + assert kwargs["settings"]["output_schema"] == { + "type": "json", + "json_schema": {"type": "object", "properties": {}}, + } + + def test_snapshot_type(self, mock_client): + mock_client.monitor.create.return_value = _model(monitor_id="mon_snap") + + create_monitor( + None, + "1d", + type="snapshot", + task_run_id="trun_xyz", + ) - create_monitor("query", "daily") + kwargs = mock_client.monitor.create.call_args.kwargs + assert kwargs["type"] == "snapshot" + assert kwargs["settings"] == {"task_run_id": "trun_xyz"} - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert "webhook" not in body - assert "metadata" not in body - assert "output_schema" not in body + def test_event_stream_requires_query(self, mock_client): + with pytest.raises(ValueError, match="query is required"): + create_monitor(None, "1d") + def test_snapshot_requires_task_run_id(self, mock_client): + with pytest.raises(ValueError, match="task_run_id"): + create_monitor(None, "1d", type="snapshot") -class TestListMonitors: - """Tests for list_monitors.""" + def test_unsupported_type(self, mock_client): + with pytest.raises(ValueError, match="Unsupported monitor type"): + create_monitor("q", "1d", type="bogus") # type: ignore[arg-type] - def test_list_basic(self, mock_httpx, mock_resolve_api_key): - monitors = [{"monitor_id": "mon_1"}, {"monitor_id": "mon_2"}] - mock_httpx.request.return_value = _make_response(200, {"monitors": monitors}) - result = list_monitors() +# ============================================================================= +# list / get / update / cancel +# ============================================================================= - assert len(result) == 2 - assert result[0]["monitor_id"] == "mon_1" - def test_list_with_limit(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitors": []}) +class TestListMonitors: + def test_basic(self, mock_client): + mock_client.monitor.list.return_value = _model( + monitors=[{"monitor_id": "mon_1"}], + next_cursor=None, + ) - list_monitors(limit=5) + result = list_monitors() - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("params") == {"limit": 5} + assert isinstance(result["monitors"], list) + assert result["monitors"][0]["monitor_id"] == "mon_1" - def test_list_with_cursor(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitors": []}) + def test_passes_filters(self, mock_client): + mock_client.monitor.list.return_value = _model(monitors=[], next_cursor=None) - list_monitors(monitor_id="mon_cursor") + list_monitors(cursor="cur_abc", limit=5, status=["active"], type=["event_stream"]) - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("params")["monitor_id"] == "mon_cursor" + kwargs = mock_client.monitor.list.call_args.kwargs + assert kwargs == { + "cursor": "cur_abc", + "limit": 5, + "status": ["active"], + "type": ["event_stream"], + } - def test_list_handles_array_response(self, mock_httpx, mock_resolve_api_key): - monitors = [{"monitor_id": "mon_1"}] - mock_httpx.request.return_value = _make_response(200, monitors) + def test_omits_unset(self, mock_client): + mock_client.monitor.list.return_value = _model(monitors=[], next_cursor=None) - result = list_monitors() + list_monitors() - assert len(result) == 1 + assert mock_client.monitor.list.call_args.kwargs == {} class TestGetMonitor: - """Tests for get_monitor.""" - - def test_get_by_id(self, mock_httpx, mock_resolve_api_key): - expected = {"monitor_id": "mon_abc", "query": "test", "cadence": "daily"} - mock_httpx.request.return_value = _make_response(200, expected) + def test_get(self, mock_client): + mock_client.monitor.retrieve.return_value = _model(monitor_id="mon_abc") result = get_monitor("mon_abc") assert result["monitor_id"] == "mon_abc" - call_args = mock_httpx.request.call_args - assert "/v1alpha/monitors/mon_abc" in call_args[0][1] + mock_client.monitor.retrieve.assert_called_once_with("mon_abc") class TestUpdateMonitor: - """Tests for update_monitor.""" + def test_update_frequency(self, mock_client): + mock_client.monitor.update.return_value = _model(monitor_id="mon_abc") - def test_update_query(self, mock_httpx, mock_resolve_api_key): - expected = {"monitor_id": "mon_abc", "query": "new query"} - mock_httpx.request.return_value = _make_response(200, expected) + update_monitor("mon_abc", frequency="hourly") - result = update_monitor("mon_abc", query="new query") + args, kwargs = mock_client.monitor.update.call_args + assert args == ("mon_abc",) + assert kwargs == {"frequency": "1h"} - assert result["query"] == "new query" - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert body["query"] == "new query" + def test_update_webhook(self, mock_client): + mock_client.monitor.update.return_value = _model(monitor_id="mon_abc") - def test_update_cadence(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitor_id": "mon_abc"}) + update_monitor("mon_abc", webhook="https://hook.example.com") - update_monitor("mon_abc", cadence="hourly") + kwargs = mock_client.monitor.update.call_args.kwargs + assert kwargs["webhook"] == { + "url": "https://hook.example.com", + "event_types": ["monitor.event.detected"], + } - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert body["cadence"] == "hourly" + def test_update_advanced_settings_sets_event_stream_type(self, mock_client): + mock_client.monitor.update.return_value = _model(monitor_id="mon_abc") - def test_update_omits_none_fields(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitor_id": "mon_abc"}) + update_monitor("mon_abc", advanced_settings={"foo": "bar"}) - update_monitor("mon_abc", query="only query") + kwargs = mock_client.monitor.update.call_args.kwargs + assert kwargs["settings"] == {"advanced_settings": {"foo": "bar"}} + assert kwargs["type"] == "event_stream" - call_kwargs = mock_httpx.request.call_args - body = call_kwargs.kwargs.get("json") - assert "cadence" not in body - assert "webhook" not in body + def test_update_no_fields_raises(self, mock_client): + with pytest.raises(ValueError, match="At least one field"): + update_monitor("mon_abc") -class TestDeleteMonitor: - """Tests for delete_monitor.""" - - def test_delete(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"monitor_id": "mon_del", "deleted": True}) - - result = delete_monitor("mon_del") +class TestCancelMonitor: + def test_cancel(self, mock_client): + mock_client.monitor.cancel.return_value = _model( + monitor_id="mon_abc", + status="cancelled", + ) - assert result["deleted"] is True - call_args = mock_httpx.request.call_args - assert call_args[0][0] == "DELETE" + result = cancel_monitor("mon_abc") - def test_delete_204_no_content(self, mock_httpx, mock_resolve_api_key): - resp = _make_response(204) - resp.content = b"" - mock_httpx.request.return_value = resp + assert result["status"] == "cancelled" + mock_client.monitor.cancel.assert_called_once_with("mon_abc") - result = delete_monitor("mon_del") - assert result["monitor_id"] == "mon_del" - assert result["deleted"] is True +# ============================================================================= +# events / trigger +# ============================================================================= class TestListMonitorEvents: - """Tests for list_monitor_events.""" - - def test_list_events_default_lookback(self, mock_httpx, mock_resolve_api_key): - expected = {"events": [{"event_id": "ev_1"}]} - mock_httpx.request.return_value = _make_response(200, expected) + def test_basic(self, mock_client): + mock_client.monitor.events.return_value = _model( + events=[{"event_id": "ev_1"}], + next_cursor=None, + ) result = list_monitor_events("mon_abc") - assert len(result["events"]) == 1 - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("params") == {"lookback_period": "10d"} - - def test_list_events_custom_lookback(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {"events": []}) - - list_monitor_events("mon_abc", lookback_period="24h") - - call_kwargs = mock_httpx.request.call_args - assert call_kwargs.kwargs.get("params") == {"lookback_period": "24h"} - - -class TestGetMonitorEventGroup: - """Tests for get_monitor_event_group.""" - - def test_get_event_group(self, mock_httpx, mock_resolve_api_key): - expected = {"event_group_id": "eg_123", "type": "event"} - mock_httpx.request.return_value = _make_response(200, expected) - - result = get_monitor_event_group("mon_abc", "eg_123") - - assert result["event_group_id"] == "eg_123" - call_args = mock_httpx.request.call_args - assert "/v1alpha/monitors/mon_abc/event_groups/eg_123" in call_args[0][1] + assert result["events"][0]["event_id"] == "ev_1" + mock_client.monitor.events.assert_called_once_with("mon_abc") + def test_passes_filters(self, mock_client): + mock_client.monitor.events.return_value = _model(events=[], next_cursor=None) -class TestSimulateMonitorEvent: - """Tests for simulate_monitor_event.""" - - def test_simulate_basic(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) - - simulate_monitor_event("mon_abc") - - call_args = mock_httpx.request.call_args - assert call_args[0][0] == "POST" - assert "/simulate_event" in call_args[0][1] - - def test_simulate_with_event_type(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) - - simulate_monitor_event("mon_abc", event_type="monitor.event.detected") + list_monitor_events( + "mon_abc", + cursor="cur_x", + event_group_id="egrp_y", + include_completions=True, + limit=50, + ) - call_kwargs = mock_httpx.request.call_args - params = call_kwargs.kwargs.get("params") - assert params["event_type"] == "monitor.event.detected" + args, kwargs = mock_client.monitor.events.call_args + assert args == ("mon_abc",) + assert kwargs == { + "cursor": "cur_x", + "event_group_id": "egrp_y", + "include_completions": True, + "limit": 50, + } - def test_simulate_returns_none(self, mock_httpx, mock_resolve_api_key): - mock_httpx.request.return_value = _make_response(200, {}) - result = simulate_monitor_event("mon_abc") +class TestTriggerMonitor: + def test_trigger(self, mock_client): + trigger_monitor("mon_abc") - assert result is None + mock_client.monitor.trigger.assert_called_once_with("mon_abc") # ============================================================================= -# CLI Command Tests +# CLI # ============================================================================= class TestMonitorGroup: - """Tests for the monitor command group.""" - - def test_monitor_help(self, runner): + def test_help_lists_subcommands(self, runner): result = runner.invoke(main, ["monitor", "--help"]) assert result.exit_code == 0 - assert "create" in result.output - assert "list" in result.output - assert "get" in result.output - assert "update" in result.output - assert "delete" in result.output - assert "events" in result.output - assert "event-group" in result.output - assert "simulate" in result.output - - def test_monitor_in_main_help(self, runner): - result = runner.invoke(main, ["--help"]) - assert result.exit_code == 0 - assert "monitor" in result.output + for sub in ("create", "list", "get", "update", "cancel", "events", "trigger"): + assert sub in result.output class TestMonitorCreateCommand: - """Tests for the monitor create CLI command.""" - - def test_create_help(self, runner): + def test_help(self, runner): result = runner.invoke(main, ["monitor", "create", "--help"]) assert result.exit_code == 0 - assert "--cadence" in result.output + assert "--frequency" in result.output + assert "--type" in result.output assert "--webhook" in result.output - assert "--json" in result.output - - def test_create_no_query(self, runner): - result = runner.invoke(main, ["monitor", "create"]) - assert result.exit_code != 0 - def test_create_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.return_value = { + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.create_monitor") as patched: + patched.return_value = { "monitor_id": "mon_test", - "query": "track AI news", - "cadence": "daily", + "type": "event_stream", + "frequency": "1d", } - result = runner.invoke(main, ["monitor", "create", "track AI news"]) - assert result.exit_code == 0 - assert "mon_test" in result.output - mock_create.assert_called_once() + assert result.exit_code == 0 + assert "mon_test" in result.output + kwargs = patched.call_args.kwargs + assert kwargs["query"] == "track AI news" + assert kwargs["frequency"] == "1d" + assert kwargs["type"] == "event_stream" - def test_create_with_cadence(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.return_value = {"monitor_id": "mon_hr"} + def test_with_frequency_alias(self, runner): + with mock.patch("parallel_web_tools.cli.commands.create_monitor") as patched: + patched.return_value = {"monitor_id": "mon_hr", "frequency": "1h", "type": "event_stream"} + result = runner.invoke(main, ["monitor", "create", "track stuff", "--frequency", "hourly"]) - result = runner.invoke(main, ["monitor", "create", "track stuff", "--cadence", "hourly"]) + assert result.exit_code == 0 + assert patched.call_args.kwargs["frequency"] == "hourly" - assert result.exit_code == 0 - call_kwargs = mock_create.call_args.kwargs - assert call_kwargs["cadence"] == "hourly" + def test_snapshot_requires_task_run_id(self, runner): + result = runner.invoke(main, ["monitor", "create", "--type", "snapshot"]) + assert result.exit_code != 0 - def test_create_json_output(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.return_value = { - "monitor_id": "mon_json", - "query": "test", - "cadence": "daily", - } + def test_event_stream_requires_query(self, runner): + result = runner.invoke(main, ["monitor", "create"]) + assert result.exit_code != 0 + def test_json_output(self, runner): + with mock.patch("parallel_web_tools.cli.commands.create_monitor") as patched: + patched.return_value = {"monitor_id": "mon_json"} result = runner.invoke(main, ["monitor", "create", "test", "--json"]) - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["monitor_id"] == "mon_json" - - def test_create_with_webhook(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.return_value = {"monitor_id": "mon_wh"} - - result = runner.invoke(main, ["monitor", "create", "test", "--webhook", "https://hook.example.com"]) - - assert result.exit_code == 0 - call_kwargs = mock_create.call_args.kwargs - assert call_kwargs["webhook"] == "https://hook.example.com" + assert result.exit_code == 0 + assert json.loads(result.output)["monitor_id"] == "mon_json" - def test_create_invalid_metadata_json(self, runner): + def test_invalid_metadata_json(self, runner): result = runner.invoke(main, ["monitor", "create", "test", "--metadata", "not-json"]) assert result.exit_code != 0 - def test_create_saves_to_output_file(self, runner, tmp_path): - output_file = tmp_path / "monitor.json" - - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.return_value = {"monitor_id": "mon_file", "query": "test", "cadence": "daily"} - - result = runner.invoke(main, ["monitor", "create", "test", "-o", str(output_file)]) - - assert result.exit_code == 0 - assert output_file.exists() - data = json.loads(output_file.read_text()) - assert data["monitor_id"] == "mon_file" - class TestMonitorListCommand: - """Tests for the monitor list CLI command.""" - - def test_list_help(self, runner): - result = runner.invoke(main, ["monitor", "list", "--help"]) - assert result.exit_code == 0 - assert "--limit" in result.output - - def test_list_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitors") as mock_list: - mock_list.return_value = [ - {"monitor_id": "mon_1", "query": "test", "cadence": "daily", "status": "active"}, - ] - + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.list_monitors") as patched: + patched.return_value = { + "monitors": [ + { + "monitor_id": "mon_1", + "type": "event_stream", + "frequency": "1d", + "status": "active", + "settings": {"query": "test"}, + } + ], + "next_cursor": None, + } result = runner.invoke(main, ["monitor", "list"]) - assert result.exit_code == 0 - assert "mon_1" in result.output - - def test_list_empty(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitors") as mock_list: - mock_list.return_value = [] + assert result.exit_code == 0 + assert "mon_1" in result.output + def test_empty(self, runner): + with mock.patch("parallel_web_tools.cli.commands.list_monitors") as patched: + patched.return_value = {"monitors": [], "next_cursor": None} result = runner.invoke(main, ["monitor", "list"]) - assert result.exit_code == 0 - assert "No monitors found" in result.output - - def test_list_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitors") as mock_list: - monitors = [{"monitor_id": "mon_1"}, {"monitor_id": "mon_2"}] - mock_list.return_value = monitors - - result = runner.invoke(main, ["monitor", "list", "--json"]) - - assert result.exit_code == 0 - output = json.loads(result.output) - assert len(output) == 2 - - def test_list_with_limit(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitors") as mock_list: - mock_list.return_value = [] + assert result.exit_code == 0 + assert "No monitors" in result.output - runner.invoke(main, ["monitor", "list", "--limit", "5"]) + def test_status_filter(self, runner): + with mock.patch("parallel_web_tools.cli.commands.list_monitors") as patched: + patched.return_value = {"monitors": [], "next_cursor": None} + runner.invoke(main, ["monitor", "list", "--status", "active", "--status", "cancelled"]) - call_kwargs = mock_list.call_args.kwargs - assert call_kwargs["limit"] == 5 + assert patched.call_args.kwargs["status"] == ["active", "cancelled"] class TestMonitorGetCommand: - """Tests for the monitor get CLI command.""" - - def test_get_help(self, runner): - result = runner.invoke(main, ["monitor", "get", "--help"]) - assert result.exit_code == 0 - assert "MONITOR_ID" in result.output - - def test_get_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.get_monitor") as mock_get: - mock_get.return_value = { + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.get_monitor") as patched: + patched.return_value = { "monitor_id": "mon_abc", - "query": "track news", - "cadence": "daily", + "type": "event_stream", + "frequency": "1d", "status": "active", + "processor": "lite", + "settings": {"query": "track news"}, } - result = runner.invoke(main, ["monitor", "get", "mon_abc"]) - assert result.exit_code == 0 - assert "mon_abc" in result.output - assert "track news" in result.output - - def test_get_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.get_monitor") as mock_get: - mock_get.return_value = {"monitor_id": "mon_abc", "query": "test"} - - result = runner.invoke(main, ["monitor", "get", "mon_abc", "--json"]) - - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["monitor_id"] == "mon_abc" + assert result.exit_code == 0 + assert "mon_abc" in result.output + assert "track news" in result.output class TestMonitorUpdateCommand: - """Tests for the monitor update CLI command.""" - - def test_update_help(self, runner): - result = runner.invoke(main, ["monitor", "update", "--help"]) - assert result.exit_code == 0 - assert "--query" in result.output - assert "--cadence" in result.output - - def test_update_no_fields(self, runner): + def test_no_fields_fails(self, runner): result = runner.invoke(main, ["monitor", "update", "mon_abc"]) assert result.exit_code != 0 - def test_update_query(self, runner): - with mock.patch("parallel_web_tools.cli.commands.update_monitor") as mock_update: - mock_update.return_value = {"monitor_id": "mon_abc", "query": "new query"} - - result = runner.invoke(main, ["monitor", "update", "mon_abc", "--query", "new query"]) - - assert result.exit_code == 0 - assert "updated" in result.output.lower() - - def test_update_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.update_monitor") as mock_update: - mock_update.return_value = {"monitor_id": "mon_abc"} - - result = runner.invoke(main, ["monitor", "update", "mon_abc", "--cadence", "hourly", "--json"]) + def test_frequency(self, runner): + with mock.patch("parallel_web_tools.cli.commands.update_monitor") as patched: + patched.return_value = {"monitor_id": "mon_abc"} + result = runner.invoke(main, ["monitor", "update", "mon_abc", "--frequency", "1h"]) - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["monitor_id"] == "mon_abc" - - -class TestMonitorDeleteCommand: - """Tests for the monitor delete CLI command.""" - - def test_delete_help(self, runner): - result = runner.invoke(main, ["monitor", "delete", "--help"]) assert result.exit_code == 0 + assert patched.call_args.kwargs["frequency"] == "1h" - def test_delete_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.delete_monitor") as mock_del: - mock_del.return_value = {"monitor_id": "mon_abc", "deleted": True} - - result = runner.invoke(main, ["monitor", "delete", "mon_abc"]) - - assert result.exit_code == 0 - assert "mon_abc" in result.output - def test_delete_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.delete_monitor") as mock_del: - mock_del.return_value = {"monitor_id": "mon_abc", "deleted": True} +class TestMonitorCancelCommand: + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.cancel_monitor") as patched: + patched.return_value = {"monitor_id": "mon_abc", "status": "cancelled"} + result = runner.invoke(main, ["monitor", "cancel", "mon_abc"]) - result = runner.invoke(main, ["monitor", "delete", "mon_abc", "--json"]) - - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["deleted"] is True + assert result.exit_code == 0 + assert "mon_abc" in result.output class TestMonitorEventsCommand: - """Tests for the monitor events CLI command.""" - - def test_events_help(self, runner): - result = runner.invoke(main, ["monitor", "events", "--help"]) - assert result.exit_code == 0 - assert "--lookback" in result.output - - def test_events_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as mock_events: - mock_events.return_value = { + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as patched: + patched.return_value = { "events": [ { - "type": "event", - "event_group_id": "mevtgrp_abc123", - "output": "Price changed", - "event_date": "2026-01-01", - "source_urls": ["https://example.com"], - }, - ] + "event_type": "event_stream", + "event_group_id": "egrp_abc", + "event_id": "ev_1", + "event_date": "2026-05-01", + "output": {"content": "Price changed"}, + } + ], + "next_cursor": None, } - - result = runner.invoke(main, ["monitor", "events", "mon_abc"]) - - assert result.exit_code == 0 - assert "mevtgrp_abc123" in result.output - - def test_events_empty(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as mock_events: - mock_events.return_value = {"events": []} - result = runner.invoke(main, ["monitor", "events", "mon_abc"]) - assert result.exit_code == 0 - assert "No events found" in result.output - - def test_events_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as mock_events: - mock_events.return_value = {"events": [{"event_id": "ev_1"}]} - - result = runner.invoke(main, ["monitor", "events", "mon_abc", "--json"]) - - assert result.exit_code == 0 - output = json.loads(result.output) - assert len(output["events"]) == 1 - - def test_events_custom_lookback(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as mock_events: - mock_events.return_value = {"events": []} - - runner.invoke(main, ["monitor", "events", "mon_abc", "--lookback", "24h"]) - - call_kwargs = mock_events.call_args.kwargs - assert call_kwargs["lookback_period"] == "24h" - - def test_events_saves_to_file(self, runner, tmp_path): - output_file = tmp_path / "events.json" - - with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as mock_events: - mock_events.return_value = {"events": [{"event_id": "ev_1"}]} - - result = runner.invoke(main, ["monitor", "events", "mon_abc", "-o", str(output_file)]) - - assert result.exit_code == 0 - assert output_file.exists() - - -class TestMonitorEventGroupCommand: - """Tests for the monitor event-group CLI command.""" - - def test_event_group_help(self, runner): - result = runner.invoke(main, ["monitor", "event-group", "--help"]) assert result.exit_code == 0 + assert "egrp_abc" in result.output - def test_event_group_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.get_monitor_event_group") as mock_eg: - mock_eg.return_value = { - "event_group_id": "eg_123", - "events": [], - } - - result = runner.invoke(main, ["monitor", "event-group", "mon_abc", "eg_123"]) + def test_event_group_id_filter(self, runner): + with mock.patch("parallel_web_tools.cli.commands.list_monitor_events") as patched: + patched.return_value = {"events": [], "next_cursor": None} + runner.invoke(main, ["monitor", "events", "mon_abc", "--event-group-id", "egrp_xyz"]) - assert result.exit_code == 0 - assert "eg_123" in result.output + assert patched.call_args.kwargs["event_group_id"] == "egrp_xyz" - def test_event_group_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.get_monitor_event_group") as mock_eg: - mock_eg.return_value = {"event_group_id": "eg_123"} - result = runner.invoke(main, ["monitor", "event-group", "mon_abc", "eg_123", "--json"]) +class TestMonitorTriggerCommand: + def test_basic(self, runner): + with mock.patch("parallel_web_tools.cli.commands.trigger_monitor") as patched: + patched.return_value = None + result = runner.invoke(main, ["monitor", "trigger", "mon_abc"]) - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["event_group_id"] == "eg_123" - - -class TestMonitorSimulateCommand: - """Tests for the monitor simulate CLI command.""" - - def test_simulate_help(self, runner): - result = runner.invoke(main, ["monitor", "simulate", "--help"]) assert result.exit_code == 0 - assert "--event-type" in result.output - - def test_simulate_basic(self, runner): - with mock.patch("parallel_web_tools.cli.commands.simulate_monitor_event") as mock_sim: - mock_sim.return_value = None - - result = runner.invoke(main, ["monitor", "simulate", "mon_abc"]) + assert "mon_abc" in result.output - assert result.exit_code == 0 - assert "simulated" in result.output.lower() + def test_json(self, runner): + with mock.patch("parallel_web_tools.cli.commands.trigger_monitor") as patched: + patched.return_value = None + result = runner.invoke(main, ["monitor", "trigger", "mon_abc", "--json"]) - def test_simulate_with_event_type(self, runner): - with mock.patch("parallel_web_tools.cli.commands.simulate_monitor_event") as mock_sim: - mock_sim.return_value = None - - result = runner.invoke(main, ["monitor", "simulate", "mon_abc", "--event-type", "monitor.event.detected"]) - - assert result.exit_code == 0 - call_kwargs = mock_sim.call_args.kwargs - assert call_kwargs["event_type"] == "monitor.event.detected" - - def test_simulate_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.simulate_monitor_event") as mock_sim: - mock_sim.return_value = None - - result = runner.invoke(main, ["monitor", "simulate", "mon_abc", "--json"]) - - assert result.exit_code == 0 - output = json.loads(result.output) - assert output["simulated"] is True + assert result.exit_code == 0 + assert json.loads(result.output)["triggered"] is True class TestMonitorErrorHandling: - """Tests for error handling across monitor commands.""" - def test_create_api_error(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.side_effect = Exception("API error") - + with mock.patch("parallel_web_tools.cli.commands.create_monitor") as patched: + patched.side_effect = Exception("API error") result = runner.invoke(main, ["monitor", "create", "test"]) - assert result.exit_code != 0 - - def test_create_api_error_json(self, runner): - with mock.patch("parallel_web_tools.cli.commands.create_monitor") as mock_create: - mock_create.side_effect = Exception("API error") - - result = runner.invoke(main, ["monitor", "create", "test", "--json"]) - - assert result.exit_code != 0 - output = json.loads(result.output) - assert "error" in output + assert result.exit_code != 0 def test_get_api_error(self, runner): - with mock.patch("parallel_web_tools.cli.commands.get_monitor") as mock_get: - mock_get.side_effect = Exception("Not found") - + with mock.patch("parallel_web_tools.cli.commands.get_monitor") as patched: + patched.side_effect = Exception("Not found") result = runner.invoke(main, ["monitor", "get", "mon_bad"]) - assert result.exit_code != 0 - - def test_list_api_error(self, runner): - with mock.patch("parallel_web_tools.cli.commands.list_monitors") as mock_list: - mock_list.side_effect = Exception("Unauthorized") - - result = runner.invoke(main, ["monitor", "list"]) - - assert result.exit_code != 0 + assert result.exit_code != 0 diff --git a/uv.lock b/uv.lock index 1a1b6fe..728f94b 100644 --- a/uv.lock +++ b/uv.lock @@ -1433,7 +1433,7 @@ wheels = [ [[package]] name = "parallel-web" -version = "0.5.1" +version = "0.6.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -1443,9 +1443,9 @@ dependencies = [ { name = "sniffio" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f0/35/55355e4d748959973bb921dc6547834cb16f732ab209fcc2bb1d69ec195f/parallel_web-0.5.1.tar.gz", hash = "sha256:e967f3bd1833c73db30ea11aa49f5b3248c10342af1fa768a4a290ff8f4301f6", size = 154941, upload-time = "2026-04-22T18:03:38.131Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7f/81/101c961fe6665212df01fb39a70ebb379dc33529c7bc9210675c0f525139/parallel_web-0.6.0.tar.gz", hash = "sha256:f8aecd3f1958090090c4516881cefea4f55c40948ba3bb99217ca9a6d4263225", size = 173149, upload-time = "2026-05-06T19:13:09.782Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2a/d7/2bcad8a9c6c878439bf922e49d82a3ef31e63ae323cbaab4ed9b2ec15c03/parallel_web-0.5.1-py3-none-any.whl", hash = "sha256:7db65556a362d44ae864b5e4881a239e96377bcefbf931616d9c3b80a6124c21", size = 164287, upload-time = "2026-04-22T18:03:36.854Z" }, + { url = "https://files.pythonhosted.org/packages/a2/7c/7e8b63a0e90efaf567a818fca86c6ad3a85711f8995d2657b51b0cae2351/parallel_web-0.6.0-py3-none-any.whl", hash = "sha256:dc5342ef7262bd2e9f85eb7eace32833bd3d7e3af0bf5fbd780d1ea8c8d9ceb0", size = 199217, upload-time = "2026-05-06T19:13:08.316Z" }, ] [[package]] @@ -1539,7 +1539,7 @@ requires-dist = [ { name = "httpx", specifier = ">=0.25.0" }, { name = "nest-asyncio", marker = "extra == 'duckdb'", specifier = ">=1.6.0" }, { name = "pandas", marker = "extra == 'pandas'", specifier = ">=2.3.0" }, - { name = "parallel-web", specifier = ">=0.4.2" }, + { name = "parallel-web", specifier = ">=0.6.0" }, { name = "parallel-web-tools", extras = ["all", "spark"], marker = "extra == 'dev'" }, { name = "parallel-web-tools", extras = ["cli"], marker = "extra == 'bigquery'" }, { name = "parallel-web-tools", extras = ["cli", "polars"], marker = "extra == 'duckdb'" },