Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,9 @@ defmodule CopiWeb.Plugs.RateLimiterPlugTest do
assert conn.status != 429
refute conn.halted
end

test "init/1 passes opts through unchanged" do
assert RateLimiterPlug.init([]) == []
assert RateLimiterPlug.init(foo: :bar) == [foo: :bar]
end
end
20 changes: 6 additions & 14 deletions scripts/capec_map_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class EnricherVars:
DEFAULT_SOURCE_DIR = Path(__file__).parent / "../source"
args: argparse.Namespace


def _extract_names_from_items(
items: Any,
target: dict[int, str],
Expand All @@ -50,7 +49,6 @@ def _extract_names_from_items(
# Ignore malformed IDs
continue


def extract_capec_names(json_data: dict[str, Any]) -> dict[int, str]:
"""
Extract CAPEC ID to Name mappings from JSON data.
Expand Down Expand Up @@ -82,17 +80,17 @@ def extract_capec_names(json_data: dict[str, Any]) -> dict[int, str]:
attack_patterns = patterns["Attack_Pattern"]
_extract_names_from_items(attack_patterns, capec_names, warn_if_not_list=True, label="Attack_Pattern")

if "Categories" not in catalog:
logging.warning("No 'Categories' key found in catalog")
elif "Category" not in catalog["Categories"]:
categories = catalog.get("Categories")
if not isinstance(categories, dict):
logging.warning("Invalid 'Categories' section in catalog; expected an object")
elif "Category" not in categories:
logging.warning("No 'Category' key found in categories section")
else:
_extract_names_from_items(catalog["Categories"]["Category"], capec_names, label="Category")
_extract_names_from_items(catalog["Categories"]["Category"], capec_names, warn_if_not_list=True, label="Category")

logging.info("Extracted %d CAPEC name mappings", len(capec_names))
return capec_names


def enrich_capec_mappings(capec_mappings: dict[str, Any], capec_names: dict[int, str]) -> dict[str, Any]:
"""
Enrich CAPEC mappings with names from the CAPEC catalog.
Expand Down Expand Up @@ -130,7 +128,6 @@ def enrich_capec_mappings(capec_mappings: dict[str, Any], capec_names: dict[int,
logging.info("Enriched %d CAPEC mappings", len(enriched) - (1 if "meta" in enriched else 0))
return enriched


def load_json_file(filepath: Path) -> dict[str, Any]:
"""Load and parse a JSON file."""
try:
Expand All @@ -148,7 +145,6 @@ def load_json_file(filepath: Path) -> dict[str, Any]:
logging.error("Error loading JSON file %s: %s", filepath, str(e))
return {}


def load_yaml_file(filepath: Path) -> dict[str, Any]:
"""Load and parse a YAML file."""
try:
Expand All @@ -170,7 +166,6 @@ def load_yaml_file(filepath: Path) -> dict[str, Any]:
logging.error("Error loading YAML file %s: %s", filepath, str(e))
return {}


def save_yaml_file(filepath: Path, data: dict[str, Any]) -> bool:
"""Save data as YAML file."""
try:
Expand All @@ -182,7 +177,6 @@ def save_yaml_file(filepath: Path, data: dict[str, Any]) -> bool:
logging.error("Error saving YAML file %s: %s", filepath, str(e))
return False


def set_logging() -> None:
"""Configure logging based on debug flag."""
logging.basicConfig(
Expand All @@ -194,7 +188,6 @@ def set_logging() -> None:
else:
logging.getLogger().setLevel(logging.INFO)


def parse_arguments(input_args: list[str]) -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Enrich CAPEC mappings with names from CAPEC JSON catalog")
Expand Down Expand Up @@ -253,7 +246,6 @@ def parse_arguments(input_args: list[str]) -> argparse.Namespace:
sys.exit(1)
return args


def main() -> None:
"""Main execution function."""
enricher_vars.args = parse_arguments(sys.argv[1:])
Expand Down Expand Up @@ -317,4 +309,4 @@ def main() -> None:

if __name__ == "__main__":
enricher_vars: EnricherVars = EnricherVars()
main()
main()
67 changes: 66 additions & 1 deletion tests/scripts/capec_map_enricher_utest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,71 @@ def test_extract_capec_names_not_list(self):
self.assertEqual(result, {})
self.assertIn("'Attack_Pattern' is not a list", log.output[0])

def _make_data_with_attack_patterns(self, categories_value=None, include_categories=False):
"""Helper that returns a catalog with Attack_Pattern entries and optional Categories."""
catalog: dict = {
"Attack_Patterns": {
"Attack_Pattern": [
{"_ID": "1", "_Name": "Test Attack 1"},
]
}
}
if include_categories:
catalog["Categories"] = categories_value
return {"Attack_Pattern_Catalog": catalog}

def test_categories_missing_still_returns_attack_patterns(self):
"""When 'Categories' key is absent, warn and still return Attack_Pattern names."""
data = self._make_data_with_attack_patterns(include_categories=False)

with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
result = enricher.extract_capec_names(data)

self.assertEqual(result, {1: "Test Attack 1"})
self.assertIn("Invalid 'Categories' section in catalog; expected an object", log.output[0])

def test_categories_none_still_returns_attack_patterns(self):
"""When 'Categories' is None, warn and still return Attack_Pattern names."""
data = self._make_data_with_attack_patterns(categories_value=None, include_categories=True)

with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
result = enricher.extract_capec_names(data)

self.assertEqual(result, {1: "Test Attack 1"})
self.assertIn("Invalid 'Categories' section in catalog; expected an object", log.output[0])

def test_categories_non_dict_still_returns_attack_patterns(self):
"""When 'Categories' is a non-dict (e.g. a list), warn and still return Attack_Pattern names."""
data = self._make_data_with_attack_patterns(categories_value=["unexpected"], include_categories=True)

with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
result = enricher.extract_capec_names(data)

self.assertEqual(result, {1: "Test Attack 1"})
self.assertIn("Invalid 'Categories' section in catalog; expected an object", log.output[0])

def test_category_key_missing_still_returns_attack_patterns(self):
"""When 'Category' key is absent from Categories dict, warn and still return Attack_Pattern names."""
data = self._make_data_with_attack_patterns(categories_value={"other_key": []}, include_categories=True)

with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
result = enricher.extract_capec_names(data)

self.assertEqual(result, {1: "Test Attack 1"})
self.assertIn("No 'Category' key found in categories section", log.output[0])

def test_category_not_list_still_returns_attack_patterns(self):
"""When 'Category' exists but is not a list, warn and still return Attack_Pattern names."""
data = self._make_data_with_attack_patterns(
categories_value={"Category": "malformed"}, include_categories=True
)

with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
result = enricher.extract_capec_names(data)

self.assertEqual(result, {1: "Test Attack 1"})
self.assertIn("'Category' is not a list", log.output[0])

def test_extract_capec_names_missing_fields(self):
"""Test with missing _ID or _Name fields"""
data = {
Expand Down Expand Up @@ -161,7 +226,7 @@ def test_extract_capec_names_missing_categories(self):

self.assertEqual(len(result), 1)
self.assertIn(1, result)
self.assertIn("No 'Categories' key found", log.output[0])
self.assertIn("Invalid 'Categories' section in catalog; expected an object", log.output[0])

def test_extract_capec_names_missing_category_inside_categories(self):
"""Test that missing Category key inside Categories logs a warning"""
Expand Down
Loading