-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecoynet.py
More file actions
169 lines (144 loc) · 5.86 KB
/
decoynet.py
File metadata and controls
169 lines (144 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import argparse
import logging
from datetime import datetime
# --- Configuration ---
# Honeypot services to launch, keyed by service name; each value is the TCP
# port that service's listener binds to. Names are looked up in the
# service_map built by DecoyNet._create_services.
# NOTE(review): ports below 1024 typically need elevated privileges on
# Unix-like systems -- confirm the deployment accounts for that.
ENABLED_SERVICES = {
'SSH': 22,
'FTP': 21,
'HTTP': 80,
}
# Path of the file that setup_logging() attaches a FileHandler to.
LOG_FILE = 'decoynet.log'
# --- Logging Setup ---
def setup_logging():
    """Route root-logger output to both LOG_FILE and the console.

    Records at INFO level and above are emitted with a bare ``%(message)s``
    format, because log_event() already renders its own timestamp and fields
    into the message text.
    """
    file_handler = logging.FileHandler(LOG_FILE)
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format='%(message)s',
        handlers=[file_handler, console_handler],
    )
def log_event(service, attacker_ip, attacker_port, event):
    """Log one honeypot event as a single structured ``key=value`` line.

    Args:
        service: Name of the emulated service (e.g. ``'SSH'``).
        attacker_ip: Remote address of the connecting peer.
        attacker_port: Remote source port of the connecting peer.
        event: Short description of what happened.

    The line is emitted at INFO level through the root logger and has the
    shape ``timestamp=<iso> service='..' attacker_ip='..' attacker_port=..
    event='..'``.
    """
    # Local import: the module itself only imports ``datetime`` from
    # ``datetime``, and this keeps the block self-contained.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the rendered timestamp keeps exactly the same
    # naive ISO-8601 shape that existing log consumers already parse.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    log_message = (
        f"timestamp={timestamp} service='{service}' "
        f"attacker_ip='{attacker_ip}' attacker_port={attacker_port} "
        f"event='{event}'"
    )
    logging.info(log_message)
