Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 195 additions & 114 deletions modules/signatures/windows/ransomware_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
except ImportError:
import re


class RansomwareMessage(Signature):
name = "ransomware_message"
description = "Writes a potential ransom message to disk"
Expand All @@ -32,38 +31,81 @@ class RansomwareMessage(Signature):
ttps = ["T1486"]
mbcs = ["OB0008", "E1486", "OC0001", "C0016"]

filter_apinames = {"NtWriteFile"}
filter_apinames = {"NtWriteFile", "WriteFile"}

def __init__(self, *args, **kwargs):
Signature.__init__(self, *args, **kwargs)
self.ret = False

self.indicators = [
"your files",
"your data",
"your documents",
"restore files",
"restore data",
"restore the files",
"restore the data",
"recover files",
"recover data",
"recover the files",
"recover the data",
".onion",
"aes 128",
"aes 256",
"aes-128",
"aes-256",
"aes128",
"aes256",
"all data",
"attention!",
"bit coin",
"bitcoin",
"bootkit",
"btc",
"decrypt",
"decrypter",
"decryptor",
"device id",
"download tor",
"encrypt",
"encrypted",
"encryption id",
"enter code",
"ethereum",
"get back my",
"get back your",
"hardwareid",
"has been locked",
"pay fine",
"install tor",
"localbitcoins",
"military grade encryption",
"pay a fine",
"pay fine",
"pay the fine",
"decrypt",
"encrypt",
"payment",
"personal code",
"personal id",
"personal identification code",
"personal identifier",
"personal key",
"private code",
"private key",
"ransom",
"recover data",
"recover files",
"recover my",
"recover personal",
"recover the data",
"recover the files",
"recover them",
"recover your",
"recover personal",
"bitcoin",
"secret server",
"restore data",
"restore files",
"restore system",
"restore the data",
"restore the files",
"restore the system",
"rootkit",
"rsa 1024",
"rsa 2048",
"rsa 4096",
"rsa-1024",
"rsa-2048",
"rsa-4096",
"rsa1024",
"rsa2048",
"rsa4096",
"secret internet server",
"install tor",
"download tor",
"secret server",
"tor browser",
"tor gateway",
"tor-browser",
Expand All @@ -72,128 +114,167 @@ def __init__(self, *args, **kwargs):
"torgateway",
"torproject.org",
"tox.chat",
"ransom",
"bootkit",
"rootkit",
"payment",
"unique id",
"unique key",
"victim",
"AES128",
"AES256",
"AES 128",
"AES 256",
"AES-128",
"AES-256",
"RSA1024",
"RSA2048",
"RSA4096",
"RSA 1024",
"RSA 2048",
"RSA 4096",
"RSA-1024",
"RSA-2048",
"RSA-4096",
"private key",
"personal key",
"wallet address",
"what happened",
"your code",
"private code",
"personal code",
"enter code",
"your key",
"unique key",
"your data",
"your database",
"encrypted",
"bit coin",
"BTC",
"ethereum",
"what happened",
"what happened",
"decryptor",
"decrypter",
"personal ID",
"unique ID",
"encryption ID",
"device ID",
"hardwareid",
"recover my",
"wallet address",
"localbitcoins",
"Attention!",
"restore the system",
"restore system",
"military grade encryption",
"personal identifier",
"personal identification code",
"get back my",
"get back your",
"your network",
"your documents",
"your files",
"your key",
"your network"
]

indicators_bytes = [i.encode("utf-8").lower() for i in self.indicators]
pattern_bytes = b"|".join(re.escape(i) for i in indicators_bytes)
self.regex = re.compile(pattern_bytes)
indicators_str = [re.escape(i.lower()) for i in self.indicators]
pattern_str = "|".join(indicators_str)
self.regex = re.compile(pattern_str)

def on_call(self, call, process):
filepath = self.get_argument(call, "HandleName")

filepath = self.get_argument(call, "HandleName") or self.get_argument(call, "FileName")
if not filepath:
return

filepath_lower = filepath.lower()

is_target_path = (
filepath_lower == "\\??\\physicaldrive0"
or filepath_lower.startswith("\\device\\harddisk")
or filepath_lower.endswith((".txt", ".html", ".hta", ".rtf"))
or "readme" in filepath_lower
or "read_me" in filepath_lower
or "decrypt" in filepath_lower
filepath_lower == "\\??\\physicaldrive0" or
filepath_lower.startswith("\\device\\harddisk") or
filepath_lower.endswith((".txt", ".html", ".hta", ".rtf")) or
"readme" in filepath_lower or
"read_me" in filepath_lower or
"decrypt" in filepath_lower
)

