Skip to content

Commit 012dd97

Browse files
feat: execute shell scripts created via redirect (cowrie#2908)
1 parent 6e2f9bf commit 012dd97

File tree

3 files changed

+224
-5
lines changed

3 files changed

+224
-5
lines changed

src/cowrie/commands/bash.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
1-
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
2-
# See the COPYRIGHT file for more information
3-
4-
# coding=utf-8
1+
# ABOUTME: Implements bash/sh shell command for cowrie honeypot.
2+
# ABOUTME: Handles -c flags, piped input, script file execution, and interactive shells.
53

64
from __future__ import annotations
75

6+
import re
87
from typing import TYPE_CHECKING
98

109
from twisted.internet import error
1110
from twisted.python import failure
1211

1312
from cowrie.shell.command import HoneyPotCommand
13+
from cowrie.shell.fs import FileNotFound
1414
from cowrie.shell.honeypot import HoneyPotShell
1515

1616
if TYPE_CHECKING:
1717
from collections.abc import Callable
1818

1919
commands: dict[str, Callable] = {}
2020

21+
MAX_SCRIPT_DEPTH = 5
22+
_SHEBANG_RE = re.compile(r"^#!\s*/bin/(ba)?sh")
23+
2124

2225
class Command_sh(HoneyPotCommand):
2326
def start(self) -> None:
@@ -37,10 +40,41 @@ def start(self) -> None:
3740
self.execute_commands(self.input_data.decode("utf8"))
3841
self.exit()
3942

43+
elif self.args and not self.args[0].startswith("-"):
44+
self.execute_script_file(self.args[0])
45+
self.exit()
46+
4047
else:
4148
self.interactive_shell()
4249

43-
# TODO: handle spawning multiple shells, support other sh flags
50+
def execute_script_file(self, filename: str) -> None:
51+
depth = getattr(self.protocol, "_script_depth", 0)
52+
if depth >= MAX_SCRIPT_DEPTH:
53+
self.errorWrite(f"-bash: {filename}: too many levels of recursion\n")
54+
return
55+
56+
path = self.fs.resolve_path(filename, self.protocol.cwd)
57+
try:
58+
contents = self.fs.file_contents(path)
59+
except (FileNotFound, FileNotFoundError):
60+
self.errorWrite(f"bash: {filename}: No such file or directory\n")
61+
return
62+
63+
lines = contents.decode("utf-8", errors="replace").splitlines()
64+
# Strip shebang line
65+
if lines and _SHEBANG_RE.match(lines[0]):
66+
lines = lines[1:]
67+
# Strip comment-only lines and blank lines
68+
lines = [line for line in lines if line.strip() and not line.strip().startswith("#")]
69+
70+
if not lines:
71+
return
72+
73+
self.protocol._script_depth = depth + 1
74+
try:
75+
self.execute_commands("; ".join(lines))
76+
finally:
77+
self.protocol._script_depth = depth
4478

4579
def execute_commands(self, cmds: str) -> None:
4680
# self.input_data holds commands passed via PIPE

src/cowrie/shell/protocol.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,69 @@ def call(self):
143143

144144
return Command_txtcmd
145145

146+
def scriptcmd(self, path: str) -> object:
147+
"""Return a command class that executes a shell script from the virtual filesystem."""
148+
import re
149+
150+
shebang_re = re.compile(r"^#!\s*/bin/(ba|a)?sh")
151+
max_depth = 5
152+
153+
class Command_scriptcmd(command.HoneyPotCommand):
154+
def call(self_cmd):
155+
depth = getattr(self_cmd.protocol, "_script_depth", 0)
156+
if depth >= max_depth:
157+
self_cmd.errorWrite(
158+
f"-bash: {path}: too many levels of recursion\n"
159+
)
160+
return
161+
162+
try:
163+
contents = self_cmd.fs.file_contents(path)
164+
except Exception:
165+
self_cmd.errorWrite(
166+
f"-bash: {path}: No such file or directory\n"
167+
)
168+
return
169+
170+
# Null bytes indicate actual binary — reject like real bash
171+
if b"\x00" in contents:
172+
self_cmd.errorWrite(
173+
f"-bash: {path}: cannot execute binary file: Exec format error\n"
174+
)
175+
return
176+
177+
lines = contents.decode("utf-8", errors="replace").splitlines()
178+
179+
if not lines:
180+
return
181+
182+
# Strip shebang line if it's a shell shebang
183+
if shebang_re.match(lines[0]):
184+
lines = lines[1:]
185+
186+
# Strip comment-only and blank lines
187+
lines = [
188+
line
189+
for line in lines
190+
if line.strip() and not line.strip().startswith("#")
191+
]
192+
193+
if not lines:
194+
return
195+
196+
self_cmd.protocol._script_depth = depth + 1
197+
try:
198+
shell = honeypot.HoneyPotShell(
199+
self_cmd.protocol, interactive=False
200+
)
201+
self_cmd.protocol.cmdstack.append(shell)
202+
shell.lineReceived("; ".join(lines))
203+
self_cmd.protocol.cmdstack.pop()
204+
finally:
205+
self_cmd.protocol._script_depth = depth
206+
207+
return Command_scriptcmd
208+
146209
def isCommand(self, cmd):
147210
"""
148211
Check if cmd (the argument of a command) is a command, too.
@@ -187,6 +250,9 @@ def getCommand(self, cmd, paths):
187250
if path in self.commands:
188251
return self.commands[path]
189252

253+
if self.fs.isfile(path):
254+
return self.scriptcmd(path)
255+
190256
log.msg(f"Can't find command {cmd}")
191257
return None
192258

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# ABOUTME: Tests for shell script execution via bash/sh and path-based invocation.
2+
# ABOUTME: Covers shebang stripping, binary detection, comment stripping, and recursion depth limits.
3+
4+
from __future__ import annotations
5+
6+
import os
7+
import unittest
8+
9+
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
10+
from cowrie.test.fake_server import FakeAvatar, FakeServer
11+
from cowrie.test.fake_transport import FakeTransport
12+
13+
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
14+
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
15+
os.environ["COWRIE_SHELL_FILESYSTEM"] = "src/cowrie/data/fs.pickle"
16+
17+
PROMPT = b"root@unitTest:~# "
18+
19+
20+
class ScriptExecutionTests(unittest.TestCase):
21+
"""Tests for executing shell scripts via bash/sh and ./path."""
22+
23+
def setUp(self) -> None:
24+
self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
25+
self.tr = FakeTransport("", "31337")
26+
self.proto.makeConnection(self.tr)
27+
self.tr.clear()
28+
29+
def tearDown(self) -> None:
30+
self.proto.connectionLost()
31+
32+
def test_bash_executes_script_file(self) -> None:
33+
"""bash script.sh reads and executes file contents."""
34+
self.proto.lineReceived(b'echo "echo hello" > /tmp/test.sh')
35+
self.tr.clear()
36+
self.proto.lineReceived(b"bash /tmp/test.sh")
37+
self.assertEqual(self.tr.value(), b"hello\n" + PROMPT)
38+
39+
def test_sh_executes_script_file(self) -> None:
40+
"""sh script.sh reads and executes file contents."""
41+
self.proto.lineReceived(b'echo "echo world" > /tmp/test_sh.sh')
42+
self.tr.clear()
43+
self.proto.lineReceived(b"sh /tmp/test_sh.sh")
44+
self.assertEqual(self.tr.value(), b"world\n" + PROMPT)
45+
46+
def test_bash_nonexistent_file(self) -> None:
47+
"""bash nonexistent.sh shows error."""
48+
self.proto.lineReceived(b"bash /tmp/nonexistent.sh")
49+
output = self.tr.value()
50+
self.assertIn(b"No such file or directory", output)
51+
52+
def test_dotslash_with_shebang_executes(self) -> None:
53+
"""./script.sh with #!/bin/sh shebang executes."""
54+
self.proto.lineReceived(b'printf "#!/bin/sh\\necho from_script\\n" > run.sh')
55+
self.tr.clear()
56+
self.proto.lineReceived(b"./run.sh")
57+
self.assertEqual(self.tr.value(), b"from_script\n" + PROMPT)
58+
59+
def test_dotslash_without_shebang_executes(self) -> None:
60+
"""./file without shebang executes as shell script (kernel ENOEXEC fallback)."""
61+
self.proto.lineReceived(b'echo "echo no_shebang" > noshebang.sh')
62+
self.tr.clear()
63+
self.proto.lineReceived(b"./noshebang.sh")
64+
self.assertEqual(self.tr.value(), b"no_shebang\n" + PROMPT)
65+
66+
def test_dotslash_binary_file_fails(self) -> None:
67+
"""./file with null bytes emits 'cannot execute binary file'."""
68+
# Use printf to write a null byte into the file
69+
self.proto.lineReceived(b'printf "\\x00ELF" > binfile')
70+
self.tr.clear()
71+
self.proto.lineReceived(b"./binfile")
72+
output = self.tr.value()
73+
self.assertIn(b"cannot execute binary file", output)
74+
75+
def test_shebang_line_stripped(self) -> None:
76+
"""Shebang line is not echoed or executed as a command."""
77+
self.proto.lineReceived(
78+
b'printf "#!/bin/bash\\necho shebang_stripped\\n" > /tmp/shebang.sh'
79+
)
80+
self.tr.clear()
81+
self.proto.lineReceived(b"bash /tmp/shebang.sh")
82+
self.assertEqual(self.tr.value(), b"shebang_stripped\n" + PROMPT)
83+
84+
def test_comment_lines_stripped(self) -> None:
85+
"""Comment-only lines are stripped from script execution."""
86+
self.proto.lineReceived(
87+
b'printf "#!/bin/sh\\n# this is a comment\\necho works\\n" > /tmp/comments.sh'
88+
)
89+
self.tr.clear()
90+
self.proto.lineReceived(b"bash /tmp/comments.sh")
91+
self.assertEqual(self.tr.value(), b"works\n" + PROMPT)
92+
93+
def test_multiline_script(self) -> None:
94+
"""Script with multiple commands executes all of them."""
95+
self.proto.lineReceived(
96+
b'printf "echo line1\\necho line2\\necho line3\\n" > /tmp/multi.sh'
97+
)
98+
self.tr.clear()
99+
self.proto.lineReceived(b"bash /tmp/multi.sh")
100+
self.assertEqual(self.tr.value(), b"line1\nline2\nline3\n" + PROMPT)
101+
102+
def test_absolute_path_with_shebang(self) -> None:
103+
"""Absolute path /tmp/script.sh with shebang executes."""
104+
self.proto.lineReceived(
105+
b'printf "#!/bin/sh\\necho absolute\\n" > /tmp/abs.sh'
106+
)
107+
self.tr.clear()
108+
self.proto.lineReceived(b"/tmp/abs.sh")
109+
self.assertEqual(self.tr.value(), b"absolute\n" + PROMPT)
110+
111+
def test_bash_dash_c_still_works(self) -> None:
112+
"""Existing sh -c 'cmd' functionality still works."""
113+
self.proto.lineReceived(b"sh -c 'echo still_works'")
114+
self.assertEqual(self.tr.value(), b"still_works\n" + PROMPT)
115+
116+
def test_bash_piped_input_still_works(self) -> None:
117+
"""Existing piped input functionality still works."""
118+
self.proto.lineReceived(b"echo echo piped | bash")
119+
self.assertEqual(self.tr.value(), b"piped\n" + PROMPT)

0 commit comments

Comments
 (0)