-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
executable file
·128 lines (104 loc) · 4.42 KB
/
run.py
File metadata and controls
executable file
·128 lines (104 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/python3
import sqlite3
import paramiko
import sys
import os
def get_db_command(platform, action, db_path='shell_recipes.db'):
    """Return the best-matching recipe syntax for ``action`` on ``platform``.

    Candidate rows from the ``recipes`` table must have ``command`` equal to
    ``action`` and a platform that is either an exact match, contains
    ``platform`` as a substring, or is one of the generic labels ``linux``,
    ``common`` or ``fedora``. Rows whose language is NULL or empty are treated
    as English. Only rows in the language taken from ``$LANG`` or in English
    are considered.

    Ranking: the ``$LANG`` language first, then exact platform match,
    substring platform match, ``linux``, and finally anything else.

    Args:
        platform: Platform label to match (e.g. ``'fedora / rhel'``).
        action: Recipe name stored in the ``command`` column.
        db_path: Path of the SQLite database file. The default is a relative
            path, so it is resolved against the current working directory.

    Returns:
        The ``syntax`` text of the top-ranked row, or ``None`` when no row
        matches.

    Raises:
        sqlite3.Error: If the database cannot be queried (for example the
            ``recipes`` table does not exist).
    """
    # Reduce a locale such as "de_DE.UTF-8" to its language code ("de");
    # fall back to English when LANG is not set.
    env_lang = os.environ.get('LANG', 'en').split('_')[0].split('.')[0]

    # COALESCE/NULLIF normalise NULL or empty languages to 'en'.
    # LIKE plus the explicit IN list lets composite labels such as
    # 'fedora / rhel' match.
    query = '''
        SELECT syntax FROM recipes
        WHERE command = ?
          AND (platform = ? OR platform LIKE ? OR platform IN ('linux', 'common', 'fedora'))
          AND COALESCE(NULLIF(language, ''), 'en') IN (?, 'en')
        ORDER BY
          CASE WHEN COALESCE(NULLIF(language, ''), 'en') = ? THEN 1 ELSE 2 END,
          CASE
            WHEN platform = ? THEN 1
            WHEN platform LIKE ? THEN 2
            WHEN platform = 'linux' THEN 3
            ELSE 4
          END
        LIMIT 1
    '''
    # Substring pattern for the LIKE comparisons (e.g. %rhel%).
    platform_pat = f"%{platform}%"
    params = (
        action,
        platform,
        platform_pat,
        env_lang,
        env_lang,
        platform,
        platform_pat,
    )

    conn = sqlite3.connect(db_path)
    try:
        result = conn.execute(query, params).fetchone()
    finally:
        # Close even when the query raises so the file handle is not leaked.
        conn.close()
    return result[0] if result else None
def check_remote_command(ssh, cmd_part):
    """Report whether the executable of ``cmd_part`` exists on the remote host.

    The executable is the first whitespace-separated token, or the second one
    when the first is ``sudo`` and another token follows. It is probed by
    running ``command -v`` over the given SSH connection.

    Args:
        ssh: Connected client exposing ``exec_command`` (a
            ``paramiko.SSHClient`` in this script).
        cmd_part: A single command fragment, e.g. ``"sudo apt update"``.

    Returns:
        ``True`` when ``command -v`` exits with status 0, ``False`` when it
        does not or when ``cmd_part`` contains no tokens.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    parts = cmd_part.split()
    if not parts:
        return False

    idx = 1 if parts[0] == 'sudo' and len(parts) > 1 else 0
    executable = parts[idx]

    # Quote the token so shell metacharacters in it (';', '$(...)', backticks)
    # are probed as a literal name and never executed by the remote shell.
    _, stdout, _ = ssh.exec_command(f"command -v {shlex.quote(executable)}")
    return stdout.channel.recv_exit_status() == 0
def _detect_platform(os_release):
    """Map lower-cased ``/etc/os-release`` text to a recipe platform label."""
    # Normalised to 'fedora / rhel' because that is the label the recipe
    # database uses for the Red Hat family.
    if 'rhel' in os_release or 'red hat' in os_release or 'fedora' in os_release:
        return 'fedora / rhel'
    if 'ubuntu' in os_release or 'debian' in os_release:
        return 'ubuntu'
    return 'linux'


def _command_name(cmd_part):
    """Return the executable token of a command fragment, skipping ``sudo``."""
    tokens = cmd_part.split()
    if not tokens:
        return ''
    # Same rule as check_remote_command: look past 'sudo' only when it is a
    # whole token and something follows it.
    if tokens[0] == 'sudo' and len(tokens) > 1:
        return tokens[1]
    return tokens[0]


def execute_remote(host_alias, action, args=""):
    """Run the recipe named ``action`` on the host ``host_alias`` over SSH.

    Connection settings (hostname, user, port, identity file) are taken from
    ``~/.ssh/config`` when that file exists; otherwise ``host_alias`` is used
    as the hostname on port 22. The remote platform is derived from
    ``/etc/os-release``, the matching recipe is fetched with
    ``get_db_command``, and the recipe is executed line by line. Within a
    line, ``&&``-separated fragments whose executable is not found on the
    remote host are dropped and the remaining fragments are run.

    Progress, remote output and errors are printed to stdout; any exception
    raised while connecting or executing is caught and reported, and the SSH
    client is always closed.

    Args:
        host_alias: Host name or alias to look up in the SSH config.
        action: Name of the recipe to run.
        args: Text substituted verbatim for every ``"$@"`` in the recipe.

    Returns:
        None.
    """
    ssh_config = paramiko.SSHConfig()
    config_path = os.path.expanduser("~/.ssh/config")
    if os.path.exists(config_path):
        with open(config_path) as f:
            ssh_config.parse(f)
    host_info = ssh_config.lookup(host_alias)

    ssh = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification. Kept for backward compatibility; consider loading
    # known_hosts and rejecting unknown keys.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        print(f"[*] Connecting: {host_alias}...")
        ssh.connect(
            hostname=host_info.get('hostname', host_alias),
            username=host_info.get('user'),
            port=int(host_info.get('port', 22)),
            key_filename=host_info.get('identityfile', [None])[0],
        )

        _, stdout, _ = ssh.exec_command("uname -m")
        arch = stdout.read().decode().strip()

        _, stdout, _ = ssh.exec_command("cat /etc/os-release")
        platform = _detect_platform(stdout.read().decode().lower())
        print(f"[*] Target Platform: {platform.upper()} ({arch})")

        command_block = get_db_command(platform, action)
        if not command_block:
            print(f"[-] Error: Command '{action}' was not found.")
            return

        # Lines consisting only of these markers are skipped (code-fence
        # delimiters and bare interpreter names).
        forbidden = ['bash', '```bash', '```', 'sh']

        # NOTE(security): args is inserted into the script unquoted, so the
        # remote shell interprets any metacharacters it contains.
        script_lines = command_block.replace('"$@"', args).split('\n')

        for line in script_lines:
            line = line.strip()
            if not line or line in forbidden:
                continue

            # Drop empty fragments (e.g. from a trailing '&&') so they are
            # neither probed nor reported as missing commands.
            parts = [p.strip() for p in line.split('&&') if p.strip()]

            valid_parts = []
            for p in parts:
                if check_remote_command(ssh, p):
                    valid_parts.append(p)
                else:
                    print(f"[!] Skipping {_command_name(p)} (not installed)")

            if not valid_parts:
                continue

            final_cmd = " && ".join(valid_parts)
            print(f"\n[>>>] RUNNING: {final_cmd}")
            _, stdout, _ = ssh.exec_command(final_cmd, get_pty=True)

            # Stream remote output line by line until the channel reaches EOF.
            for line_out in iter(stdout.readline, ""):
                print(line_out, end="")

            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                # Report the failure and continue with the remaining lines.
                print(f"[!] Command exited with status {exit_status}")
    except Exception as e:
        # Top-level boundary of the script: report the error, do not raise.
        print(f"[-] SSH Error: {e}")
    finally:
        ssh.close()
if __name__ == "__main__":
    # Usage: run.py <host_alias> <action> [args...]
    if len(sys.argv) < 3:
        # Both a host and an action are required; report the problem so the
        # script does not exit silently with a success status.
        print(f"Usage: {sys.argv[0]} <host_alias> <action> [args...]", file=sys.stderr)
        sys.exit(2)
    # Any remaining arguments are joined into one string and passed to the recipe.
    execute_remote(sys.argv[1], sys.argv[2], " ".join(sys.argv[3:]))