if not is_target_path:
return

buff = self.get_raw_argument(call, "Buffer")
if buff and len(buff) >= 128:
buff_lower = buff.lower()
matches = set(self.regex.findall(buff_lower))
buff = self.get_argument(call, "Buffer")

if buff:
if isinstance(buff, bytes) or isinstance(buff, bytearray):
buff_str = bytes(buff).decode('utf-8', errors='ignore')
else:
buff_str = str(buff)

if len(matches) > 1:
self.data.append({"ransom_note": filepath})
self.data.append({"beginning_of_ransom_message": buff})

if self.pid:
if len(buff_str) >= 32:
buff_lower = buff_str.lower()
matches = set(self.regex.findall(buff_lower))
if len(matches) > 1:
self.mark_call()
self.ret = True
return True

def on_complete(self):
if not self.ret and "dropped" in self.results:
for dropped in self.results["dropped"]:

raw_name = dropped.get("name", "")
if isinstance(raw_name, list) and len(raw_name) > 0:
filename = str(raw_name[0]).lower()
else:
filename = str(raw_name).lower()

if (
filename.endswith((".txt", ".html", ".hta", ".rtf"))
or "read_me" in filename
or "readme" in filename
or "read-me" in filename
):

if filename.endswith((".txt", ".html", ".hta", ".rtf")) or "read_me" in filename or "readme" in filename:
filedata = dropped.get("data")

if filedata:
if isinstance(filedata, bytes) or isinstance(filedata, bytearray):
filedata_str = bytes(filedata).decode('utf-8', errors='ignore')
else:
filedata_str = str(filedata)

if len(filedata_str) >= 32:
filedata_lower = filedata_str.lower()
matches = set(self.regex.findall(filedata_lower))

if len(matches) > 1:
self.data.append({"ransom_note": filename})
self.data.append({"beginning_of_ransom_message": filedata_str})
self.ret = True
break

if isinstance(filedata, str):
filedata = filedata.encode("utf-8", errors="ignore")
return self.ret

if filedata and len(filedata) >= 128:
filedata_lower = filedata.lower()
matches = set(self.regex.findall(filedata_lower))

if len(matches) > 1:
self.data.append({"ransom_note": filename})
self.data.append({"beginning_of_ransom_message": filedata})
self.ret = True
break
class MassRansomNoteDrop(Signature):
name = "mass_ransom_note_drop"
description = "Writes or copies the same ransom note filename across multiple directories"
severity = 3
categories = ["ransomware"]
authors = ["Kevin Ross"]
minimum = "1.3"
evented = True
ttps = ["T1486"]
mbcs = ["OB0008", "E1486"]

filter_apinames = set([
"NtWriteFile", "WriteFile",
"CopyFileA", "CopyFileW", "CopyFileExA", "CopyFileExW",
"MoveFileA", "MoveFileW", "MoveFileExA", "MoveFileExW"
])

def __init__(self, *args, **kwargs):
Signature.__init__(self, *args, **kwargs)
self.ret = False
self.marked_calls = 0
self.dropped_notes = {}
self.note_keywords = ("readme", "read_me", "decrypt", "restore", "instructions", "recover")
self.extensions = (".txt", ".html", ".hta", ".rtf", ".url")

def on_call(self, call, process):
pid = process.get("process_id")

filepath = self.get_argument(call, "NewFileName") or self.get_argument(call, "HandleName") or self.get_argument(call, "FileName")

if not isinstance(filepath, str):
return

filepath = filepath.replace("/", "\\")
if "\\" not in filepath:
return

dirname, _, filename = filepath.rpartition("\\")
filename_lower = filename.lower()

if not filename_lower.endswith(self.extensions):
return

if not any(kw in filename_lower for kw in self.note_keywords):
return

dirname_lower = dirname.lower()

if pid not in self.dropped_notes:
self.dropped_notes[pid] = {}

if filename_lower not in self.dropped_notes[pid]:
self.dropped_notes[pid][filename_lower] = set()

if dirname_lower in self.dropped_notes[pid][filename_lower]:
return

self.dropped_notes[pid][filename_lower].add(dirname_lower)
dir_count = len(self.dropped_notes[pid][filename_lower])

if dir_count >= 2 and self.marked_calls < 5:
self.mark_call()
self.marked_calls += 1

if dir_count >= 5:
self.ret = True

def on_complete(self):
if self.ret:
for pid, notes in self.dropped_notes.items():
for note_name, dirs in notes.items():
if len(dirs) >= 5:
self.data.append({
"ransom_note": note_name,
"pid": pid,
"directories_count": len(dirs)
})
return self.ret