Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion sigma/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,53 @@
plugins = InstalledSigmaPlugins.autodiscover()


def _plugin_id_from_module(module_name: str, namespace: str) -> str:
"""Extract the Sigma plugin identifier from a module name.

Sigma backends live in ``sigma.backends.<plugin_id>.*`` and pipelines in
``sigma.pipelines.<plugin_id>.*``. Splitting on '.' and picking the
component after the namespace prefix returns the plugin identifier directly
(e.g. ``sigma.backends.splunk.backend`` → ``splunk``).
"""
parts = module_name.split(".")
# Expected layout: sigma . <namespace> . <plugin_id> [. submodule ...]
try:
idx = parts.index(namespace)
plugin_id = parts[idx + 1]
return plugin_id if plugin_id else "n/a"
except (ValueError, IndexError):
return "n/a"


def _get_backend_plugin_id(backend_class) -> str:
"""Return the Sigma plugin identifier for a backend class."""
return _plugin_id_from_module(backend_class.__module__, "backends")


def _get_pipeline_plugin_id(pipeline_obj) -> str:
"""Return the Sigma plugin identifier for a pipeline object or function."""
# Plain function: use its own module
module_name = getattr(pipeline_obj, "__module__", None)
if module_name and module_name != "sigma.pipelines.base":
return _plugin_id_from_module(module_name, "pipelines")
# Pipeline-decorator instance: retrieve the wrapped function's module
func = getattr(pipeline_obj, "func", None)
if func is not None:
func_module = getattr(func, "__module__", None)
if func_module:
return _plugin_id_from_module(func_module, "pipelines")
return "n/a"


# Pre-compute plugin IDs for all discovered pipelines (keyed by identifier).
# This must be done before the resolver resolves pipeline callables into plain
# ProcessingPipeline objects, which no longer carry module information.
_pipeline_plugin_ids: dict = {
name: _get_pipeline_plugin_id(pipeline)
for name, pipeline in plugins.pipelines.items()
}


@click.group(name="list", help="List available targets or processing pipelines.")
def list_group():
pass
Expand All @@ -32,13 +79,15 @@ def list_targets():
"Identifier",
"Target Query Language",
"Processing Pipeline Required",
"Plugin",
)
table.add_rows(
[
(
name,
fill(backend.name, width=60),
"Yes" if backend.requires_pipeline else "No",
_get_backend_plugin_id(backend),
)
for name, backend in plugins.backends.items()
]
Expand Down Expand Up @@ -108,6 +157,7 @@ def list_pipelines(backend):
"Priority",
"Processing Pipeline",
"Backends",
"Plugin",
)
for name, pipeline in pipelines:
if (
Expand All @@ -120,7 +170,13 @@ def list_pipelines(backend):
else:
backends = "all"
table.add_row(
(name, pipeline.priority, fill(pipeline.name, width=60), backends)
(
name,
pipeline.priority,
fill(pipeline.name, width=60),
backends,
_pipeline_plugin_ids.get(name, "n/a"),
)
)
table.align = "l"
click.echo(table.get_string())
Expand Down
25 changes: 19 additions & 6 deletions tests/test_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,23 @@ def test_transformation_list():
assert all((name in result.stdout for name in transformations.keys()))


def test_condition_list():
def test_targets_has_plugin_column():
cli = CliRunner()
result = cli.invoke(list_conditions)
conditions = list(rule_conditions.keys())
conditions.extend(detection_item_conditions.keys())
conditions.extend(field_name_conditions.keys())
assert all((name in result.stdout for name in conditions))
result = cli.invoke(list_targets)
assert result.exit_code == 0
assert "Plugin" in result.stdout
# Each row should show a non-empty package name (not "n/a") for installed backends.
plugins = InstalledSigmaPlugins.autodiscover()
if plugins.backends:
assert "n/a" not in result.stdout


def test_pipelines_has_plugin_column():
cli = CliRunner()
result = cli.invoke(list_pipelines)
assert result.exit_code == 0
assert "Plugin" in result.stdout
# Each row should show a non-empty package name (not "n/a") for installed pipelines.
plugins = InstalledSigmaPlugins.autodiscover()
if plugins.pipelines:
assert "n/a" not in result.stdout
Loading