Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions bids2table/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ def main():
"--workers",
"-j",
type=int,
help="Number of worker processes. Setting to -1 runs as many workers as there "
help="Number of worker processes for dataset-level parallelism. Setting to -1 runs as many workers as there "
"are cores available. Setting to 0 runs in the main process. (default: 0)",
default=0,
)
parser_index.add_argument(
"--use-threads",
action="store_true",
help="Use threads instead of processes when workers > 0.",
help="Use threads instead of processes when workers > 0 (dataset-level parallelism only).",
)
parser_index.add_argument(
"--no-progress", "-q", action="store_true", help="Disable the progress bar."
Expand Down Expand Up @@ -105,12 +105,6 @@ def _index_command(args: argparse.Namespace):
for path in args.root:
_check_path(path)

max_workers = None if args.workers == -1 else args.workers
if args.use_threads:
executor_cls = concurrent.futures.ThreadPoolExecutor
else:
executor_cls = concurrent.futures.ProcessPoolExecutor

root = []
for path in args.root:
if glob.has_magic(path):
Expand All @@ -124,18 +118,15 @@ def _index_command(args: argparse.Namespace):
table = b2t2.index_dataset(
root[0],
include_subjects=args.subjects,
max_workers=max_workers,
executor_cls=executor_cls,
show_progress=not args.no_progress,
)
pq.write_table(table, args.output)
else:
if len(root) == 0 and not sys.stdin.isatty():
# read datasets from stdin, one per line
root = (line.strip() for line in sys.stdin if line.strip())
elif len(root) == 0:
_logger.error("No datasets to index given; exiting.")
sys.exit(1)
max_workers = None if args.workers == -1 else args.workers
if args.use_threads:
executor_cls = concurrent.futures.ThreadPoolExecutor
else:
executor_cls = concurrent.futures.ProcessPoolExecutor

schema = b2t2.get_arrow_schema()
with pq.ParquetWriter(args.output, schema=schema) as writer:
Expand Down
31 changes: 4 additions & 27 deletions bids2table/_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,6 @@ def find_bids_datasets(
def index_dataset(
root: str | PathT,
include_subjects: str | list[str] | None = None,
max_workers: int | None = 0,
chunksize: int = 32,
executor_cls: type[Executor] = ProcessPoolExecutor,
show_progress: bool = False,
) -> pa.Table:
"""Index a BIDS dataset.
Expand All @@ -207,13 +204,6 @@ def index_dataset(
root: BIDS dataset root directory.
include_subjects: Glob pattern or list of patterns for matching subjects to
include in the index.
max_workers: Number of indexing processes to run in parallel. Setting
`max_workers=0` (the default) uses the main process only. Setting
`max_workers=None` starts as many workers as there are available CPUs. See
`concurrent.futures.ProcessPoolExecutor` for details.
chunksize: Number of subjects per process task. Only used for
`ProcessPoolExecutor` when `max_workers > 0`.
executor_cls: Executor class to use for parallel indexing.
show_progress: Show progress bar.

Returns:
Expand All @@ -234,25 +224,12 @@ def index_dataset(
_logger.warning(f"Path {root} contains no matching subject dirs.")
return pa.Table.from_pylist([], schema=schema)

func = partial(_index_bids_subject_dir, schema=schema, dataset=dataset)

tables = []
file_count = 0
for sub, table in (
pbar := tqdm(
_pmap(func, subject_dirs, max_workers, chunksize, executor_cls),
desc=dataset,
total=len(subject_dirs),
disable=not show_progress,
)
):
file_count += len(table)
pbar.set_postfix(dict(sub=sub, N=_hfmt(file_count)), refresh=False)
for sub in subject_dirs:
_, table = _index_bids_subject_dir(sub, schema=schema, dataset=dataset)
tables.append(table)

# NOTE: concat_tables produces a table where each column is a ChunkedArray, with one
# chunk per original subject table. Is it better to keep the original chunks (one
# per subject) or merge using `combine_chunks`?
file_count += len(table)
table = pa.concat_tables(tables).combine_chunks()
return table

Expand Down Expand Up @@ -292,7 +269,7 @@ def batch_index_dataset(

def _batch_index_func(root: str | PathT) -> tuple[str | None, pa.Table]:
dataset, _ = _get_bids_dataset(root)
table = index_dataset(root, max_workers=0, show_progress=False)
table = index_dataset(root, show_progress=False)
return dataset, table


Expand Down
2 changes: 1 addition & 1 deletion bids2table/_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
S3Client(no_sign_request=True).set_as_default_client()
GSClient().set_as_default_client()

except ImportError:
except Exception:
AnyPath = CloudPath = Path

_CLOUDPATHLIB_AVAILABLE = False
Expand Down
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ build-backend = "setuptools.build_meta"
[project]
name = "bids2table"
dynamic = ["version"]
authors = [{ name = "Connor Lane", email = "connor.lane858@gmail.com" }]
authors = [
{ name = "Connor Lane", email = "connor.lane858@gmail.com" },
{ name = "Jason Kai" },
{ name = "Florian Rupprecht" },
{ name = "Gregory Kiar" }
]
description = "Index BIDS datasets fast, locally or in the cloud."
readme = "README.md"
requires-python = ">=3.11"
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

import pytest

from bids2table._pathlib import cloudpathlib_is_available


def pytest_configure(config):
config.addinivalue_line(
"markers", "cloud: Tests requiring cloud group dependencies"
)


def pytest_runtest_setup(item):
if "cloud" in item.keywords:
if not cloudpathlib_is_available():
pytest.skip("cloudpathlib is not available or not fully functional")


@pytest.fixture
def symlink_dataset(tmp_path: Path, sub: str = "sub-001", ses: str = "ses-001") -> Path:
Expand Down
2 changes: 1 addition & 1 deletion tests/pybids/test_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def test_dataset():
"""Return path to a test BIDS dataset."""
# Use one of the bids-examples datasets
dataset_path = Path(__file__).parents[2] / "bids-examples" / "ds001"
dataset_path = Path(__file__).parents[2] / "bids-examples" / "ds114"
if not dataset_path.exists():
pytest.skip(f"Test dataset not found: {dataset_path}")
return dataset_path
Expand Down