diff --git a/sigma/cli/list.py b/sigma/cli/list.py index 6c9cc98..a3a3fe7 100644 --- a/sigma/cli/list.py +++ b/sigma/cli/list.py @@ -13,6 +13,53 @@ plugins = InstalledSigmaPlugins.autodiscover() +def _plugin_id_from_module(module_name: str, namespace: str) -> str: + """Extract the Sigma plugin identifier from a module name. + + Sigma backends live in ``sigma.backends..*`` and pipelines in + ``sigma.pipelines..*``. Splitting on '.' and picking the + component after the namespace prefix returns the plugin identifier directly + (e.g. ``sigma.backends.splunk.backend`` → ``splunk``). + """ + parts = module_name.split(".") + # Expected layout: sigma . . [. submodule ...] + try: + idx = parts.index(namespace) + plugin_id = parts[idx + 1] + return plugin_id if plugin_id else "n/a" + except (ValueError, IndexError): + return "n/a" + + +def _get_backend_plugin_id(backend_class) -> str: + """Return the Sigma plugin identifier for a backend class.""" + return _plugin_id_from_module(backend_class.__module__, "backends") + + +def _get_pipeline_plugin_id(pipeline_obj) -> str: + """Return the Sigma plugin identifier for a pipeline object or function.""" + # Plain function: use its own module + module_name = getattr(pipeline_obj, "__module__", None) + if module_name and module_name != "sigma.pipelines.base": + return _plugin_id_from_module(module_name, "pipelines") + # Pipeline-decorator instance: retrieve the wrapped function's module + func = getattr(pipeline_obj, "func", None) + if func is not None: + func_module = getattr(func, "__module__", None) + if func_module: + return _plugin_id_from_module(func_module, "pipelines") + return "n/a" + + +# Pre-compute plugin IDs for all discovered pipelines (keyed by identifier). +# This must be done before the resolver resolves pipeline callables into plain +# ProcessingPipeline objects, which no longer carry module information. +_pipeline_plugin_ids: dict = { + name: _get_pipeline_plugin_id(pipeline) + for name, pipeline in plugins.pipelines.items() +} + + @click.group(name="list", help="List available targets or processing pipelines.") def list_group(): pass @@ -32,6 +79,7 @@ def list_targets(): "Identifier", "Target Query Language", "Processing Pipeline Required", + "Plugin", ) table.add_rows( [ @@ -39,6 +87,7 @@ def list_targets(): name, fill(backend.name, width=60), "Yes" if backend.requires_pipeline else "No", + _get_backend_plugin_id(backend), ) for name, backend in plugins.backends.items() ] @@ -108,6 +157,7 @@ def list_pipelines(backend): "Priority", "Processing Pipeline", "Backends", + "Plugin", ) for name, pipeline in pipelines: if ( @@ -120,7 +170,13 @@ def list_pipelines(backend): else: backends = "all" table.add_row( - (name, pipeline.priority, fill(pipeline.name, width=60), backends) + ( + name, + pipeline.priority, + fill(pipeline.name, width=60), + backends, + _pipeline_plugin_ids.get(name, "n/a"), + ) ) table.align = "l" click.echo(table.get_string()) diff --git a/tests/test_lists.py b/tests/test_lists.py index e0e4783..4254833 100644 --- a/tests/test_lists.py +++ b/tests/test_lists.py @@ -125,10 +125,23 @@ def test_transformation_list(): assert all((name in result.stdout for name in transformations.keys())) -def test_condition_list(): +def test_targets_has_plugin_column(): cli = CliRunner() - result = cli.invoke(list_conditions) - conditions = list(rule_conditions.keys()) - conditions.extend(detection_item_conditions.keys()) - conditions.extend(field_name_conditions.keys()) - assert all((name in result.stdout for name in conditions)) + result = cli.invoke(list_targets) + assert result.exit_code == 0 + assert "Plugin" in result.stdout + # Each row should show a non-empty package name (not "n/a") for installed backends. + plugins = InstalledSigmaPlugins.autodiscover() + if plugins.backends: + assert "n/a" not in result.stdout + + +def test_pipelines_has_plugin_column(): + cli = CliRunner() + result = cli.invoke(list_pipelines) + assert result.exit_code == 0 + assert "Plugin" in result.stdout + # Each row should show a non-empty package name (not "n/a") for installed pipelines. + plugins = InstalledSigmaPlugins.autodiscover() + if plugins.pipelines: + assert "n/a" not in result.stdout