Skip to content

Commit 3f08ab1

Browse files
authored
Add retrieval of DPAPI backup keys (#42)
1 parent ca61f5a commit 3f08ab1

3 files changed

Lines changed: 161 additions & 2 deletions

File tree

dissect/database/ese/ntds/ntds.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
from __future__ import annotations
22

33
from typing import TYPE_CHECKING, BinaryIO
4+
from uuid import UUID
45

56
from dissect.database.ese.ntds.database import Database
7+
from dissect.database.ese.ntds.objects.secret import BackupKey
68

79
if TYPE_CHECKING:
810
from collections.abc import Iterator
911

10-
from dissect.database.ese.ntds.objects import Computer, DomainDNS, Group, GroupPolicyContainer, Object, Server, User
11-
from dissect.database.ese.ntds.objects.trusteddomain import TrustedDomain
12+
from dissect.database.ese.ntds.objects import (
13+
Computer,
14+
DomainDNS,
15+
Group,
16+
GroupPolicyContainer,
17+
Object,
18+
Secret,
19+
Server,
20+
TrustedDomain,
21+
User,
22+
)
1223
from dissect.database.ese.ntds.pek import PEK
1324

1425

@@ -93,3 +104,37 @@ def trusts(self) -> Iterator[TrustedDomain]:
93104
def group_policies(self) -> Iterator[GroupPolicyContainer]:
94105
"""Get all group policy objects (GPO) objects from the database."""
95106
yield from self.search(objectClass="groupPolicyContainer")
107+
108+
def secrets(self) -> Iterator[Secret]:
109+
"""Get all secret objects from the database."""
110+
yield from self.search(objectClass="secret")
111+
112+
def backup_keys(self) -> Iterator[BackupKey]:
113+
"""Get all DPAPI backup keys from the database."""
114+
if not self.pek.unlocked:
115+
raise ValueError("PEK must be unlocked to retrieve backup keys")
116+
117+
for secret in self.secrets():
118+
if secret.is_phantom or not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"):
119+
continue
120+
121+
yield BackupKey(secret)
122+
123+
def preferred_backup_keys(self) -> Iterator[BackupKey]:
124+
"""Get preferred DPAPI backup keys from the database."""
125+
if not self.pek.unlocked:
126+
raise ValueError("PEK must be unlocked to retrieve backup keys")
127+
128+
# We could do this the proper way (lookup the BCKUPKEY_P* secrets and then directly lookup the
129+
# corresponding BCKUPKEY_* secrets), but in practice there are only a few backup keys, so just
130+
# filter after the fact
131+
preferred_guids = []
132+
for secret in self.secrets():
133+
if secret.is_phantom or not secret.name.startswith("BCKUPKEY_P"):
134+
continue
135+
136+
preferred_guids.append(UUID(bytes_le=secret.current_value))
137+
138+
for key in self.backup_keys():
139+
if key.guid in preferred_guids:
140+
yield key

dissect/database/ese/ntds/objects/secret.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
from __future__ import annotations
22

3+
from functools import cached_property
4+
from typing import TYPE_CHECKING
5+
from uuid import UUID
6+
7+
from dissect.util.ts import wintimestamp
8+
9+
from dissect.database.ese.ntds.c_ds import c_ds
310
from dissect.database.ese.ntds.objects.leaf import Leaf
411

12+
if TYPE_CHECKING:
13+
from datetime import datetime
14+
515

616
class Secret(Leaf):
717
"""Represents a secret object in the Active Directory.
@@ -11,3 +21,79 @@ class Secret(Leaf):
1121
"""
1222

1323
__object_class__ = "secret"
24+
25+
def __repr_body__(self) -> str:
26+
return f"name={self.name!r} last_set_time={self.last_set_time} prior_set_time={self.prior_set_time}"
27+
28+
@property
29+
def current_value(self) -> bytes:
30+
"""Return the current value of the secret."""
31+
return self.get("currentValue")
32+
33+
@property
34+
def last_set_time(self) -> datetime | None:
35+
"""Return the last set time of the secret."""
36+
if (ts := self.get("lastSetTime")) is not None:
37+
return wintimestamp(ts)
38+
return None
39+
40+
@property
41+
def prior_value(self) -> bytes:
42+
"""Return the prior value of the secret."""
43+
return self.get("priorValue")
44+
45+
@property
46+
def prior_set_time(self) -> datetime | None:
47+
"""Return the prior set time of the secret."""
48+
if (ts := self.get("priorSetTime")) is not None:
49+
return wintimestamp(ts)
50+
return None
51+
52+
53+
class BackupKey:
54+
"""Represents a DPAPI backup key object in the Active Directory."""
55+
56+
def __init__(self, secret: Secret):
57+
self.secret = secret
58+
59+
def __repr__(self) -> str:
60+
return f"<BackupKey guid={self.guid} version={self.version}>"
61+
62+
@cached_property
63+
def guid(self) -> UUID:
64+
"""The GUID of the backup key."""
65+
return UUID(self.secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret"))
66+
67+
@cached_property
68+
def version(self) -> int:
69+
"""The version of the backup key."""
70+
return c_ds.DWORD(self.secret.current_value)
71+
72+
@cached_property
73+
def is_legacy(self) -> bool:
74+
"""Whether the backup key is a legacy key (version 1)."""
75+
return self.version == 1
76+
77+
@cached_property
78+
def key(self) -> bytes:
79+
"""The key bytes of the backup key, for legacy keys (version 1)."""
80+
if self.version == 1:
81+
return self.secret.current_value[4:]
82+
raise TypeError(f"Backup key version {self.version} does not have a single key value")
83+
84+
@cached_property
85+
def private_key(self) -> bytes:
86+
"""The private key bytes of the backup key, for version 2 keys."""
87+
if self.version == 2:
88+
private_length = c_ds.DWORD(self.secret.current_value[4:8])
89+
return self.secret.current_value[12 : 12 + private_length]
90+
raise TypeError(f"Backup key version {self.version} does not have a private key value")
91+
92+
@cached_property
93+
def public_key(self) -> bytes:
94+
"""The public key bytes of the backup key, for version 2 keys."""
95+
if self.version == 2:
96+
private_length = c_ds.DWORD(self.secret.current_value[4:8])
97+
public_length = c_ds.DWORD(self.secret.current_value[8:12])
98+
return self.secret.current_value[12 + private_length : 12 + private_length + public_length]
99+
raise TypeError(f"Backup key version {self.version} does not have a public key value")

tests/ese/ntds/test_ntds.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

3+
import hashlib
34
from typing import TYPE_CHECKING
5+
from uuid import UUID
46

57
import pytest
68

@@ -265,6 +267,7 @@ def test_all_memberships(large: NTDS) -> None:
265267

266268

267269
def test_group_policies(goad: NTDS) -> None:
270+
"""Test retrieval of group policies."""
268271
gpos: list[GroupPolicyContainer] = sorted(goad.group_policies(), key=lambda x: x.distinguished_name)
269272
assert len(gpos) == 5
270273
assert isinstance(gpos[0], GroupPolicyContainer)
@@ -275,3 +278,28 @@ def test_group_policies(goad: NTDS) -> None:
275278
"CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=NORTH,DC=SEVENKINGDOMS,DC=LOCAL",
276279
"CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=SEVENKINGDOMS,DC=LOCAL",
277280
]
281+
282+
283+
def test_backup_keys(goad: NTDS) -> None:
284+
"""Test retrieval of DPAPI backup keys."""
285+
with pytest.raises(ValueError, match="PEK must be unlocked to retrieve backup keys"):
286+
list(goad.backup_keys())
287+
288+
goad.pek.unlock(bytes.fromhex("079f95655b66f16deb28aa1ab3a81eb0"))
289+
290+
keys = list(goad.backup_keys())
291+
assert len(keys) == 2
292+
assert keys[0].guid == UUID("dbea00d0-005f-4233-b140-41a9961da100")
293+
assert keys[0].version == 1
294+
assert hashlib.sha256(keys[0].key).hexdigest() == "bae7b058f277922b75d63d9803b85fca40a95a3cc9d47c0ef0a644a203009562"
295+
296+
assert keys[1].guid == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87")
297+
assert keys[1].version == 2 # Current key version
298+
assert (
299+
hashlib.sha256(keys[1].private_key).hexdigest()
300+
== "e7317dfe5f962121afead04e0dbb4249aa395ef281e2332f6179f940b54f202f"
301+
)
302+
assert (
303+
hashlib.sha256(keys[1].public_key).hexdigest()
304+
== "398fef9281677096b18785d0ad000251d41f76b82e28687718d6a9812ddaca8a"
305+
)

0 commit comments

Comments
 (0)