5 changes: 3 additions & 2 deletions nf_core/components/install.py
@@ -169,9 +169,10 @@ def install(self, component: str | dict[str, str], silent: bool = False) -> bool
         # Install included modules and subworkflows
         self.install_included_components(component_dir)

-        # Regenerate container configuration files for the pipeline when modules are installed
+        # Update container configs for the installed module. Subworkflows have no container entries of their
+        # own; their included modules each trigger this when installed above.
         if self.component_type == "modules":
-            try_generate_container_configs(self.directory, component_dir, component)
+            try_generate_container_configs(self.directory, component_dir)

         if not silent:
             modules_json.load()
5 changes: 2 additions & 3 deletions nf_core/components/update.py
@@ -308,10 +308,9 @@ def update(self, component=None, silent=False, updated=None, check_diff_exist=Tr
         # Update modules.json with newly installed component
         self.modules_json.update(self.component_type, modules_repo, component, version, installed_by=None)
         updated.append(component)

-        # Regenerate container configuration files for the pipeline when modules are updated
         if self.component_type == "modules":
-            try_generate_container_configs(self.directory)
+            try_generate_container_configs(self.directory, component_dir)

         recursive_update = True
         modules_to_update, subworkflows_to_update = self.get_components_to_update(component)
         if not silent and len(modules_to_update + subworkflows_to_update) > 0 and not self.update_all:
269 changes: 119 additions & 150 deletions nf_core/pipelines/containers_utils.py
@@ -1,11 +1,12 @@
-import json
+import contextlib
 import logging
 import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path

 import yaml

-from nf_core.utils import NF_INSPECT_MIN_NF_VERSION, check_nextflow_version, pretty_nf_version, run_cmd
+from nf_core.utils import read_module_name

 log = logging.getLogger(__name__)

@@ -20,185 +21,153 @@
     "conda_lock_files_arm64": ["conda", "linux/arm64", "lock_file"],
 }

+_CONFIG_LINE_RE = re.compile(r"withName:\s*'(\w+)'\s*\{\s*(?:container|conda)\s*=\s*'([^']+)'")

-class ContainerConfigs:
-    """Generates the container configuration files for a pipeline.
-
-    Args:
-        workflow_directory (Path): The directory containing the workflow files.
-    """
+def _container_key(platform: str) -> str:
+    return "conda" if platform.startswith("conda_lock_") else "container"

-    def __init__(
-        self,
-        workflow_directory: Path = Path(),
-    ):
-        self.workflow_directory = workflow_directory
-
-    def check_nextflow_version_sufficient(self) -> None:
-        """Check if the Nextflow version is sufficient to run `nextflow inspect`."""
-        if not check_nextflow_version(NF_INSPECT_MIN_NF_VERSION, silent=True):
-            raise UserWarning(
-                f"To use Seqera containers Nextflow version >= {pretty_nf_version(NF_INSPECT_MIN_NF_VERSION)} is required.\n"
-                f"Please update your Nextflow version with [magenta]'nextflow self-update'[/]\n"
-            )
+def _parse_config_file(config_path: Path) -> dict[str, str]:
+    """Return {module_name: container} from an existing platform config file."""
+    result: dict[str, str] = {}
+    with contextlib.suppress(OSError):
+        for line in config_path.read_text().splitlines():
+            m = _CONFIG_LINE_RE.search(line)
+            if m:
+                result[m.group(1)] = m.group(2)
+    return result
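For reference, one generated config line round-trips through `_CONFIG_LINE_RE` like this (the module name and container URI are illustrative):

```python
import re

_CONFIG_LINE_RE = re.compile(r"withName:\s*'(\w+)'\s*\{\s*(?:container|conda)\s*=\s*'([^']+)'")

line = "process { withName: 'FASTQC' { container = 'quay.io/biocontainers/fastqc:0.12.1--hdfd78af_0' } }"
m = _CONFIG_LINE_RE.search(line)
assert m is not None
assert m.group(1) == "FASTQC"  # process name
assert m.group(2) == "quay.io/biocontainers/fastqc:0.12.1--hdfd78af_0"  # container value
```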

-    def parse_module_paths(self) -> dict[str, Path]:
-        """Parse include statements from workflow files to extract module paths.
-
-        Only processes includes pointing to 'modules/' directories.
-        Extracts the path from 'modules/' onwards, ignoring any '../' prefixes.
+def _write_platform_config(config_path: Path, entries: dict[str, str], key: str) -> bool:
+    """Write *entries* to *config_path*, or delete it when empty.

-        Returns:
-            dict: Mapping of process names to their module paths (e.g., 'modules/nf-core/fastqc')
-        """
-        module_paths = {}
+    Skips the write when content is unchanged. Returns True if the file now exists.
+    """
+    if not entries:
+        config_path.unlink(missing_ok=True)
+        return False
+    new_content = "".join(
+        f"process {{ withName: '{name}' {{ {key} = '{container}' }} }}\n" for name, container in sorted(entries.items())
+    )
+    if not config_path.exists() or config_path.read_text() != new_content:
+        config_path.write_text(new_content)
+    return True
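A minimal sketch of the write/skip/delete behaviour, assuming the private helper is imported from `nf_core.pipelines.containers_utils` for illustration (entries are made up):

```python
import tempfile
from pathlib import Path

from nf_core.pipelines.containers_utils import _write_platform_config

with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "containers_docker_amd64.config"
    entries = {"SAMTOOLS_SORT": "quay.io/biocontainers/samtools:1.21--h50ea8bc_0"}  # illustrative

    assert _write_platform_config(cfg, entries, "container")  # file created
    assert cfg.read_text() == (
        "process { withName: 'SAMTOOLS_SORT' { container = 'quay.io/biocontainers/samtools:1.21--h50ea8bc_0' } }\n"
    )
    assert _write_platform_config(cfg, entries, "container")  # identical content, write skipped
    assert not _write_platform_config(cfg, {}, "container")   # empty mapping deletes the file
    assert not cfg.exists()
```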

-        # Pattern matches: include { NAME ... } from 'path'
-        # Captures the first name (original) and the path, ignoring any alias
-        include_pattern = re.compile(r"include\s*\{\s*(\w+).*?\}\s*from\s+['\"]([^'\"]+)['\"]")
-
-        # Search in root (main.nf), workflows, modules, and subworkflows directories
-        search_dirs = ["workflows", "modules", "subworkflows"]
+def _process_meta(meta_path: Path) -> tuple[str, dict[str, str]] | None:
+    """Read one meta.yml + sibling main.nf and return (module_name, {platform: container}).

-        nf_files_to_search = list(self.workflow_directory.glob("*.nf"))
-        for search_dir in search_dirs:
-            search_path = self.workflow_directory / search_dir
-            if not search_path.exists():
-                continue
-            nf_files_to_search.extend(search_path.rglob("*.nf"))
+    Returns None when the file should be skipped.
+    """
+    try:
+        raw = meta_path.read_bytes()
+    except OSError as e:
+        log.debug(f"Could not read {meta_path}: {e}")
+        return None

+    # TODO: remove this early-exit once containers are present in the majority of modules
+    if b"containers:" not in raw:
+        return None

+    meta = yaml.safe_load(raw)

-        for nf_file in nf_files_to_search:
-            try:
-                content = nf_file.read_text()
-                for match in include_pattern.finditer(content):
-                    process_name = match.group(1)
-                    relative_path = match.group(2)
+    module_name = read_module_name(meta_path.parent / "main.nf")
+    if not module_name:
+        log.debug(f"No process definition found next to {meta_path}, skipping")
+        return None

-                    # Only process paths that contain 'modules/'
-                    if "modules/" not in relative_path:
-                        continue
+    platform_containers: dict[str, str] = {}
+    for platform_name, (runtime, arch, protocol) in PLATFORMS.items():
+        with contextlib.suppress(KeyError, TypeError):
+            platform_containers[platform_name] = meta["containers"][runtime][arch][protocol]

-                    # Extract everything from 'modules/' onwards, removing any '/main' suffix
-                    module_path_str = relative_path[relative_path.find("modules/") :]
-                    module_path_str = module_path_str.replace("/main", "")
+    return module_name, platform_containers
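The nested lookup mirrors the `PLATFORMS` table (`runtime` → `arch` → `protocol`). Using the only table entry fully visible in this diff, a matching `meta.yml` fragment could look like the following sketch (the lock-file URL is made up):

```python
import yaml

# The only PLATFORMS entry visible in this diff; the real table has one entry per config file.
PLATFORMS = {"conda_lock_files_arm64": ["conda", "linux/arm64", "lock_file"]}

meta = yaml.safe_load("""
containers:
  conda:
    linux/arm64:
      lock_file: https://example.com/locks/fastqc-linux-arm64-lock.yml
""")

runtime, arch, protocol = PLATFORMS["conda_lock_files_arm64"]
assert meta["containers"][runtime][arch][protocol] == "https://example.com/locks/fastqc-linux-arm64-lock.yml"
```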


+class ContainerConfigs:
+    """Generates the container configuration files for a pipeline.

-                    module_paths[process_name] = Path(module_path_str)
-                    log.debug(f"Found include: {process_name} -> {module_path_str}")
+    Args:
+        workflow_directory (Path): The directory containing the workflow files.
+    """

-            except OSError as e:
-                log.debug(f"Error parsing {nf_file}: {e}")
-                continue
+    def __init__(self, workflow_directory: Path = Path()) -> None:
+        self.workflow_directory = workflow_directory

-        return module_paths
+    def update_module_container_config(self, module_path: Path) -> None:
+        """Targeted update for a single module.

-    def generate_container_configs(
-        self, new_module_path: Path | None = None, new_module_name: str | None = None
-    ) -> set[str]:
+        Reads the current config files, splices in (or removes) the entry for
+        the module at *module_path*, and writes back only what changed.
+        """
-        Generate the container configuration files for a pipeline.
-        Requires Nextflow >= 25.04.4
+        result = _process_meta(module_path / "meta.yml")

+        if result is None:
+            module_name = read_module_name(module_path / "main.nf")
+            if not module_name:
+                log.debug(f"Could not determine process name for {module_path}, skipping")
+                return
+            platform_containers: dict[str, str] = {}
+        else:
+            module_name, platform_containers = result

+        conf_dir = self.workflow_directory / "conf"
+        for platform in PLATFORMS:
+            config_path = conf_dir / f"containers_{platform}.config"
+            entries = _parse_config_file(config_path)
+            if platform in platform_containers:
+                entries[module_name] = platform_containers[platform]
+            _write_platform_config(config_path, entries, _container_key(platform))

+    def generate_container_configs(self) -> set[str]:
+        """Full scan of all ``meta.yml`` files under ``modules/``.

+        Used by lint and bulk operations (update, remove, patch) where multiple
+        modules may have changed.

         Returns:
             set[str]: Names of config files written (e.g. ``{'containers_docker_amd64.config'}``).
         """
-        self.check_nextflow_version_sufficient()
-        log.debug("Generating container config file with [magenta bold]nextflow inspect[/].")
-        try:
-            # Run nextflow inspect
-            executable = "nextflow"
-            cmd_params = f"inspect -format json {self.workflow_directory}"
-            cmd_out = run_cmd(executable, cmd_params)
-            if cmd_out is None:
-                raise UserWarning("Failed to run `nextflow inspect`. Please check your Nextflow installation.")

-            out, _ = cmd_out
-            out_str = out.decode("utf-8", errors="replace")
-            try:
-                # Newer Nextflow versions print [PIPELINE]/[WORKDIR] headers before and [SUCCESS] after the JSON
-                json_start = out_str.find("{")
-                out_json, _ = json.JSONDecoder().raw_decode(out_str, json_start if json_start >= 0 else 0)
-            except json.JSONDecodeError:
-                out_json = json.loads(out)

-        except RuntimeError as e:
-            log.error("Running 'nextflow inspect' failed with the following error:")
-            raise UserWarning(e) from e

-        module_names = {p.get("name") for p in out_json["processes"] if p.get("name")}
-        log.debug(f"Found {len(module_names)} modules: {', '.join(module_names)}")

-        # Parse module paths from include statements
-        module_path_map = self.parse_module_paths()
-        log.debug(f"Parsed {len(module_path_map)} module paths from include statements")

-        if new_module_name and new_module_path:
-            module_names.add(new_module_name.upper())
-            module_path_map[new_module_name.upper()] = new_module_path

-        # Build containers dict from module meta.yml files
-        # Pre-initialize all platforms to avoid repeated existence checks
-        has_warnings = False
+        modules_dir = self.workflow_directory / "modules"
+        if not modules_dir.is_dir():
+            log.debug(f"No modules directory found at {modules_dir}, skipping")
+            return set()

         containers: dict[str, dict[str, str]] = {platform: {} for platform in PLATFORMS}
-        for m_name in module_names:
-            # Try to get module path from include statements
-            if m_name in module_path_map:
-                module_path = module_path_map[m_name]
-                log.debug(f"Using parsed path for {m_name}: {module_path}")
-            else:
-                # Fallback to old heuristic method
-                log.debug(f"No parsed path found for {m_name}, using heuristic")
-                module_path = Path(*m_name.lower().split("_", 1))

-            # Look for meta.yml in the module path
-            meta_path = self.workflow_directory / module_path / "meta.yml"

-            try:
-                with open(meta_path) as fh:
-                    meta = yaml.safe_load(fh)
-                log.debug(f"Loaded meta.yml for {m_name} from {meta_path}")
-            except FileNotFoundError:
-                log.warning(f"Could not find meta.yml for {m_name} at {meta_path}")
-                continue

-            # Extract containers for all platforms
-            for platform_name, (runtime, arch, protocol) in PLATFORMS.items():
-                try:
-                    containers[platform_name][m_name] = meta["containers"][runtime][arch][protocol]
-                    log.debug(f"Found {platform_name} container for {m_name}")
-                except (KeyError, TypeError):
-                    log.debug(f"Could not find {platform_name} container for {m_name}")
-                    has_warnings = True

+        with ThreadPoolExecutor() as pool:
+            futures = {pool.submit(_process_meta, p): p for p in modules_dir.rglob("meta.yml")}
+            for future in as_completed(futures):
+                result = future.result()
+                if result is None:
+                    continue
-        if has_warnings:
-            log.debug(
-                "Generated container configs for the pipeline. Not all containers were found. Run with `-v` to see detailed warning messages."
-            )
-        else:
-            log.info("Generated container configs for the pipeline successfully.")
+                module_name, platform_containers = result
+                for platform_name, container in platform_containers.items():
+                    containers[platform_name][module_name] = container

+        log.info("Generated container configs for the pipeline.")

-        # remove all generated config files, to handle removed modules
-        for platform in PLATFORMS:
-            (self.workflow_directory / "conf" / f"containers_{platform}.config").unlink(missing_ok=True)
         # write config files
         written: set[str] = set()
         for platform, module_containers in containers.items():
-            if not module_containers:
-                continue
-            container_key = "conda" if platform.startswith("conda_lock_") else "container"
-            lines = [
-                f"process {{ withName: '{module_name}' {{ {container_key} = '{container}' }} }}\n"
-                for module_name, container in sorted(module_containers.items())
-            ]
             config_path = self.workflow_directory / "conf" / f"containers_{platform}.config"
-            config_path.write_text("".join(lines))
-            written.add(config_path.name)
+            if _write_platform_config(config_path, module_containers, _container_key(platform)):
+                written.add(config_path.name)
         return written


-def try_generate_container_configs(
-    directory: Path, new_module_path: Path | None = None, new_module_name: str | None = None
-) -> None:
+def try_generate_container_configs(directory: Path, module_path: Path | None = None) -> None:
+    """Regenerate container configs for *directory*.
+
+    If *module_path* is given, only that module's entries are updated (fast
+    path for single-module installs). Otherwise a full scan of ``modules/``
+    is performed.
+    """
     try:
-        ContainerConfigs(directory).generate_container_configs(new_module_path, new_module_name)
+        configs = ContainerConfigs(directory)
+        if module_path is not None:
+            configs.update_module_container_config(module_path)
+        else:
+            configs.generate_container_configs()
     except UserWarning as e:
         log.warning(f"Could not regenerate container configuration files: {e}")
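A sketch of the two entry points after this change, assuming a pipeline checkout at `my-pipeline` (the module path is illustrative):

```python
from pathlib import Path

from nf_core.pipelines.containers_utils import try_generate_container_configs

pipeline_dir = Path("my-pipeline")

# Fast path: splice a single module's entries into the existing configs,
# as install.py/update.py now do with the component directory.
try_generate_container_configs(pipeline_dir, pipeline_dir / "modules" / "nf-core" / "fastqc")

# Full scan: rebuild conf/containers_<platform>.config from every meta.yml
# under modules/, as lint and bulk operations do.
try_generate_container_configs(pipeline_dir)
```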
14 changes: 11 additions & 3 deletions nf_core/pipelines/lint/container_configs.py
@@ -11,9 +11,11 @@
 def container_configs(self):
     """Check that the container configuration files in ``conf/`` are up to date.

-    Runs ``nextflow inspect`` to regenerate container configuration files directly
-    in ``conf/`` and uses ``git diff`` to detect changes. If not in ``--fix`` mode
-    the working tree is restored to its original state afterwards.
+    Scans all ``meta.yml`` files under ``modules/`` that contain a ``containers``
+    key, reads the process name from the sibling ``main.nf``, and regenerates
+    the container configuration files in ``conf/``. Uses ``git diff`` to detect
+    changes. If not in ``--fix`` mode the working tree is restored to its
+    original state afterwards.

     Can be skipped by adding the following to the ``.nf-core.yml`` file:

@@ -37,6 +39,8 @@ def container_configs(self):
         warned.append(f"Could not generate container configuration files: {e}")
         return {"passed": passed, "failed": failed, "warned": warned}

+    log.debug(f"Generated {len(generated)} container config file(s): {', '.join(sorted(generated)) or 'none'}")
+
     # Files modified in the working tree (tracked and changed by generation)
     modified = {
         Path(d.a_path).name
@@ -52,13 +56,16 @@ def container_configs(self):
     # Already-correct files: generated, tracked, and unchanged
     correct = generated - modified - new

+    log.debug(f"Container config status — correct: {len(correct)}, modified: {len(modified)}, new: {len(new)}")
+
     fixing = "container_configs" in self.fix

     for name in sorted(correct):
         passed.append(f"`conf/{name}` is up to date")

     for name in sorted(modified | new):
         if fixing:
+            log.debug(f"Overwriting `conf/{name}` with regenerated container configuration")
             passed.append(f"`conf/{name}` is up to date")
             fixed.append(f"`conf/{name}` overwritten with regenerated container configuration.")
         else:
@@ -70,6 +77,7 @@ def container_configs(self):

     if not fixing:
         # Restore working tree: reset modified tracked files and delete new untracked ones
+        log.debug(f"Restoring working tree: resetting {len(modified)} modified, removing {len(new)} new file(s)")
         for name in modified:
             repo.git.restore(str(conf_dir / name))
         for name in new:
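For context, a hedged sketch of the git bookkeeping this lint check relies on, using GitPython; the `conf`-directory filtering is a simplification of the partially visible code, and the repo path is hypothetical:

```python
from pathlib import Path

from git import Repo  # GitPython

repo = Repo("my-pipeline")  # hypothetical pipeline clone
conf_dir = Path("my-pipeline") / "conf"

# Tracked files changed by regeneration: working tree diffed against the index.
modified = {Path(d.a_path).name for d in repo.index.diff(None) if Path(d.a_path).parent.name == "conf"}

# Untracked files created by regeneration.
new = {Path(p).name for p in repo.untracked_files if Path(p).parent.name == "conf"}

# Restore the working tree when not in --fix mode.
for name in modified:
    repo.git.restore(str(conf_dir / name))
for name in new:
    (conf_dir / name).unlink(missing_ok=True)
```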