|
| 1 | +""" |
| 2 | +MISP Expansion Module: QR Code Decoder (Anti-Quishing) |
| 3 | +This module downloads and decodes QR codes from local attachments or remote URLs. |
| 4 | +It includes security hardening against SSRF and DoS attacks. |
| 5 | +""" |
| 6 | + |
1 | 7 | import binascii |
2 | 8 | import json |
3 | 9 | import re |
| 10 | +import socket |
| 11 | +import ipaddress |
| 12 | +from urllib.parse import urlparse |
4 | 13 |
|
| 14 | +# Third-party imports |
| 15 | +# pylint: disable=import-error |
| 16 | +import requests |
5 | 17 | import cv2 |
6 | | -import np |
| 18 | +import numpy as np |
7 | 19 | from pyzbar import pyzbar |
| 20 | +import urllib3 |
| 21 | + |
| 22 | +# Suppress SSL warnings for analysis purposes |
| 23 | +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
| 24 | + |
| 25 | +# Pylint ignores for dynamic libraries like cv2 |
| 26 | +# pylint: disable=no-member |
8 | 27 |
|
9 | | -misperrors = {"error": "Error"} |
10 | | -mispattributes = {"input": ["attachment"], "output": ["url", "btc"]} |
11 | | -moduleinfo = { |
12 | | - "version": "0.1", |
| 28 | +MISP_ERRORS = {"error": "Error"} |
| 29 | + |
| 30 | +MISP_ATTRIBUTES = { |
| 31 | + "input": ["attachment", "url", "link"], |
| 32 | + "output": ["url", "btc"] |
| 33 | +} |
| 34 | + |
| 35 | +MODULE_INFO = { |
| 36 | + "version": "0.3", |
13 | 37 | "author": "Sascha Rommelfangen", |
14 | | - "description": "Module to decode QR codes.", |
| 38 | + "description": "Decode QR codes from attachments OR remote URLs (Anti-Quishing).", |
15 | 39 | "module-type": ["expansion", "hover"], |
16 | 40 | "name": "QR Code Decode", |
17 | | - "logo": "", |
18 | | - "requirements": [ |
19 | | - "cv2: The OpenCV python library.", |
20 | | - "pyzbar: Python library to read QR codes.", |
21 | | - ], |
22 | | - "features": ( |
23 | | - "The module reads the QR code and returns the related address, which can be an URL or a bitcoin address." |
24 | | - ), |
25 | | - "references": [], |
26 | | - "input": "A QR code stored as attachment attribute.", |
| 41 | + "requirements": ["cv2", "pyzbar", "requests", "numpy"], |
| 42 | + "input": "A QR code stored as attachment attribute or a remote URL.", |
27 | 43 | "output": "The URL or bitcoin address the QR code is pointing to.", |
28 | 44 | } |
29 | 45 |
|
30 | | -debug = True |
31 | | -debug_prefix = "[DEBUG] QR Code module: " |
32 | | -# format example: bitcoin:1GXZ6v7FZzYBEnoRaG77SJxhu7QkvQmFuh?amount=0.15424 |
33 | | -# format example: http://example.com |
34 | | -cryptocurrencies = ["bitcoin"] |
35 | | -schemas = ["http://", "https://", "ftp://"] |
36 | | -moduleconfig = [] |
| 46 | +DEBUG_MODE = True |
| 47 | +DEBUG_PREFIX = "[DEBUG] QR Code module: " |
| 48 | +CRYPTOCURRENCIES = ["bitcoin"] |
| 49 | +SCHEMAS = ["http://", "https://", "ftp://"] |
| 50 | +MODULE_CONFIG = [] |
| 51 | + |
| 52 | +# --- SECURITY CONFIGURATION --- |
| 53 | +MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10 MB limit (Anti-DoS) |
| 54 | +TIMEOUT_SECONDS = 10 |
| 55 | + |
| 56 | + |
| 57 | +def is_safe_url(url): |
| 58 | + """ |
| 59 | + SSRF Protection: Validates that the URL resolves to a public IP. |
| 60 | + Returns: (bool, message) |
| 61 | + """ |
| 62 | + try: |
| 63 | + parsed = urlparse(url) |
| 64 | + hostname = parsed.hostname |
| 65 | + # DNS Resolution to check real IP |
| 66 | + ip_addr_str = socket.gethostbyname(hostname) |
| 67 | + ip_addr = ipaddress.ip_address(ip_addr_str) |
| 68 | + |
| 69 | + # Block private, loopback, and reserved IPs |
| 70 | + if ip_addr.is_loopback or ip_addr.is_private or ip_addr.is_reserved: |
| 71 | + return False, f"Blocked internal IP: {ip_addr_str}" |
| 72 | + |
| 73 | + return True, "OK" |
| 74 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 75 | + # Fail safe: if we can't resolve or parse, we block |
| 76 | + return False, f"DNS Resolution failed: {str(e)}" |
| 77 | + |
| 78 | + |
| 79 | +def fetch_url_image(target_url): |
| 80 | + """ |
| 81 | + Downloads image from URL with security checks (Anti-Cloaking & DoS protection). |
| 82 | + """ |
| 83 | + # 1. SSRF Check |
| 84 | + is_safe, msg = is_safe_url(target_url) |
| 85 | + if not is_safe: |
| 86 | + return None, f"Security Block (SSRF Protection): {msg}" |
| 87 | + |
| 88 | + try: |
| 89 | + # Anti-Cloaking: Simulate mobile User-Agent |
| 90 | + user_agent = ( |
| 91 | + "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) " |
| 92 | + "AppleWebKit/605.1.15 (KHTML, like Gecko) " |
| 93 | + "Version/15.0 Mobile/15E148 Safari/604.1" |
| 94 | + ) |
| 95 | + headers = {'User-Agent': user_agent} |
| 96 | + |
| 97 | + # 2. Secure Download (Stream + Size Limit) |
| 98 | + # pylint: disable=missing-timeout |
| 99 | + with requests.get( |
| 100 | + target_url, |
| 101 | + headers=headers, |
| 102 | + timeout=TIMEOUT_SECONDS, |
| 103 | + stream=True, |
| 104 | + verify=False |
| 105 | + ) as response: # nosec |
| 106 | + response.raise_for_status() |
| 107 | + |
| 108 | + if 'content-length' in response.headers: |
| 109 | + if int(response.headers['content-length']) > MAX_IMAGE_SIZE: |
| 110 | + return None, 'Image too large (DoS protection).' |
37 | 111 |
|
| 112 | + content = b"" |
| 113 | + for chunk in response.iter_content(chunk_size=8192): |
| 114 | + content += chunk |
| 115 | + if len(content) > MAX_IMAGE_SIZE: |
| 116 | + return None, 'Image too large (DoS protection) - Download aborted.' |
38 | 117 |
|
| 118 | + return np.frombuffer(content, np.uint8), None |
| 119 | + |
| 120 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 121 | + return None, f"Fetch Error: {str(e)}" |
| 122 | + |
| 123 | + |
| 124 | +# pylint: disable=too-many-return-statements, too-many-branches |
39 | 125 | def handler(q=False): |
| 126 | + """ |
| 127 | + Main handler function for MISP module. |
| 128 | + """ |
40 | 129 | if q is False: |
41 | 130 | return False |
| 131 | + |
42 | 132 | q = json.loads(q) |
43 | | - filename = q["attachment"] |
| 133 | + img_array = None |
| 134 | + filename = "unknown" |
| 135 | + |
| 136 | + # --- CASE 1: URL Handling --- |
| 137 | + if "url" in q or "link" in q: |
| 138 | + target_url = q.get("url", q.get("link")) |
| 139 | + filename = target_url |
| 140 | + img_array, error_msg = fetch_url_image(target_url) |
| 141 | + if error_msg: |
| 142 | + MISP_ERRORS["error"] = error_msg |
| 143 | + if DEBUG_MODE: |
| 144 | + print(DEBUG_PREFIX + error_msg) |
| 145 | + return MISP_ERRORS |
| 146 | + |
| 147 | + # --- CASE 2: Attachment Handling --- |
| 148 | + elif "attachment" in q: |
| 149 | + filename = q["attachment"] |
| 150 | + try: |
| 151 | + img_array = np.frombuffer(binascii.a2b_base64(q["data"]), np.uint8) |
| 152 | + except Exception: # pylint: disable=broad-exception-caught |
| 153 | + return {'error': "Attachment error: empty or invalid data."} |
| 154 | + |
| 155 | + else: |
| 156 | + return {'error': 'Unsupported input. Provide an attachment or a URL.'} |
| 157 | + |
| 158 | + # --- DECODING --- |
| 159 | + if img_array is None: |
| 160 | + return {'error': 'Failed to process image data.'} |
| 161 | + |
44 | 162 | try: |
45 | | - img_array = np.frombuffer(binascii.a2b_base64(q["data"]), np.uint8) |
46 | | - except Exception as e: |
47 | | - err = "Couldn't fetch attachment (JSON 'data' is empty). Are you using the 'Query enrichment' action?" |
48 | | - misperrors["error"] = err |
49 | | - print(err) |
50 | | - print(e) |
51 | | - return misperrors |
52 | | - image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) |
53 | | - if q: |
| 163 | + image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) |
| 164 | + if image is None: |
| 165 | + return {'error': 'Not a valid image file.'} |
54 | 166 | barcodes = pyzbar.decode(image) |
| 167 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 168 | + return {'error': f'CV2/Pyzbar error: {str(e)}'} |
| 169 | + |
| 170 | + if not barcodes: |
| 171 | + return {'error': 'No QR code found in image.'} |
| 172 | + |
55 | 173 | for item in barcodes: |
56 | 174 | try: |
57 | 175 | result = item.data.decode() |
58 | | - except Exception as e: |
59 | | - print(e) |
60 | | - return |
61 | | - if debug: |
62 | | - print(debug_prefix + result) |
63 | | - for item in cryptocurrencies: |
64 | | - if item in result: |
65 | | - try: |
66 | | - currency, address, extra = re.split(r"\:|\?", result) |
67 | | - except Exception as e: |
68 | | - print(e) |
69 | | - if currency in cryptocurrencies: |
70 | | - try: |
71 | | - amount = re.split("=", extra)[1] |
72 | | - if debug: |
73 | | - print(debug_prefix + address) |
74 | | - print(debug_prefix + amount) |
75 | | - return { |
76 | | - "results": [ |
77 | | - { |
78 | | - "types": ["btc"], |
79 | | - "values": address, |
80 | | - "comment": "BTC: " + amount + " from file " + filename, |
81 | | - } |
82 | | - ] |
83 | | - } |
84 | | - except Exception as e: |
85 | | - print(e) |
86 | | - else: |
87 | | - print(address) |
88 | | - for item in schemas: |
89 | | - if item in result: |
90 | | - try: |
91 | | - url = result |
92 | | - if debug: |
93 | | - print(debug_prefix + url) |
94 | | - return { |
95 | | - "results": [ |
96 | | - { |
97 | | - "types": ["url"], |
98 | | - "values": url, |
99 | | - "comment": "from QR code of file " + filename, |
100 | | - } |
101 | | - ] |
102 | | - } |
103 | | - except Exception as e: |
104 | | - print(e) |
105 | | - else: |
106 | | - try: |
| 176 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 177 | + print(f"Warning: Could not decode barcode data: {e}") |
| 178 | + continue |
| 179 | + |
| 180 | + if DEBUG_MODE: |
| 181 | + print(DEBUG_PREFIX + result) |
| 182 | + |
| 183 | + # Bitcoin logic (Legacy support) |
| 184 | + for crypto in CRYPTOCURRENCIES: |
| 185 | + if crypto in result: |
| 186 | + parts = re.split(r"\:|\?", result) |
| 187 | + if len(parts) > 1 and parts[0] in CRYPTOCURRENCIES: |
107 | 188 | return { |
108 | | - "results": [ |
109 | | - { |
110 | | - "types": ["text"], |
111 | | - "values": result, |
112 | | - "comment": "from QR code of file " + filename, |
113 | | - } |
114 | | - ] |
| 189 | + "results": [{ |
| 190 | + "types": ["btc"], |
| 191 | + "values": parts[1], |
| 192 | + "comment": f"BTC found in {filename}" |
| 193 | + }] |
115 | 194 | } |
116 | | - except Exception as e: |
117 | | - print(e) |
118 | | - misperrors["error"] = "Couldn't decode QR code in attachment." |
119 | | - return misperrors |
| 195 | + |
| 196 | + # URL/Text Logic |
| 197 | + is_url = any(schema in result for schema in SCHEMAS) |
| 198 | + return { |
| 199 | + "results": [{ |
| 200 | + "types": ["url"] if is_url else ["text"], |
| 201 | + "values": result, |
| 202 | + "comment": f"Decoded from {filename}" |
| 203 | + }] |
| 204 | + } |
| 205 | + |
| 206 | + return {'error': "Analysis finished but no data returned."} |
120 | 207 |
|
121 | 208 |
|
122 | 209 | def introspection(): |
123 | | - return mispattributes |
| 210 | + """Returns the input and output attributes supported by the module.""" |
| 211 | + return MISP_ATTRIBUTES |
124 | 212 |
|
125 | 213 |
|
126 | 214 | def version(): |
127 | | - moduleinfo["config"] = moduleconfig |
128 | | - return moduleinfo |
| 215 | + """Returns the version and configuration of the module.""" |
| 216 | + MODULE_INFO["config"] = MODULE_CONFIG |
| 217 | + return MODULE_INFO |
0 commit comments