From 4170aaea3595c1a2074931e949f36cf99d10999a Mon Sep 17 00:00:00 2001 From: DarkAngel Date: Tue, 21 Apr 2026 04:42:16 +0200 Subject: [PATCH 1/2] feat(expansion): add email security, SSH fingerprint, and TLS certificate modules - email_security_check: SPF/DKIM/DMARC/MTA-STS posture assessment for domains (score /5) - ssh_fingerprint: SSH banner and key exchange fingerprint grab for IPs (MitM detection) - tls_certificate_check: TLS cert chain, issuer, SANs, expiry analysis for domains All modules are standalone with zero external API dependencies. Tested on NixOS 25.11 against google.com and 8.8.8.8. --- .../modules/expansion/email_security_check.py | 158 ++++++++++++++++++ .../modules/expansion/ssh_fingerprint.py | 140 ++++++++++++++++ .../expansion/tls_certificate_check.py | 147 ++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 misp_modules/modules/expansion/email_security_check.py create mode 100644 misp_modules/modules/expansion/ssh_fingerprint.py create mode 100644 misp_modules/modules/expansion/tls_certificate_check.py diff --git a/misp_modules/modules/expansion/email_security_check.py b/misp_modules/modules/expansion/email_security_check.py new file mode 100644 index 00000000..ee4a91d7 --- /dev/null +++ b/misp_modules/modules/expansion/email_security_check.py @@ -0,0 +1,158 @@ +import json + +try: + import dns.resolver + + resolver = dns.resolver.Resolver() + resolver.timeout = 2 + resolver.lifetime = 2 +except ImportError: + print("dnspython is missing, use 'pip install dnspython' to install it.") + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Saveanu", + "description": "Check email security posture (SPF, DKIM, DMARC, MTA-STS) for a domain.", + "module-type": ["expansion", "hover"], + "name": "Email Security Check", + "logo": "", + "requirements": ["dnspython"], + "features": ( + "The module takes a domain or hostname attribute as input and queries DNS" + " for email security records: SPF (TXT), DMARC (_dmarc), DKIM (common selectors)," + " and MTA-STS (_mta-sts). Results include record content and a pass/fail assessment." + ), + "references": [ + "https://tools.ietf.org/html/rfc7208", + "https://tools.ietf.org/html/rfc7489", + ], + "input": "A domain or hostname attribute.", + "output": "Text containing email security posture assessment.", +} +moduleconfig = ["custom_resolver"] + +DKIM_SELECTORS = [ + "default", + "google", + "selector1", + "selector2", + "k1", + "mandrill", + "everlytickey1", + "everlytickey2", + "dkim", + "s1", + "s2", + "mailo", +] + + +def _query_txt(domain): + try: + answers = resolver.resolve(domain, "TXT") + return [str(rdata).strip('"') for rdata in answers] + except Exception: + return [] + + +def _check_spf(domain): + records = _query_txt(domain) + spf = [r for r in records if r.startswith("v=spf1")] + if spf: + return {"status": "FOUND", "record": spf[0]} + return {"status": "MISSING", "record": None} + + +def _check_dmarc(domain): + records = _query_txt(f"_dmarc.{domain}") + dmarc = [r for r in records if r.startswith("v=DMARC1")] + if dmarc: + policy = "none" + for part in dmarc[0].split(";"): + part = part.strip() + if part.startswith("p="): + policy = part[2:] + return {"status": "FOUND", "record": dmarc[0], "policy": policy} + return {"status": "MISSING", "record": None, "policy": None} + + +def _check_dkim(domain): + found = [] + for selector in DKIM_SELECTORS: + records = _query_txt(f"{selector}._domainkey.{domain}") + dkim = [r for r in records if "DKIM1" in r or "k=" in r or "p=" in r] + if dkim: + found.append({"selector": selector, "record": dkim[0]}) + return found + + +def _check_mta_sts(domain): + records = _query_txt(f"_mta-sts.{domain}") + sts = [r for r in records if r.startswith("v=STSv1")] + if sts: + return {"status": "FOUND", "record": sts[0]} + return {"status": "MISSING", "record": None} + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + domain = request.get("domain") or request.get("hostname") + if not domain: + misperrors["error"] = "A domain or hostname attribute is required." + return misperrors + + if request.get("config", {}).get("custom_resolver"): + resolver.nameservers = [request["config"]["custom_resolver"]] + + spf = _check_spf(domain) + dmarc = _check_dmarc(domain) + dkim = _check_dkim(domain) + mta_sts = _check_mta_sts(domain) + + lines = [f"=== Email Security Posture: {domain} ===", ""] + + lines.append(f"SPF: {spf['status']}") + if spf["record"]: + lines.append(f" Record: {spf['record']}") + + lines.append(f"\nDMARC: {dmarc['status']}") + if dmarc["record"]: + lines.append(f" Policy: {dmarc['policy']}") + lines.append(f" Record: {dmarc['record']}") + + if dkim: + lines.append(f"\nDKIM: FOUND ({len(dkim)} selector(s))") + for entry in dkim: + lines.append(f" Selector '{entry['selector']}': {entry['record'][:80]}...") + else: + lines.append("\nDKIM: NOT FOUND (tested common selectors)") + + lines.append(f"\nMTA-STS: {mta_sts['status']}") + if mta_sts["record"]: + lines.append(f" Record: {mta_sts['record']}") + + score = sum([ + 1 if spf["status"] == "FOUND" else 0, + 1 if dmarc["status"] == "FOUND" else 0, + 1 if dmarc.get("policy") in ("reject", "quarantine") else 0, + 1 if dkim else 0, + 1 if mta_sts["status"] == "FOUND" else 0, + ]) + lines.append(f"\nSecurity Score: {score}/5") + + result_text = "\n".join(lines) + return {"results": [{"types": ["text"], "values": result_text}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/ssh_fingerprint.py b/misp_modules/modules/expansion/ssh_fingerprint.py new file mode 100644 index 00000000..ef3b8d17 --- /dev/null +++ b/misp_modules/modules/expansion/ssh_fingerprint.py @@ -0,0 +1,140 @@ +import hashlib +import json +import socket + + +misperrors = {"error": "Error"} +mispattributes = {"input": ["ip-src", "ip-dst"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Saveanu", + "description": "Grab SSH server key fingerprint from an IP address for verification or MitM detection.", + "module-type": ["expansion", "hover"], + "name": "SSH Fingerprint", + "logo": "", + "requirements": [], + "features": ( + "The module takes an IP address attribute as input, connects to port 22," + " performs the SSH protocol version exchange and key exchange init to extract" + " the server host key algorithms and SSH banner. Useful for detecting MitM" + " attacks or verifying server identity changes." + ), + "references": ["https://tools.ietf.org/html/rfc4253"], + "input": "An IP address attribute (ip-src or ip-dst).", + "output": "Text containing SSH banner and key exchange information.", +} +moduleconfig = ["port", "timeout"] + + +def _grab_ssh_banner(ip, port=22, timeout=5): + result = { + "banner": None, + "kex_algorithms": None, + "host_key_algorithms": None, + "error": None, + } + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((ip, port)) + + banner = sock.recv(256).decode("utf-8", errors="replace").strip() + result["banner"] = banner + + sock.sendall(b"SSH-2.0-MISP_Fingerprint_Module\r\n") + + kex_data = sock.recv(4096) + if len(kex_data) > 21: + payload = kex_data[5:] + if payload and payload[0:1] == b"\x14": + payload = payload[16:] + if payload and payload[0:1] == b"\x14": + pass + + try: + msg_code = payload[0] + if msg_code == 20: + offset = 17 + if offset < len(payload): + kex_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + kex_len <= len(payload): + kex_str = payload[offset : offset + kex_len].decode( + "utf-8", errors="replace" + ) + result["kex_algorithms"] = kex_str + offset += kex_len + + hk_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + hk_len <= len(payload): + hk_str = payload[offset : offset + hk_len].decode( + "utf-8", errors="replace" + ) + result["host_key_algorithms"] = hk_str + except (IndexError, ValueError): + pass + + raw_hash = hashlib.sha256(kex_data).hexdigest() + result["kex_hash"] = raw_hash + + sock.close() + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + ip = request.get("ip-src") or request.get("ip-dst") + if not ip: + misperrors["error"] = "An IP address attribute is required." + return misperrors + + config = request.get("config", {}) + port = int(config.get("port") or 22) + timeout = float(config.get("timeout") or 5) + + result = _grab_ssh_banner(ip, port, timeout) + + lines = [f"=== SSH Fingerprint: {ip}:{port} ===", ""] + + if result["error"]: + lines.append(f"Error: {result['error']}") + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + if result["banner"]: + lines.append(f"Banner: {result['banner']}") + + if result.get("host_key_algorithms"): + lines.append(f"\nHost Key Algorithms: {result['host_key_algorithms']}") + + if result.get("kex_algorithms"): + lines.append(f"KEX Algorithms: {result['kex_algorithms']}") + + if result.get("kex_hash"): + lines.append(f"\nKEX Init Hash (SHA256): {result['kex_hash']}") + lines.append(" (Compare this hash over time to detect server key changes)") + + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/tls_certificate_check.py b/misp_modules/modules/expansion/tls_certificate_check.py new file mode 100644 index 00000000..cdf9b722 --- /dev/null +++ b/misp_modules/modules/expansion/tls_certificate_check.py @@ -0,0 +1,147 @@ +import json +import socket +import ssl +from datetime import datetime, timezone + + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Saveanu", + "description": "Extract TLS certificate details from a domain: issuer, validity, SANs, chain info.", + "module-type": ["expansion", "hover"], + "name": "TLS Certificate Check", + "logo": "", + "requirements": [], + "features": ( + "The module takes a domain or hostname attribute as input, connects to port 443," + " performs a TLS handshake and extracts certificate details including issuer," + " subject, validity period, Subject Alternative Names, serial number, and" + " protocol version. No external API required — pure Python ssl module." + ), + "references": ["https://tools.ietf.org/html/rfc5246"], + "input": "A domain or hostname attribute.", + "output": "Text containing TLS certificate details and assessment.", +} +moduleconfig = ["port", "timeout"] + + +def _get_cert_info(domain, port=443, timeout=5): + result = { + "subject": None, + "issuer": None, + "serial": None, + "not_before": None, + "not_after": None, + "sans": [], + "protocol": None, + "error": None, + } + + try: + context = ssl.create_default_context() + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + with socket.create_connection((domain, port), timeout=timeout) as sock: + with context.wrap_socket(sock, server_hostname=domain) as tls: + cert = tls.getpeercert() + result["protocol"] = tls.version() + + subj = dict(x[0] for x in cert.get("subject", ())) + result["subject"] = subj.get("commonName", "N/A") + + issuer = dict(x[0] for x in cert.get("issuer", ())) + result["issuer"] = ( + f"{issuer.get('organizationName', 'N/A')}" + f" ({issuer.get('commonName', 'N/A')})" + ) + + result["serial"] = str(cert.get("serialNumber", "N/A")) + result["not_before"] = cert.get("notBefore", "N/A") + result["not_after"] = cert.get("notAfter", "N/A") + + for entry_type, entry_value in cert.get("subjectAltName", ()): + if entry_type == "DNS": + result["sans"].append(entry_value) + + except ssl.SSLCertVerificationError as e: + result["error"] = f"Certificate verification failed: {e}" + except ssl.SSLError as e: + result["error"] = f"SSL error: {e}" + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def _days_until_expiry(not_after_str): + try: + expiry = datetime.strptime(not_after_str, "%b %d %H:%M:%S %Y %Z") + expiry = expiry.replace(tzinfo=timezone.utc) + delta = expiry - datetime.now(timezone.utc) + return delta.days + except Exception: + return None + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + domain = request.get("domain") or request.get("hostname") + if not domain: + misperrors["error"] = "A domain or hostname attribute is required." + return misperrors + + config = request.get("config", {}) + port = int(config.get("port") or 443) + timeout = float(config.get("timeout") or 5) + + result = _get_cert_info(domain, port, timeout) + + lines = [f"=== TLS Certificate: {domain}:{port} ===", ""] + + if result["error"]: + lines.append(f"Error: {result['error']}") + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + lines.append(f"Subject: {result['subject']}") + lines.append(f"Issuer: {result['issuer']}") + lines.append(f"Serial: {result['serial']}") + lines.append(f"Protocol: {result['protocol']}") + lines.append(f"\nValid From: {result['not_before']}") + lines.append(f"Valid Until: {result['not_after']}") + + days_left = _days_until_expiry(result["not_after"]) + if days_left is not None: + if days_left < 0: + lines.append(f"STATUS: EXPIRED ({abs(days_left)} days ago)") + elif days_left < 30: + lines.append(f"STATUS: EXPIRING SOON ({days_left} days left)") + else: + lines.append(f"STATUS: VALID ({days_left} days remaining)") + + if result["sans"]: + lines.append(f"\nSubject Alternative Names ({len(result['sans'])}):") + for san in result["sans"][:20]: + lines.append(f" - {san}") + if len(result["sans"]) > 20: + lines.append(f" ... and {len(result['sans']) - 20} more") + + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo From c6c0336794dcfe8abce56bc43805f19e80d08752 Mon Sep 17 00:00:00 2001 From: DarkAngel Date: Tue, 21 Apr 2026 12:14:42 +0200 Subject: [PATCH 2/2] feat(expansion): update modules to return misp-objects (x509, passive-ssh, domain-ip) - tls_certificate_check: returns x509 MISPObject (subject, issuer, serial, validity, SANs) - ssh_fingerprint: returns passive-ssh MISPObject (host, banner, port, fingerprint) - email_security_check: returns domain-ip MISPObject with SPF/DKIM/DMARC/MTA-STS findings All modules now use format: misp_standard and return structured Attribute/Object results via MISPEvent, as requested in PR review. --- .../modules/expansion/email_security_check.py | 138 +++++++++++++----- .../modules/expansion/ssh_fingerprint.py | 70 +++++---- .../expansion/tls_certificate_check.py | 106 +++++++++----- 3 files changed, 217 insertions(+), 97 deletions(-) diff --git a/misp_modules/modules/expansion/email_security_check.py b/misp_modules/modules/expansion/email_security_check.py index ee4a91d7..0243c37e 100644 --- a/misp_modules/modules/expansion/email_security_check.py +++ b/misp_modules/modules/expansion/email_security_check.py @@ -9,10 +9,12 @@ except ImportError: print("dnspython is missing, use 'pip install dnspython' to install it.") +from pymisp import MISPAttribute, MISPEvent, MISPObject + misperrors = {"error": "Error"} -mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +mispattributes = {"input": ["domain", "hostname"], "format": "misp_standard"} moduleinfo = { - "version": "0.1", + "version": "0.2", "author": "Mihai Saveanu", "description": "Check email security posture (SPF, DKIM, DMARC, MTA-STS) for a domain.", "module-type": ["expansion", "hover"], @@ -22,14 +24,15 @@ "features": ( "The module takes a domain or hostname attribute as input and queries DNS" " for email security records: SPF (TXT), DMARC (_dmarc), DKIM (common selectors)," - " and MTA-STS (_mta-sts). Results include record content and a pass/fail assessment." + " and MTA-STS (_mta-sts). Returns structured MISP attributes with a domain-ip" + " object linking the findings to the queried domain." ), "references": [ "https://tools.ietf.org/html/rfc7208", "https://tools.ietf.org/html/rfc7489", ], "input": "A domain or hostname attribute.", - "output": "Text containing email security posture assessment.", + "output": "Domain-ip MISP object with email security assessment attributes.", } moduleconfig = ["custom_resolver"] @@ -102,10 +105,14 @@ def handler(q=False): request = json.loads(q) - domain = request.get("domain") or request.get("hostname") - if not domain: - misperrors["error"] = "A domain or hostname attribute is required." - return misperrors + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + + domain = attribute["value"] if request.get("config", {}).get("custom_resolver"): resolver.nameservers = [request["config"]["custom_resolver"]] @@ -115,39 +122,98 @@ def handler(q=False): dkim = _check_dkim(domain) mta_sts = _check_mta_sts(domain) - lines = [f"=== Email Security Posture: {domain} ===", ""] + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) - lines.append(f"SPF: {spf['status']}") - if spf["record"]: - lines.append(f" Record: {spf['record']}") + domain_obj = MISPObject("domain-ip") + domain_obj.add_attribute("domain", **{"type": "domain", "value": domain}) - lines.append(f"\nDMARC: {dmarc['status']}") - if dmarc["record"]: - lines.append(f" Policy: {dmarc['policy']}") - lines.append(f" Record: {dmarc['record']}") + score = 0 + + if spf["status"] == "FOUND": + score += 1 + domain_obj.add_attribute( + "text", + **{"type": "text", "value": f"SPF: {spf['record']}", "comment": "SPF record", "disable_correlation": True}, + ) + else: + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "SPF: MISSING", "comment": "SPF record", "disable_correlation": True}, + ) + + if dmarc["status"] == "FOUND": + score += 1 + if dmarc["policy"] in ("reject", "quarantine"): + score += 1 + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"DMARC: {dmarc['policy']} — {dmarc['record']}", + "comment": "DMARC record and policy", + "disable_correlation": True, + }, + ) + else: + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "DMARC: MISSING", "comment": "DMARC record", "disable_correlation": True}, + ) if dkim: - lines.append(f"\nDKIM: FOUND ({len(dkim)} selector(s))") - for entry in dkim: - lines.append(f" Selector '{entry['selector']}': {entry['record'][:80]}...") + score += 1 + selectors = ", ".join(d["selector"] for d in dkim) + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"DKIM: FOUND ({len(dkim)} selector(s): {selectors})", + "comment": "DKIM selectors found", + "disable_correlation": True, + }, + ) + else: + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": "DKIM: NOT FOUND (tested common selectors)", + "comment": "DKIM check", + "disable_correlation": True, + }, + ) + + if mta_sts["status"] == "FOUND": + score += 1 + domain_obj.add_attribute( + "text", + **{"type": "text", "value": f"MTA-STS: {mta_sts['record']}", "comment": "MTA-STS record", "disable_correlation": True}, + ) else: - lines.append("\nDKIM: NOT FOUND (tested common selectors)") - - lines.append(f"\nMTA-STS: {mta_sts['status']}") - if mta_sts["record"]: - lines.append(f" Record: {mta_sts['record']}") - - score = sum([ - 1 if spf["status"] == "FOUND" else 0, - 1 if dmarc["status"] == "FOUND" else 0, - 1 if dmarc.get("policy") in ("reject", "quarantine") else 0, - 1 if dkim else 0, - 1 if mta_sts["status"] == "FOUND" else 0, - ]) - lines.append(f"\nSecurity Score: {score}/5") - - result_text = "\n".join(lines) - return {"results": [{"types": ["text"], "values": result_text}]} + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "MTA-STS: MISSING", "comment": "MTA-STS record", "disable_correlation": True}, + ) + + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"Email Security Score: {score}/5", + "comment": "Overall email security posture score", + "disable_correlation": True, + }, + ) + + domain_obj.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**domain_obj) + + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} def introspection(): diff --git a/misp_modules/modules/expansion/ssh_fingerprint.py b/misp_modules/modules/expansion/ssh_fingerprint.py index ef3b8d17..8d86cc4c 100644 --- a/misp_modules/modules/expansion/ssh_fingerprint.py +++ b/misp_modules/modules/expansion/ssh_fingerprint.py @@ -2,13 +2,14 @@ import json import socket +from pymisp import MISPAttribute, MISPEvent, MISPObject misperrors = {"error": "Error"} -mispattributes = {"input": ["ip-src", "ip-dst"], "output": ["text"]} +mispattributes = {"input": ["ip-src", "ip-dst"], "format": "misp_standard"} moduleinfo = { - "version": "0.1", + "version": "0.2", "author": "Mihai Saveanu", - "description": "Grab SSH server key fingerprint from an IP address for verification or MitM detection.", + "description": "Grab SSH server fingerprint from an IP and return as passive-ssh MISP object.", "module-type": ["expansion", "hover"], "name": "SSH Fingerprint", "logo": "", @@ -16,12 +17,12 @@ "features": ( "The module takes an IP address attribute as input, connects to port 22," " performs the SSH protocol version exchange and key exchange init to extract" - " the server host key algorithms and SSH banner. Useful for detecting MitM" - " attacks or verifying server identity changes." + " the server host key algorithms and SSH banner. Returns a structured passive-ssh" + " MISP object. Useful for detecting MitM attacks or verifying server identity." ), "references": ["https://tools.ietf.org/html/rfc4253"], "input": "An IP address attribute (ip-src or ip-dst).", - "output": "Text containing SSH banner and key exchange information.", + "output": "passive-ssh MISP object with banner and fingerprint.", } moduleconfig = ["port", "timeout"] @@ -31,6 +32,7 @@ def _grab_ssh_banner(ip, port=22, timeout=5): "banner": None, "kex_algorithms": None, "host_key_algorithms": None, + "kex_hash": None, "error": None, } try: @@ -46,10 +48,6 @@ def _grab_ssh_banner(ip, port=22, timeout=5): kex_data = sock.recv(4096) if len(kex_data) > 21: payload = kex_data[5:] - if payload and payload[0:1] == b"\x14": - payload = payload[16:] - if payload and payload[0:1] == b"\x14": - pass try: msg_code = payload[0] @@ -99,37 +97,55 @@ def handler(q=False): request = json.loads(q) - ip = request.get("ip-src") or request.get("ip-dst") - if not ip: - misperrors["error"] = "An IP address attribute is required." - return misperrors + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + + ip = attribute["value"] config = request.get("config", {}) port = int(config.get("port") or 22) timeout = float(config.get("timeout") or 5) result = _grab_ssh_banner(ip, port, timeout) - lines = [f"=== SSH Fingerprint: {ip}:{port} ===", ""] + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) if result["error"]: - lines.append(f"Error: {result['error']}") - return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + event.add_attribute( + "text", + f"SSH error for {ip}: {result['error']}", + comment="SSH Fingerprint - error", + ) + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} - if result["banner"]: - lines.append(f"Banner: {result['banner']}") + ssh = MISPObject("passive-ssh") - if result.get("host_key_algorithms"): - lines.append(f"\nHost Key Algorithms: {result['host_key_algorithms']}") + ssh.add_attribute("host", **{"type": "ip-dst", "value": ip}) + ssh.add_attribute("port", **{"type": "port", "value": port}) + + if result["banner"]: + ssh.add_attribute("banner", **{"type": "text", "value": result["banner"]}) - if result.get("kex_algorithms"): - lines.append(f"KEX Algorithms: {result['kex_algorithms']}") + if result["kex_hash"]: + ssh.add_attribute( + "fingerprint", + **{"type": "ssh-fingerprint", "value": result["kex_hash"]}, + ) - if result.get("kex_hash"): - lines.append(f"\nKEX Init Hash (SHA256): {result['kex_hash']}") - lines.append(" (Compare this hash over time to detect server key changes)") + ssh.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**ssh) - return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} def introspection(): diff --git a/misp_modules/modules/expansion/tls_certificate_check.py b/misp_modules/modules/expansion/tls_certificate_check.py index cdf9b722..2ab98b83 100644 --- a/misp_modules/modules/expansion/tls_certificate_check.py +++ b/misp_modules/modules/expansion/tls_certificate_check.py @@ -3,26 +3,27 @@ import ssl from datetime import datetime, timezone +from pymisp import MISPAttribute, MISPEvent, MISPObject misperrors = {"error": "Error"} -mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +mispattributes = {"input": ["domain", "hostname"], "format": "misp_standard"} moduleinfo = { - "version": "0.1", + "version": "0.2", "author": "Mihai Saveanu", - "description": "Extract TLS certificate details from a domain: issuer, validity, SANs, chain info.", + "description": "Extract TLS certificate details from a domain and return as x509 MISP object.", "module-type": ["expansion", "hover"], "name": "TLS Certificate Check", "logo": "", "requirements": [], "features": ( "The module takes a domain or hostname attribute as input, connects to port 443," - " performs a TLS handshake and extracts certificate details including issuer," - " subject, validity period, Subject Alternative Names, serial number, and" - " protocol version. No external API required — pure Python ssl module." + " performs a TLS handshake and extracts certificate details. Returns a structured" + " x509 MISP object with subject, issuer, validity, SANs, serial number, and" + " protocol version. No external API required." ), "references": ["https://tools.ietf.org/html/rfc5246"], "input": "A domain or hostname attribute.", - "output": "Text containing TLS certificate details and assessment.", + "output": "x509 MISP object with certificate details.", } moduleconfig = ["port", "timeout"] @@ -31,6 +32,8 @@ def _get_cert_info(domain, port=443, timeout=5): result = { "subject": None, "issuer": None, + "issuer_org": None, + "issuer_cn": None, "serial": None, "not_before": None, "not_after": None, @@ -53,14 +56,17 @@ def _get_cert_info(domain, port=443, timeout=5): result["subject"] = subj.get("commonName", "N/A") issuer = dict(x[0] for x in cert.get("issuer", ())) + result["issuer_org"] = issuer.get("organizationName", "") + result["issuer_cn"] = issuer.get("commonName", "") result["issuer"] = ( - f"{issuer.get('organizationName', 'N/A')}" - f" ({issuer.get('commonName', 'N/A')})" + f"{result['issuer_org']} ({result['issuer_cn']})" + if result["issuer_org"] + else result["issuer_cn"] ) result["serial"] = str(cert.get("serialNumber", "N/A")) - result["not_before"] = cert.get("notBefore", "N/A") - result["not_after"] = cert.get("notAfter", "N/A") + result["not_before"] = cert.get("notBefore", "") + result["not_after"] = cert.get("notAfter", "") for entry_type, entry_value in cert.get("subjectAltName", ()): if entry_type == "DNS": @@ -80,6 +86,14 @@ def _get_cert_info(domain, port=443, timeout=5): return result +def _parse_datetime(date_str): + try: + dt = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z") + return dt.replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") + except Exception: + return None + + def _days_until_expiry(not_after_str): try: expiry = datetime.strptime(not_after_str, "%b %d %H:%M:%S %Y %Z") @@ -96,47 +110,71 @@ def handler(q=False): request = json.loads(q) - domain = request.get("domain") or request.get("hostname") - if not domain: - misperrors["error"] = "A domain or hostname attribute is required." - return misperrors + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + domain = attribute["value"] config = request.get("config", {}) port = int(config.get("port") or 443) timeout = float(config.get("timeout") or 5) result = _get_cert_info(domain, port, timeout) - lines = [f"=== TLS Certificate: {domain}:{port} ===", ""] + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) if result["error"]: - lines.append(f"Error: {result['error']}") - return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + event.add_attribute( + "text", + f"TLS error for {domain}: {result['error']}", + comment="TLS Certificate Check - error", + ) + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + x509 = MISPObject("x509") + + x509.add_attribute("serial-number", **{"type": "text", "value": result["serial"]}) + x509.add_attribute("issuer", **{"type": "text", "value": result["issuer"], "disable_correlation": True}) + x509.add_attribute("subject", **{"type": "text", "value": result["subject"]}) + + if result["protocol"]: + x509.add_attribute("version", **{"type": "text", "value": result["protocol"], "disable_correlation": True}) + + not_before_iso = _parse_datetime(result["not_before"]) + not_after_iso = _parse_datetime(result["not_after"]) + + if not_before_iso: + x509.add_attribute("validity-not-before", **{"type": "datetime", "value": not_before_iso, "disable_correlation": True}) + if not_after_iso: + x509.add_attribute("validity-not-after", **{"type": "datetime", "value": not_after_iso, "disable_correlation": True}) - lines.append(f"Subject: {result['subject']}") - lines.append(f"Issuer: {result['issuer']}") - lines.append(f"Serial: {result['serial']}") - lines.append(f"Protocol: {result['protocol']}") - lines.append(f"\nValid From: {result['not_before']}") - lines.append(f"Valid Until: {result['not_after']}") + for san in result["sans"][:50]: + x509.add_attribute("dns_names", **{"type": "hostname", "value": san}) days_left = _days_until_expiry(result["not_after"]) if days_left is not None: if days_left < 0: - lines.append(f"STATUS: EXPIRED ({abs(days_left)} days ago)") + status = f"EXPIRED ({abs(days_left)} days ago)" elif days_left < 30: - lines.append(f"STATUS: EXPIRING SOON ({days_left} days left)") + status = f"EXPIRING SOON ({days_left} days left)" else: - lines.append(f"STATUS: VALID ({days_left} days remaining)") + status = f"VALID ({days_left} days remaining)" + x509.add_attribute("text", **{"type": "text", "value": status, "disable_correlation": True}) - if result["sans"]: - lines.append(f"\nSubject Alternative Names ({len(result['sans'])}):") - for san in result["sans"][:20]: - lines.append(f" - {san}") - if len(result["sans"]) > 20: - lines.append(f" ... and {len(result['sans']) - 20} more") + x509.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**x509) - return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} def introspection():