diff --git a/bids2table/__main__.py b/bids2table/__main__.py index 355f8c9..bfa60c8 100644 --- a/bids2table/__main__.py +++ b/bids2table/__main__.py @@ -37,14 +37,14 @@ def main(): "--workers", "-j", type=int, - help="Number of worker processes. Setting to -1 runs as many workers as there " + help="Number of worker processes for dataset-level parallelism. Setting to -1 runs as many workers as there " "are cores available. Setting to 0 runs in the main process. (default: 0)", default=0, ) parser_index.add_argument( "--use-threads", action="store_true", - help="Use threads instead of processes when workers > 0.", + help="Use threads instead of processes when workers > 0 (dataset-level parallelism only).", ) parser_index.add_argument( "--no-progress", "-q", action="store_true", help="Disable the progress bar." @@ -105,12 +105,6 @@ def _index_command(args: argparse.Namespace): for path in args.root: _check_path(path) - max_workers = None if args.workers == -1 else args.workers - if args.use_threads: - executor_cls = concurrent.futures.ThreadPoolExecutor - else: - executor_cls = concurrent.futures.ProcessPoolExecutor - root = [] for path in args.root: if glob.has_magic(path): @@ -124,18 +118,15 @@ def _index_command(args: argparse.Namespace): table = b2t2.index_dataset( root[0], include_subjects=args.subjects, - max_workers=max_workers, - executor_cls=executor_cls, show_progress=not args.no_progress, ) pq.write_table(table, args.output) else: - if len(root) == 0 and not sys.stdin.isatty(): - # read datasets from stdin, one per line - root = (line.strip() for line in sys.stdin if line.strip()) - elif len(root) == 0: - _logger.error("No datasets to index given; exiting.") - sys.exit(1) + max_workers = None if args.workers == -1 else args.workers + if args.use_threads: + executor_cls = concurrent.futures.ThreadPoolExecutor + else: + executor_cls = concurrent.futures.ProcessPoolExecutor schema = b2t2.get_arrow_schema() with pq.ParquetWriter(args.output, schema=schema) as writer: diff --git a/bids2table/_indexing.py b/bids2table/_indexing.py index b9d94a0..0862a9c 100644 --- a/bids2table/_indexing.py +++ b/bids2table/_indexing.py @@ -196,9 +196,6 @@ def find_bids_datasets( def index_dataset( root: str | PathT, include_subjects: str | list[str] | None = None, - max_workers: int | None = 0, - chunksize: int = 32, - executor_cls: type[Executor] = ProcessPoolExecutor, show_progress: bool = False, ) -> pa.Table: """Index a BIDS dataset. @@ -207,13 +204,6 @@ def index_dataset( root: BIDS dataset root directory. include_subjects: Glob pattern or list of patterns for matching subjects to include in the index. - max_workers: Number of indexing processes to run in parallel. Setting - `max_workers=0` (the default) uses the main process only. Setting - `max_workers=None` starts as many workers as there are available CPUs. See - `concurrent.futures.ProcessPoolExecutor` for details. - chunksize: Number of subjects per process task. Only used for - `ProcessPoolExecutor` when `max_workers > 0`. - executor_cls: Executor class to use for parallel indexing. show_progress: Show progress bar. Returns: @@ -234,25 +224,12 @@ def index_dataset( _logger.warning(f"Path {root} contains no matching subject dirs.") return pa.Table.from_pylist([], schema=schema) - func = partial(_index_bids_subject_dir, schema=schema, dataset=dataset) - tables = [] file_count = 0 - for sub, table in ( - pbar := tqdm( - _pmap(func, subject_dirs, max_workers, chunksize, executor_cls), - desc=dataset, - total=len(subject_dirs), - disable=not show_progress, - ) - ): - file_count += len(table) - pbar.set_postfix(dict(sub=sub, N=_hfmt(file_count)), refresh=False) + for sub in subject_dirs: + _, table = _index_bids_subject_dir(sub, schema=schema, dataset=dataset) tables.append(table) - - # NOTE: concat_tables produces a table where each column is a ChunkedArray, with one - # chunk per original subject table. Is it better to keep the original chunks (one - # per subject) or merge using `combine_chunks`? + file_count += len(table) table = pa.concat_tables(tables).combine_chunks() return table @@ -292,7 +269,7 @@ def batch_index_dataset( def _batch_index_func(root: str | PathT) -> tuple[str | None, pa.Table]: dataset, _ = _get_bids_dataset(root) - table = index_dataset(root, max_workers=0, show_progress=False) + table = index_dataset(root, show_progress=False) return dataset, table diff --git a/bids2table/_pathlib.py b/bids2table/_pathlib.py index 4113f9a..dbc9944 100644 --- a/bids2table/_pathlib.py +++ b/bids2table/_pathlib.py @@ -9,7 +9,7 @@ S3Client(no_sign_request=True).set_as_default_client() GSClient().set_as_default_client() -except ImportError: +except Exception: AnyPath = CloudPath = Path _CLOUDPATHLIB_AVAILABLE = False diff --git a/pyproject.toml b/pyproject.toml index ec2ab72..3d86aae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,12 @@ build-backend = "setuptools.build_meta" [project] name = "bids2table" dynamic = ["version"] -authors = [{ name = "Connor Lane", email = "connor.lane858@gmail.com" }] +authors = [ + { name = "Connor Lane", email = "connor.lane858@gmail.com" }, + { name = "Jason Kai" }, + { name = "Florian Rupprecht" }, + { name = "Gregory Kiar" } +] description = "Index BIDS datasets fast, locally or in the cloud." readme = "README.md" requires-python = ">=3.11" diff --git a/tests/conftest.py b/tests/conftest.py index 1b5ee3a..d0bb741 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,20 @@ import pytest +from bids2table._pathlib import cloudpathlib_is_available + + +def pytest_configure(config): + config.addinivalue_line( + "markers", "cloud: Tests requiring cloud group dependencies" + ) + + +def pytest_runtest_setup(item): + if "cloud" in item.keywords: + if not cloudpathlib_is_available(): + pytest.skip("cloudpathlib is not available or not fully functional") + @pytest.fixture def symlink_dataset(tmp_path: Path, sub: str = "sub-001", ses: str = "ses-001") -> Path: diff --git a/tests/pybids/test_layout.py b/tests/pybids/test_layout.py index ad6b75f..8230de3 100644 --- a/tests/pybids/test_layout.py +++ b/tests/pybids/test_layout.py @@ -13,7 +13,7 @@ def test_dataset(): """Return path to a test BIDS dataset.""" # Use one of the bids-examples datasets - dataset_path = Path(__file__).parents[2] / "bids-examples" / "ds001" + dataset_path = Path(__file__).parents[2] / "bids-examples" / "ds114" if not dataset_path.exists(): pytest.skip(f"Test dataset not found: {dataset_path}") return dataset_path