|
1 | 1 | from pathlib import Path |
2 | 2 |
|
3 | 3 | import nibabel as nb |
| 4 | +import nitransforms as nt |
4 | 5 | import numpy as np |
5 | 6 | import pytest |
6 | 7 | import yaml |
|
11 | 12 | from ....utils import bids |
12 | 13 | from ...tests import mock_config |
13 | 14 | from ...tests.test_base import BASE_LAYOUT |
14 | | -from ..fit import init_pet_fit_wf, init_pet_native_wf |
| 15 | +from ..fit import _extract_twa_image, init_pet_fit_wf, init_pet_native_wf |
15 | 16 | from ..outputs import init_refmask_report_wf |
16 | 17 |
|
17 | 18 |
|
@@ -370,6 +371,108 @@ def test_pet_fit_stage1_with_cached_baseline(bids_root: Path, tmp_path: Path): |
370 | 371 | assert not any(name.startswith('pet_hmc_wf') for name in wf.list_node_names()) |
371 | 372 |
|
372 | 373 |
|
| 374 | +def test_pet_fit_hmc_off_disables_stage1(bids_root: Path, tmp_path: Path): |
| 375 | + """Disabling HMC should skip Stage 1 and use identity transforms.""" |
| 376 | + pet_series = [str(bids_root / 'sub-01' / 'pet' / 'sub-01_task-rest_run-1_pet.nii.gz')] |
| 377 | + data = np.stack( |
| 378 | + ( |
| 379 | + np.ones((2, 2, 2), dtype=np.float32), |
| 380 | + np.full((2, 2, 2), 3.0, dtype=np.float32), |
| 381 | + ), |
| 382 | + axis=-1, |
| 383 | + ) |
| 384 | + img = nb.Nifti1Image(data, np.eye(4)) |
| 385 | + for path in pet_series: |
| 386 | + img.to_filename(path) |
| 387 | + |
| 388 | + sidecar = Path(pet_series[0]).with_suffix('').with_suffix('.json') |
| 389 | + sidecar.write_text('{"FrameTimesStart": [0, 2], "FrameDuration": [2, 4]}') |
| 390 | + |
| 391 | + with mock_config(bids_dir=bids_root): |
| 392 | + config.workflow.hmc_off = True |
| 393 | + wf = init_pet_fit_wf(pet_series=pet_series, precomputed={}, omp_nthreads=1) |
| 394 | + |
| 395 | + assert not any(name.startswith('pet_hmc_wf') for name in wf.list_node_names()) |
| 396 | + hmc_buffer = wf.get_node('hmc_buffer') |
| 397 | + assert str(hmc_buffer.inputs.hmc_xforms).endswith('idmat.tfm') |
| 398 | + hmc = nt.linear.load(hmc_buffer.inputs.hmc_xforms) |
| 399 | + assert hmc.matrix.shape[0] == data.shape[-1] |
| 400 | + assert np.allclose(hmc.matrix, np.tile(np.eye(4), (data.shape[-1], 1, 1))) |
| 401 | + petref_buffer = wf.get_node('petref_buffer') |
| 402 | + petref_name = Path(petref_buffer.inputs.petref).name |
| 403 | + assert petref_name.endswith('_timeavgref.nii.gz') |
| 404 | + assert '.nii_timeavgref' not in petref_name |
| 405 | + petref_img = nb.load(petref_buffer.inputs.petref) |
| 406 | + assert np.allclose(petref_img.get_fdata(), 14.0 / 6.0) |
| 407 | + |
| 408 | + |
| 409 | +@pytest.mark.parametrize( |
| 410 | + ('frame_start_times', 'frame_durations', 'message'), |
| 411 | + [ |
| 412 | + (None, [1, 1], 'Frame timing metadata are required'), |
| 413 | + ([0, 1], None, 'Frame timing metadata are required'), |
| 414 | + ([[0, 1]], [1, 1], 'must be one-dimensional'), |
| 415 | + ([0, 1], [1], 'the same length'), |
| 416 | + ([0, 1, 2], [1, 1, 1], 'match the number of frames'), |
| 417 | + ([0, 1], [1, -1], 'must all be positive'), |
| 418 | + ([1, 0], [1, 1], 'must be non-decreasing'), |
| 419 | + ], |
| 420 | +) |
| 421 | +def test_extract_twa_image_validation( |
| 422 | + tmp_path: Path, frame_start_times, frame_durations, message: str |
| 423 | +): |
| 424 | + """Validate error handling for malformed frame timing metadata.""" |
| 425 | + |
| 426 | + pet_img = nb.Nifti1Image(np.zeros((2, 2, 2, 2), dtype=np.float32), np.eye(4)) |
| 427 | + pet_file = tmp_path / 'pet.nii.gz' |
| 428 | + pet_img.to_filename(pet_file) |
| 429 | + |
| 430 | + with pytest.raises(ValueError, match=message): # noqa: PT011 |
| 431 | + _extract_twa_image( |
| 432 | + str(pet_file), |
| 433 | + tmp_path / 'out', |
| 434 | + frame_start_times, |
| 435 | + frame_durations, |
| 436 | + ) |
| 437 | + |
| 438 | + |
| 439 | +def test_pet_fit_hmc_off_ignores_precomputed(bids_root: Path, tmp_path: Path): |
| 440 | + """Precomputed derivatives are ignored when ``--hmc-off`` is set.""" |
| 441 | + |
| 442 | + pet_series = [str(bids_root / 'sub-01' / 'pet' / 'sub-01_task-rest_run-1_pet.nii.gz')] |
| 443 | + data = np.stack((np.ones((2, 2, 2)), np.full((2, 2, 2), 2.0)), axis=-1) |
| 444 | + img = nb.Nifti1Image(data, np.eye(4)) |
| 445 | + for path in pet_series: |
| 446 | + img.to_filename(path) |
| 447 | + |
| 448 | + sidecar = Path(pet_series[0]).with_suffix('').with_suffix('.json') |
| 449 | + sidecar.write_text('{"FrameTimesStart": [0, 1], "FrameDuration": [1, 1]}') |
| 450 | + |
| 451 | + precomputed_petref = tmp_path / 'precomputed_petref.nii.gz' |
| 452 | + precomputed_hmc = tmp_path / 'precomputed_hmc.txt' |
| 453 | + img.to_filename(precomputed_petref) |
| 454 | + np.savetxt(precomputed_hmc, np.eye(4)) |
| 455 | + |
| 456 | + with mock_config(bids_dir=bids_root): |
| 457 | + config.workflow.hmc_off = True |
| 458 | + wf = init_pet_fit_wf( |
| 459 | + pet_series=pet_series, |
| 460 | + precomputed={ |
| 461 | + 'petref': str(precomputed_petref), |
| 462 | + 'transforms': {'hmc': str(precomputed_hmc)}, |
| 463 | + }, |
| 464 | + omp_nthreads=1, |
| 465 | + ) |
| 466 | + |
| 467 | + petref_buffer = wf.get_node('petref_buffer') |
| 468 | + hmc_buffer = wf.get_node('hmc_buffer') |
| 469 | + |
| 470 | + assert petref_buffer.inputs.petref != str(precomputed_petref) |
| 471 | + assert Path(petref_buffer.inputs.petref).name.endswith('_timeavgref.nii.gz') |
| 472 | + assert hmc_buffer.inputs.hmc_xforms != str(precomputed_hmc) |
| 473 | + assert Path(hmc_buffer.inputs.hmc_xforms).name == 'idmat.tfm' |
| 474 | + |
| 475 | + |
373 | 476 | def test_init_refmask_report_wf(tmp_path: Path): |
374 | 477 | """Ensure the refmask report workflow initializes without errors.""" |
375 | 478 | wf = init_refmask_report_wf(output_dir=str(tmp_path), ref_name='test') |
|
0 commit comments