From 8cae045e500b93b5a24ccad5c9b28f10a643f90b Mon Sep 17 00:00:00 2001 From: HyangMin Jeong Date: Fri, 2 Jan 2026 00:38:42 +0900 Subject: [PATCH 1/4] Allow worker-saturation=0 to disable worker queuing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #9116 This change allows setting `worker-saturation` to 0, which disables worker-side queuing entirely. Tasks are only sent to completely idle workers (no tasks in processing queue, excluding seceded tasks). This is useful for workloads with long-running tasks where you want to avoid head-of-line blocking - shorter tasks won't get stuck behind long-running tasks in worker queues. Changes: - Modified `_task_slots_available()` to handle saturation_factor=0 by only allowing task assignment when worker is completely idle - Updated validation to accept values >= 0 (previously required > 0) - Updated schema to allow minimum: 0 (previously exclusiveMinimum: 0) - Added documentation for saturation=0 special case - Added test case for saturation=0.0 - Added test to ensure negative values are still rejected 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- CLAUDE.md | 337 ++++++++++++++++++++++++++++ distributed/distributed-schema.yaml | 10 +- distributed/scheduler.py | 17 +- distributed/tests/test_scheduler.py | 7 + 4 files changed, 366 insertions(+), 5 deletions(-) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000000..e7832718a9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,337 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Distributed is a library for distributed computation in Python, part of the Dask ecosystem. It provides a distributed task scheduler with work stealing, data locality, and fault tolerance. 
+ +## Development Commands + +### Setup +```bash +# Create conda environment (replace 3.12 with your Python version: 3.10, 3.11, 3.12, or 3.13) +mamba env create -f continuous_integration/environment-3.12.yaml -n dask-distributed +conda activate dask-distributed + +# Install in development mode +pip install -e . +``` + +### Testing +```bash +# Run all tests +pytest distributed + +# Run tests in a specific file +pytest distributed/tests/test_client.py + +# Run a specific test +pytest distributed/tests/test_client.py::test_submit + +# Run with slow tests included +pytest distributed --runslow + +# Run tests with markers (exclude CI-avoided tests, include partition) +pytest distributed -m "not avoid_ci and ci1" --runslow + +# Run tests with coverage +pytest distributed --cov=distributed --cov-report=html + +# Run tests with leak detection +pytest distributed --leaks=fds,processes,threads +``` + +### Code Quality +```bash +# Run all pre-commit hooks +pre-commit run --all-files + +# Run ruff linting +ruff check distributed + +# Run ruff formatting +ruff format distributed + +# Run mypy type checking +mypy distributed + +# Run codespell +codespell +``` + +### Documentation +```bash +cd docs +make html # Build HTML documentation +``` + +### Running Distributed Components +```bash +# Start a scheduler +dask-scheduler + +# Start a worker (connect to scheduler at specified address) +dask-worker tcp://127.0.0.1:8786 + +# Start scheduler via SSH +dask-ssh --hostfile hosts.txt +``` + +## Architecture Overview + +### Core Components + +**Scheduler (`distributed/scheduler.py`)** - The central coordinator that: +- Maintains cluster state (workers, tasks, clients) +- Schedules tasks to workers based on data locality and resource availability +- Implements work stealing for load balancing +- Handles worker failures and task retry +- Provides extension/plugin system for custom behavior + +**Client (`distributed/client.py`)** - User-facing API that: +- Submits tasks and task graphs to 
the scheduler +- Retrieves results via futures +- Provides high-level operations: `submit()`, `map()`, `compute()`, `gather()` +- Manages connection to scheduler + +**Worker (`distributed/worker.py`)** - Task executor that: +- Executes tasks in thread/process pools +- Manages local data storage with memory limits +- Reports metrics to scheduler +- Implements worker-side state machine (`worker_state_machine.py`) +- Handles data transfer between workers + +**Nanny (`distributed/nanny.py`)** - Worker supervisor that: +- Spawns and monitors worker processes +- Restarts crashed workers +- Enforces memory limits (pause/restart on exceed) + +### Communication Architecture + +All components communicate via **async RPC over pluggable transports**: + +- **Transport Layer** (`distributed/comm/`): TCP (default), WebSocket, in-process +- **RPC Protocol** (`distributed/core.py`): Message-based with handler dispatch + - Messages are dicts: `{'op': 'operation_name', 'arg1': value, ...}` + - Handlers registered in `handlers` dict on Server classes + - First handler arg is `comm: Comm`, remaining args from message dict +- **Serialization** (`distributed/protocol/`): cloudpickle, msgpack, with compression +- **Connection Pooling** (`distributed/batched.py`): Persistent connections, message batching + +### State Machine Pattern + +Tasks flow through states on both scheduler and worker: + +**Scheduler Task States:** +- `released` → `waiting` → `queued` → `processing` → `memory` → `forgotten` +- Alternative paths: `error`, `erred`, `no-worker` + +**Worker Task States:** (via `WorkerState` in `worker_state_machine.py`) +- `fetch` → `flight` → `executing` → `memory` → `released` +- Alternative paths: `error`, `missing`, `constrained`, `long-running` + +State transitions are coordinated via RPC and logged in `transition_log` for debugging. 
+ +### Plugin System + +**SchedulerPlugin** (`distributed/diagnostics/plugin.py`): +- Hooks: `start()`, `close()`, `update_graph()`, `transition()`, etc. +- Full access to scheduler state +- Used by distributed data structures (Queue, Variable, Lock, etc.) + +**WorkerPlugin**: +- Similar hooks for worker-side customization +- Used for monitoring, preprocessing, resource management + +### Key Subsystems + +**Shuffle (`distributed/shuffle/`)** - Distributed data repartitioning: +- Coordinates complex multi-worker data transfer +- Used by dask.dataframe and dask.array for repartitioning/rechunking +- Implements scheduler and worker plugins for orchestration + +**Active Memory Manager (`distributed/active_memory_manager.py`)** - Automatic memory management: +- Replicates important data +- Drops redundant data when memory pressure builds +- Rebalances data across workers + +**Dashboard (`distributed/dashboard/`)** - Web UI (Bokeh-based): +- Real-time cluster monitoring +- Task stream visualization +- Resource graphs (CPU, memory, network) +- Available at `http://scheduler:8787` by default + +**Spans (`distributed/spans.py`)** - Distributed tracing: +- OpenTelemetry integration +- Track computation spans across cluster + +## Testing Patterns + +### Fixtures and Utilities + +Tests use fixtures from `distributed.utils_test`: +- `cleanup()` - Ensure clean state between tests +- `loop()` - Event loop fixture +- `cluster()` / `client()` - LocalCluster and Client fixtures +- `ws` fixture - Automatically marks test as `@pytest.mark.workerstate` + +### Common Testing Patterns + +```python +# Use gen_cluster for async tests with cluster +@gen_cluster(client=True) +async def test_something(c, s, a, b): + # c = client, s = scheduler, a = first worker, b = second worker + future = c.submit(inc, 1) + result = await future + assert result == 2 + +# Use cluster fixture for sync tests +def test_something_sync(client): + future = client.submit(inc, 1) + result = future.result() + assert 
result == 2 +``` + +### Test Markers + +- `@pytest.mark.slow` - Long-running tests (skip without `--runslow`) +- `@pytest.mark.avoid_ci` - Flaky/broken tests excluded from CI +- `@pytest.mark.ci1` / not `ci1` - Test partitioning for parallel CI runs +- `@pytest.mark.gpu` - GPU-specific tests +- `@pytest.mark.leaking` - Tests with expected resource leaks + +## Configuration System + +Distributed uses Dask's configuration system with distributed-specific config in `~/.config/dask/distributed.yaml`: + +```yaml +distributed: + scheduler: + allowed-failures: 3 + bandwidth: 100000000 + work-stealing: True + worker: + memory: + target: 0.60 # Start spilling at 60% memory + spill: 0.70 # Spill to disk at 70% + pause: 0.80 # Pause at 80% + terminate: 0.95 # Terminate at 95% + comm: + compression: auto + timeouts: + connect: 30s + tcp: 30s +``` + +Override in code: +```python +import dask +dask.config.set({'distributed.scheduler.work-stealing': False}) +``` + +Or via environment variables: +```bash +DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False pytest distributed/tests/test_scheduler.py +``` + +## Important Implementation Notes + +### Async/Await Throughout + +- Built on tornado IOLoop (compatible with asyncio) +- All I/O is non-blocking +- RPC handlers should be `async def` functions +- Use `await self.rpc(worker_address).some_operation(args)` for RPC calls + +### Serialization + +- User functions serialized with cloudpickle +- Small messages use msgpack for efficiency +- Large data uses custom protocol with zero-copy where possible +- Compression configurable (default: auto based on size/type) + +### Worker State Machine (`worker_state_machine.py`) + +- The 142k-line file is the deterministic core of worker task execution +- Pure functions (no I/O) that take state and return new state +- Extensively tested with property-based testing +- When modifying, ensure transitions remain deterministic + +### Memory Management + +- Workers track memory via `psutil` +- Three-tier 
spilling: memory → disk → drop +- Memory limits configurable per worker: `--memory-limit 4GB` +- Active Memory Manager coordinates cluster-wide optimization + +### Test Isolation + +- Each test should clean up resources (workers, schedulers, clients) +- Use fixtures for automatic cleanup +- Pytest resource leak detection catches unclosed resources +- Tests run with strict warnings (most warnings are errors) + +## Common Development Tasks + +### Adding a New RPC Operation + +1. Add handler to target class (Scheduler/Worker/Client): +```python +class Scheduler: + async def my_new_operation(self, arg1, arg2): + # Implementation + return result +``` + +2. Call from remote: +```python +result = await self.rpc(scheduler_address).my_new_operation(val1, val2) +``` + +### Adding a Scheduler Plugin + +```python +from distributed.diagnostics.plugin import SchedulerPlugin + +class MyPlugin(SchedulerPlugin): + async def start(self, scheduler): + # Called when plugin registered + pass + + async def transition(self, key, start, finish, *args, **kwargs): + # Called on every task state transition + pass + +# Register +client.register_scheduler_plugin(MyPlugin()) +``` + +### Adding a Worker Plugin + +```python +from distributed.diagnostics.plugin import WorkerPlugin + +class MyPlugin(WorkerPlugin): + def setup(self, worker): + # Called once when plugin registered + pass + + def transition(self, key, start, finish, *args, **kwargs): + # Called on worker task transitions + pass + +# Register +client.register_worker_plugin(MyPlugin()) +``` + +## CI/CD Details + +- Tests run on Ubuntu, Windows, and macOS +- Python versions: 3.10, 3.11, 3.12, 3.13 +- Special environments: mindeps (minimum dependencies) +- Tests partitioned with `ci1` marker for parallel execution +- pytest-timeout configured (signal mode on Unix, thread mode on Windows) +- Coverage reports uploaded to codecov.io +- Longitudinal test reports track flaky tests over time diff --git a/distributed/distributed-schema.yaml 
b/distributed/distributed-schema.yaml index 79e8986b4d..73aa2b51a5 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -116,7 +116,7 @@ properties: worker-saturation: oneOf: - type: number - exclusiveMinimum: 0 + minimum: 0 # String "inf", not to be confused with .inf which in YAML means float # infinity. This is necessary because there's no way to parse a float # infinity from a DASK_* environment variable. @@ -125,7 +125,13 @@ properties: Controls how many root tasks are sent to workers (like a `readahead`). Up to worker-saturation * nthreads root tasks are sent to a - worker at a time. If `.inf`, all runnable tasks are immediately sent to workers. + worker at a time. + + Special values: + - 0: Only send tasks to idle worker threads (no worker-side queuing). Useful for + long-running tasks to avoid head-of-line blocking. + - .inf: All runnable tasks are immediately sent to workers. + The target number is rounded up, so any `worker-saturation` value > 1.0 guarantees at least one extra task will be sent to workers.
diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d73da7c71f..5271a076e2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1838,10 +1838,10 @@ def __init__( self.WORKER_SATURATION = math.inf if ( not isinstance(self.WORKER_SATURATION, (int, float)) - or self.WORKER_SATURATION <= 0 + or self.WORKER_SATURATION < 0 ): raise ValueError( # pragma: nocover - "`distributed.scheduler.worker-saturation` must be a float > 0; got " + "`distributed.scheduler.worker-saturation` must be a float >= 0; got " + repr(self.WORKER_SATURATION) ) @@ -9278,8 +9278,19 @@ def heartbeat_interval(n: int) -> float: def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int: - """Number of tasks that can be sent to this worker without oversaturating it""" + """Number of tasks that can be sent to this worker without oversaturating it + + When saturation_factor is 0, tasks are only sent to workers with idle threads + (no queuing). This is useful for long-running tasks where you want to avoid + head-of-line blocking.
+ """ assert not math.isinf(saturation_factor) + + # Special case: saturation_factor == 0 means no queuing + # Only send tasks to workers that are completely idle + if saturation_factor == 0: + return 0 - (len(ws.processing) - len(ws.long_running)) + return max(math.ceil(saturation_factor * ws.nthreads), 1) - ( len(ws.processing) - len(ws.long_running) ) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 4b9d917df9..d7cf074bac 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -626,6 +626,7 @@ def func(first, second): (1.1, (3, 2)), (1.0, (2, 1)), (0.1, (1, 1)), + (0.0, (2, 1)), # No queuing: only executing tasks, no queued tasks # This is necessary because there's no way to parse a float infinite from # a DASK_* environment variable ("inf", (6, 4)), @@ -674,6 +675,12 @@ async def test_bad_saturation_factor(): async with Scheduler(dashboard_address=":0"): pass + # Negative values should be rejected + with pytest.raises(ValueError, match=">= 0"): + with dask.config.set({"distributed.scheduler.worker-saturation": -1.0}): + async with Scheduler(dashboard_address=":0"): + pass + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): From 66ed3595f89288a4f173cda0704fdaf8b60aae67 Mon Sep 17 00:00:00 2001 From: Hyangmin Jeong Date: Fri, 2 Jan 2026 00:44:00 +0900 Subject: [PATCH 2/4] Delete CLAUDE.md --- CLAUDE.md | 337 ------------------------------------------------------ 1 file changed, 337 deletions(-) delete mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index e7832718a9..0000000000 --- a/CLAUDE.md +++ /dev/null @@ -1,337 +0,0 @@ -# CLAUDE.md - -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. - -## Project Overview - -Distributed is a library for distributed computation in Python, part of the Dask ecosystem. 
It provides a distributed task scheduler with work stealing, data locality, and fault tolerance. - -## Development Commands - -### Setup -```bash -# Create conda environment (replace 3.12 with your Python version: 3.10, 3.11, 3.12, or 3.13) -mamba env create -f continuous_integration/environment-3.12.yaml -n dask-distributed -conda activate dask-distributed - -# Install in development mode -pip install -e . -``` - -### Testing -```bash -# Run all tests -pytest distributed - -# Run tests in a specific file -pytest distributed/tests/test_client.py - -# Run a specific test -pytest distributed/tests/test_client.py::test_submit - -# Run with slow tests included -pytest distributed --runslow - -# Run tests with markers (exclude CI-avoided tests, include partition) -pytest distributed -m "not avoid_ci and ci1" --runslow - -# Run tests with coverage -pytest distributed --cov=distributed --cov-report=html - -# Run tests with leak detection -pytest distributed --leaks=fds,processes,threads -``` - -### Code Quality -```bash -# Run all pre-commit hooks -pre-commit run --all-files - -# Run ruff linting -ruff check distributed - -# Run ruff formatting -ruff format distributed - -# Run mypy type checking -mypy distributed - -# Run codespell -codespell -``` - -### Documentation -```bash -cd docs -make html # Build HTML documentation -``` - -### Running Distributed Components -```bash -# Start a scheduler -dask-scheduler - -# Start a worker (connect to scheduler at specified address) -dask-worker tcp://127.0.0.1:8786 - -# Start scheduler via SSH -dask-ssh --hostfile hosts.txt -``` - -## Architecture Overview - -### Core Components - -**Scheduler (`distributed/scheduler.py`)** - The central coordinator that: -- Maintains cluster state (workers, tasks, clients) -- Schedules tasks to workers based on data locality and resource availability -- Implements work stealing for load balancing -- Handles worker failures and task retry -- Provides extension/plugin system for custom behavior - 
-**Client (`distributed/client.py`)** - User-facing API that: -- Submits tasks and task graphs to the scheduler -- Retrieves results via futures -- Provides high-level operations: `submit()`, `map()`, `compute()`, `gather()` -- Manages connection to scheduler - -**Worker (`distributed/worker.py`)** - Task executor that: -- Executes tasks in thread/process pools -- Manages local data storage with memory limits -- Reports metrics to scheduler -- Implements worker-side state machine (`worker_state_machine.py`) -- Handles data transfer between workers - -**Nanny (`distributed/nanny.py`)** - Worker supervisor that: -- Spawns and monitors worker processes -- Restarts crashed workers -- Enforces memory limits (pause/restart on exceed) - -### Communication Architecture - -All components communicate via **async RPC over pluggable transports**: - -- **Transport Layer** (`distributed/comm/`): TCP (default), WebSocket, in-process -- **RPC Protocol** (`distributed/core.py`): Message-based with handler dispatch - - Messages are dicts: `{'op': 'operation_name', 'arg1': value, ...}` - - Handlers registered in `handlers` dict on Server classes - - First handler arg is `comm: Comm`, remaining args from message dict -- **Serialization** (`distributed/protocol/`): cloudpickle, msgpack, with compression -- **Connection Pooling** (`distributed/batched.py`): Persistent connections, message batching - -### State Machine Pattern - -Tasks flow through states on both scheduler and worker: - -**Scheduler Task States:** -- `released` → `waiting` → `queued` → `processing` → `memory` → `forgotten` -- Alternative paths: `error`, `erred`, `no-worker` - -**Worker Task States:** (via `WorkerState` in `worker_state_machine.py`) -- `fetch` → `flight` → `executing` → `memory` → `released` -- Alternative paths: `error`, `missing`, `constrained`, `long-running` - -State transitions are coordinated via RPC and logged in `transition_log` for debugging. 
- -### Plugin System - -**SchedulerPlugin** (`distributed/diagnostics/plugin.py`): -- Hooks: `start()`, `close()`, `update_graph()`, `transition()`, etc. -- Full access to scheduler state -- Used by distributed data structures (Queue, Variable, Lock, etc.) - -**WorkerPlugin**: -- Similar hooks for worker-side customization -- Used for monitoring, preprocessing, resource management - -### Key Subsystems - -**Shuffle (`distributed/shuffle/`)** - Distributed data repartitioning: -- Coordinates complex multi-worker data transfer -- Used by dask.dataframe and dask.array for repartitioning/rechunking -- Implements scheduler and worker plugins for orchestration - -**Active Memory Manager (`distributed/active_memory_manager.py`)** - Automatic memory management: -- Replicates important data -- Drops redundant data when memory pressure builds -- Rebalances data across workers - -**Dashboard (`distributed/dashboard/`)** - Web UI (Bokeh-based): -- Real-time cluster monitoring -- Task stream visualization -- Resource graphs (CPU, memory, network) -- Available at `http://scheduler:8787` by default - -**Spans (`distributed/spans.py`)** - Distributed tracing: -- OpenTelemetry integration -- Track computation spans across cluster - -## Testing Patterns - -### Fixtures and Utilities - -Tests use fixtures from `distributed.utils_test`: -- `cleanup()` - Ensure clean state between tests -- `loop()` - Event loop fixture -- `cluster()` / `client()` - LocalCluster and Client fixtures -- `ws` fixture - Automatically marks test as `@pytest.mark.workerstate` - -### Common Testing Patterns - -```python -# Use gen_cluster for async tests with cluster -@gen_cluster(client=True) -async def test_something(c, s, a, b): - # c = client, s = scheduler, a = first worker, b = second worker - future = c.submit(inc, 1) - result = await future - assert result == 2 - -# Use cluster fixture for sync tests -def test_something_sync(client): - future = client.submit(inc, 1) - result = future.result() - assert 
result == 2 -``` - -### Test Markers - -- `@pytest.mark.slow` - Long-running tests (skip without `--runslow`) -- `@pytest.mark.avoid_ci` - Flaky/broken tests excluded from CI -- `@pytest.mark.ci1` / not `ci1` - Test partitioning for parallel CI runs -- `@pytest.mark.gpu` - GPU-specific tests -- `@pytest.mark.leaking` - Tests with expected resource leaks - -## Configuration System - -Distributed uses Dask's configuration system with distributed-specific config in `~/.config/dask/distributed.yaml`: - -```yaml -distributed: - scheduler: - allowed-failures: 3 - bandwidth: 100000000 - work-stealing: True - worker: - memory: - target: 0.60 # Start spilling at 60% memory - spill: 0.70 # Spill to disk at 70% - pause: 0.80 # Pause at 80% - terminate: 0.95 # Terminate at 95% - comm: - compression: auto - timeouts: - connect: 30s - tcp: 30s -``` - -Override in code: -```python -import dask -dask.config.set({'distributed.scheduler.work-stealing': False}) -``` - -Or via environment variables: -```bash -DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False pytest distributed/tests/test_scheduler.py -``` - -## Important Implementation Notes - -### Async/Await Throughout - -- Built on tornado IOLoop (compatible with asyncio) -- All I/O is non-blocking -- RPC handlers should be `async def` functions -- Use `await self.rpc(worker_address).some_operation(args)` for RPC calls - -### Serialization - -- User functions serialized with cloudpickle -- Small messages use msgpack for efficiency -- Large data uses custom protocol with zero-copy where possible -- Compression configurable (default: auto based on size/type) - -### Worker State Machine (`worker_state_machine.py`) - -- The 142k-line file is the deterministic core of worker task execution -- Pure functions (no I/O) that take state and return new state -- Extensively tested with property-based testing -- When modifying, ensure transitions remain deterministic - -### Memory Management - -- Workers track memory via `psutil` -- Three-tier 
spilling: memory → disk → drop -- Memory limits configurable per worker: `--memory-limit 4GB` -- Active Memory Manager coordinates cluster-wide optimization - -### Test Isolation - -- Each test should clean up resources (workers, schedulers, clients) -- Use fixtures for automatic cleanup -- Pytest resource leak detection catches unclosed resources -- Tests run with strict warnings (most warnings are errors) - -## Common Development Tasks - -### Adding a New RPC Operation - -1. Add handler to target class (Scheduler/Worker/Client): -```python -class Scheduler: - async def my_new_operation(self, arg1, arg2): - # Implementation - return result -``` - -2. Call from remote: -```python -result = await self.rpc(scheduler_address).my_new_operation(val1, val2) -``` - -### Adding a Scheduler Plugin - -```python -from distributed.diagnostics.plugin import SchedulerPlugin - -class MyPlugin(SchedulerPlugin): - async def start(self, scheduler): - # Called when plugin registered - pass - - async def transition(self, key, start, finish, *args, **kwargs): - # Called on every task state transition - pass - -# Register -client.register_scheduler_plugin(MyPlugin()) -``` - -### Adding a Worker Plugin - -```python -from distributed.diagnostics.plugin import WorkerPlugin - -class MyPlugin(WorkerPlugin): - def setup(self, worker): - # Called once when plugin registered - pass - - def transition(self, key, start, finish, *args, **kwargs): - # Called on worker task transitions - pass - -# Register -client.register_worker_plugin(MyPlugin()) -``` - -## CI/CD Details - -- Tests run on Ubuntu, Windows, and macOS -- Python versions: 3.10, 3.11, 3.12, 3.13 -- Special environments: mindeps (minimum dependencies) -- Tests partitioned with `ci1` marker for parallel execution -- pytest-timeout configured (signal mode on Unix, thread mode on Windows) -- Coverage reports uploaded to codecov.io -- Longitudinal test reports track flaky tests over time From cc139fbe7b4c4f22b9b2f4eae0983753e9869ec9 Mon Sep 
17 00:00:00 2001 From: HyangMin Jeong Date: Sun, 4 Jan 2026 00:22:09 +0900 Subject: [PATCH 3/4] Fix saturation_factor=0 to allow task assignment to idle workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation returned 0 for idle workers when saturation_factor=0, which caused _worker_full() to incorrectly mark idle workers as full (since it checks `<= 0`). This prevented any tasks from being assigned, causing test timeouts. Changed _task_slots_available() to return 1 for idle workers when saturation_factor=0, allowing idle workers to accept tasks while still preventing queuing on busy workers. Behavior with saturation_factor=0: - Idle worker: slots = 1 (can accept tasks) - Worker with 1 task: slots = 0 (marked as full) - Worker with seceded task: slots = 1 (can accept tasks) Fixes test_saturation_factor[0.0-expected_task_counts5] timeout. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5271a076e2..a1b227461d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -9289,7 +9289,7 @@ def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int: # Special case: saturation_factor == 0 means no queuing # Only send tasks to workers that are completely idle if saturation_factor == 0: - return 0 - (len(ws.processing) - len(ws.long_running)) + return 1 - (len(ws.processing) - len(ws.long_running)) return max(math.ceil(saturation_factor * ws.nthreads), 1) - ( len(ws.processing) - len(ws.long_running) From c066abb6897fe17a1c79a66e86f0149f6e02b0e4 Mon Sep 17 00:00:00 2001 From: Hyangmin Jeong Date: Sun, 4 Jan 2026 18:09:05 +0900 Subject: [PATCH 4/4] Fix saturation_factor=0 to properly support multi-threaded workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit When saturation_factor=0, workers should accept tasks up to their thread count without queuing. The previous implementation returned only 1 slot for idle workers, causing multi-threaded workers to underutilize threads. Now returns ws.nthreads - active_tasks, allowing workers to fill all idle threads while preventing queuing (head-of-line blocking). Fixes test_saturation_factor[0.0] which expects (2, 1) task distribution for workers with (2, 1) threads respectively. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a1b227461d..b811a8b874 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -9287,9 +9287,9 @@ def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int: assert not math.isinf(saturation_factor) # Special case: saturation_factor == 0 means no queuing - # Only send tasks to workers that are completely idle + # Only send tasks to fill idle threads (no tasks beyond thread count) if saturation_factor == 0: - return 1 - (len(ws.processing) - len(ws.long_running)) + return ws.nthreads - (len(ws.processing) - len(ws.long_running)) return max(math.ceil(saturation_factor * ws.nthreads), 1) - ( len(ws.processing) - len(ws.long_running)