# --- Base Service Class ---
class HoneypotService:
    """Base class for a honeypot service.

    A service listens on one TCP port, logs every incoming connection via
    log_event(), runs the subclass-specific ``emulate_service`` coroutine and
    then closes the connection.
    """

    def __init__(self, service_name, port):
        """Store the service's display name and the TCP port it listens on."""
        self.service_name = service_name
        self.port = port

    @staticmethod
    def _peer_address(writer):
        """Return ``(ip, port)`` of the remote peer for *writer*.

        Falls back to ``('unknown', 0)`` when the transport has no peername
        (for example, the peer already disconnected), and only takes the
        first two fields so 4-tuple IPv6 addresses are handled as well.
        """
        peer = writer.get_extra_info('peername')
        if not peer:
            return 'unknown', 0
        return peer[0], peer[1]

    async def handle_connection(self, reader, writer):
        """Handle one incoming connection from accept to close.

        Logs the connection, delegates to ``emulate_service`` and always
        closes the writer. Errors raised by the emulation or while closing
        are logged instead of propagating, so one misbehaving client cannot
        leave an unhandled exception in the connection task.
        """
        attacker_ip, attacker_port = self._peer_address(writer)
        log_event(self.service_name, attacker_ip, attacker_port, 'Connection received')
        try:
            await self.emulate_service(reader, writer)
        except Exception as e:
            # Top-level boundary for a single connection: record and move on.
            log_event(self.service_name, attacker_ip, attacker_port, f"Error: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as e:
                # The peer may reset the connection while it is being closed;
                # that is routine for scanners, so log it rather than crash.
                log_event(self.service_name, attacker_ip, attacker_port, f"Close error: {e}")

    async def emulate_service(self, reader, writer):
        """The core logic for emulating the service. Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement this method.")

    async def start(self):
        """Start the listener on all IPv4 interfaces and serve until cancelled."""
        server = await asyncio.start_server(
            self.handle_connection, '0.0.0.0', self.port
        )
        addr = server.sockets[0].getsockname()
        print(f"[*] Starting {self.service_name} service on {addr[0]}:{addr[1]}")
        async with server:
            await server.serve_forever()
# --- Service Implementations ---
class SSHHoneypot(HoneypotService):
    """SSH decoy that accepts the connection and sends nothing back."""

    def __init__(self, port):
        """Register this decoy under the name 'SSH' on the given port."""
        super().__init__(service_name='SSH', port=port)

    async def emulate_service(self, reader, writer):
        """Perform no protocol exchange at all.

        No banner is written; control returns straight to the base-class
        connection handler, which closes the connection.
        """
        return None
class FTPHoneypot(HoneypotService):
    """FTP decoy that greets the client with a fake ProFTPD banner."""

    def __init__(self, port):
        """Register this decoy under the name 'FTP' on the given port."""
        super().__init__(service_name='FTP', port=port)

    async def emulate_service(self, reader, writer):
        """Write the fake 220 greeting, flush it, and log that it was sent.

        The connection itself is closed by the base-class handler once this
        coroutine returns.
        """
        peer = writer.get_extra_info('peername')
        attacker_ip, attacker_port = peer
        writer.write(b"220 ProFTPD 1.3.5a Server ready.\r\n")
        await writer.drain()
        log_event(self.service_name, attacker_ip, attacker_port, 'Banner sent')
class HTTPHoneypot(HoneypotService):
    """HTTP decoy that answers every connection with a canned 404 page."""

    def __init__(self, port):
        """Register this decoy under the name 'HTTP' on the given port."""
        super().__init__(service_name='HTTP', port=port)

    async def emulate_service(self, reader, writer):
        """Send a fixed ``404 Not Found`` response and log that it was sent.

        The request is not read; the response is written as soon as the
        connection is handed to this coroutine.
        """
        peer = writer.get_extra_info('peername')
        attacker_ip, attacker_port = peer
        page = b"<!DOCTYPE html><html><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested URL was not found on this server.</p></body></html>"
        # Content-Length is derived from the body so the two cannot drift apart.
        head = b"".join((
            b"HTTP/1.1 404 Not Found\r\n",
            b"Content-Type: text/html\r\n",
            b"Content-Length: %d\r\n" % len(page),
            b"Connection: close\r\n\r\n",
        ))
        writer.write(head + page)
        await writer.drain()
        log_event(self.service_name, attacker_ip, attacker_port, '404 response sent')
# --- Main Application Logic ---
class DecoyNet:
    """The main application class for the honeypot.

    Builds one service object per entry in ENABLED_SERVICES and runs them all
    concurrently on the current asyncio event loop.
    """

    def __init__(self):
        """Create the configured services; nothing is started yet."""
        self.services = []
        self._is_running = False
        self._create_services()

    def _create_services(self):
        """Instantiate a honeypot service for every known configured name.

        Names in ENABLED_SERVICES with no matching implementation are
        reported on stdout and skipped instead of being dropped silently.
        """
        service_map = {
            'SSH': SSHHoneypot,
            'FTP': FTPHoneypot,
            'HTTP': HTTPHoneypot,
        }
        for name, port in ENABLED_SERVICES.items():
            service_cls = service_map.get(name)
            if service_cls is None:
                print(f"[!] Unknown service '{name}' in configuration. Skipping.")
                continue
            self.services.append(service_cls(port))

    async def start_honeypot(self):
        """Start all configured services and wait until they stop.

        If any service fails (for example, its port cannot be bound) the
        exception propagates to the caller, but only after the remaining
        listeners have been cancelled and the running flag has been cleared.
        """
        if not self.services:
            print("[!] No services configured. Exiting.")
            return
        print("[+] DecoyNet Honeypot Starting...")
        self._is_running = True
        tasks = [asyncio.create_task(service.start()) for service in self.services]
        try:
            await asyncio.gather(*tasks)
        finally:
            # gather() surfaces the first failure but leaves sibling tasks
            # running; cancel them and wait so no listener outlives this call.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            # Without this reset the flag stayed True forever after a stop.
            self._is_running = False

    def get_status(self):
        """Print whether this DecoyNet instance is currently serving."""
        if self._is_running:
            print("DecoyNet is running.")
        else:
            print("DecoyNet is not yet running.")
def main():
    """Parse command-line arguments and run the selected sub-command.

    Sub-commands:
        start:  build a DecoyNet and serve until interrupted.
        status: print the status of a freshly built DecoyNet.
    """
    parser = argparse.ArgumentParser(description="DecoyNet: A simple Python honeypot.")
    subparsers = parser.add_subparsers(dest='command', required=True)

    # 'start' command
    parser_start = subparsers.add_parser('start', help='Starts the honeypot server.')
    parser_start.set_defaults(func=lambda args: asyncio.run(DecoyNet().start_honeypot()))

    # 'status' command
    # NOTE(review): this inspects a brand-new DecoyNet instance, so it always
    # reports "not yet running"; it cannot see a honeypot in another process.
    parser_status = subparsers.add_parser('status', help='Checks if the honeypot is running.')
    parser_status.set_defaults(func=lambda args: DecoyNet().get_status())

    args = parser.parse_args()

    # Configure logging only after the arguments are known to be valid, so
    # that --help or a usage error does not create the log file.
    setup_logging()
    try:
        args.func(args)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop 'start'; exit without a traceback.
        print("\n[!] DecoyNet stopped by user.")


if __name__ == "__main__":
    main()