-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecure_sub.py
More file actions
141 lines (114 loc) · 4.31 KB
/
secure_sub.py
File metadata and controls
141 lines (114 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
import dns.resolver
import base64
import zlib
import time
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
# --- Server settings ---
# Address and port the HTTP server binds to ('0.0.0.0' = all IPv4 interfaces).
SERVER_IP = '0.0.0.0'
SERVER_PORT = 25500
# Request path under which the subscription is served; every other path
# gets a 404. Replace with your own hard-to-guess path.
SECRET_PATH = "/sub/replace_with_your_uuid"
# Domain whose HTTPS DNS record is queried for the ECH config.
# Replace test.example.com with your own (sub)domain.
REAL_DOMAIN = "test.example.com"
# Exclave export string whose payload contains the 'a' placeholder.
# Replace the string below with your own exported URI.
RAW_BACKUP_URI = "exclave://vmess?YOUR_EXPORT_DATA_HERE"
# Byte pattern used to locate the placeholder inside the decoded payload
# (a run of 40 'a' characters).
SEARCH_PATTERN = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
# Most recently generated subscription body (base64 bytes); stays empty
# until the first successful generation.
CURRENT_SUB_CONTENT = b""
def get_ech_config(domain):
    """Fetch the ECH config for *domain* from its HTTPS DNS record.

    The query is sent to Cloudflare's public resolvers (1.1.1.1 / 1.0.0.1).
    Each answer is checked for service parameter key 5, which this script
    treats as the ECH parameter; the first non-empty value wins.

    Args:
        domain: Domain name whose HTTPS record is queried.

    Returns:
        The ECH config encoded as a standard base64 string, or ``None`` when
        the lookup fails or no answer carries a usable ECH parameter.
    """
    print(f"[DNS] Querying {domain}...")
    try:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['1.1.1.1', '1.0.0.1']
        answers = resolver.resolve(domain, 'HTTPS')
        for rdata in answers:
            if 5 not in rdata.params:
                continue
            ech_obj = rdata.params[5]

            # The attribute holding the raw bytes differs between parameter
            # object types, so probe the known names before falling back to
            # a plain bytes() conversion.
            ech_bytes = None
            if hasattr(ech_obj, 'ech'):
                ech_bytes = ech_obj.ech
            elif hasattr(ech_obj, 'data'):
                ech_bytes = ech_obj.data
            else:
                try:
                    ech_bytes = bytes(ech_obj)
                except (TypeError, ValueError):
                    # Best effort only: an unconvertible parameter is
                    # skipped, but KeyboardInterrupt/SystemExit are no
                    # longer swallowed as they were by a bare ``except``.
                    ech_bytes = None

            if ech_bytes:
                return base64.b64encode(ech_bytes).decode('utf-8')
        # Reached only when the lookup succeeded but nothing usable was found.
        print(f"[DNS] No ECH config found for {domain}.")
    except Exception as e:
        # Any resolver failure (timeout, NXDOMAIN, no answer, ...) is
        # reported and turned into a ``None`` result for the caller.
        print(f"[DNS] Error: {e}")
    return None
def patch_protobuf(data, new_val_str, pattern=None):
    """Overwrite the run of 'a' placeholder bytes in *data* with a new value.

    The patch is length-preserving: the new value is written over the
    placeholder and any leftover room is filled with spaces, so the result
    always has exactly as many bytes as the input.

    Args:
        data: Serialized payload (bytes) containing the placeholder.
        new_val_str: Replacement text; encoded as UTF-8 before writing.
        pattern: Bytes used to locate the placeholder, expected to be a run
            of ``b'a'``. Defaults to the module-level ``SEARCH_PATTERN``.

    Returns:
        The patched bytes, or ``None`` when the placeholder is not found or
        is too small to hold the new value.
    """
    if pattern is None:
        pattern = SEARCH_PATTERN
    filler = ord("a")  # byte value the placeholder is made of
    new_bytes = new_val_str.encode('utf-8')

    start_idx = data.find(pattern)
    if start_idx == -1:
        print("[Error] Placeholder not found.")
        return None

    # Grow the match to cover the whole contiguous run of filler bytes; the
    # real placeholder may be longer than the search pattern.
    # NOTE(review): any neighbouring byte that happens to equal 0x61 is
    # absorbed into the run as well -- confirm real exports never place such
    # a byte (e.g. a length prefix of 97) directly next to the placeholder.
    real_start = start_idx
    while real_start > 0 and data[real_start - 1] == filler:
        real_start -= 1
    end_idx = start_idx
    total = len(data)
    while end_idx < total and data[end_idx] == filler:
        end_idx += 1

    capacity = end_idx - real_start
    print(f"[Debug] Placeholder capacity: {capacity} bytes")
    if len(new_bytes) > capacity:
        print("[Error] New value exceeds capacity.")
        return None

    # Right-pad with spaces so the surrounding bytes keep their offsets.
    return data[:real_start] + new_bytes.ljust(capacity, b' ') + data[end_idx:]
def generate_sub():
    """Rebuild the served subscription: fetch ECH, patch the export, repack.

    On success the module-level ``CURRENT_SUB_CONTENT`` is replaced with the
    base64-encoded patched URI. If the ECH lookup or the patch step yields
    nothing, the previous content is left untouched. Any exception raised
    while decoding or re-encoding is reported and swallowed.
    """
    global CURRENT_SUB_CONTENT

    ech_value = get_ech_config(REAL_DOMAIN)
    if not ech_value:
        return

    scheme = "exclave://vmess?"
    try:
        # Unpack: strip the scheme, restore the base64 padding, inflate.
        encoded = RAW_BACKUP_URI.replace(scheme, "").strip()
        encoded += "=" * (-len(encoded) % 4)
        proto = zlib.decompress(base64.urlsafe_b64decode(encoded))

        # Patch the placeholder, then repack in the same unpadded form.
        patched = patch_protobuf(proto, ech_value)
        if patched:
            repacked = base64.urlsafe_b64encode(zlib.compress(patched))
            uri = scheme + repacked.decode('utf-8').rstrip("=")
            # The subscription body is the base64 of the whole URI.
            CURRENT_SUB_CONTENT = base64.b64encode(uri.encode('utf-8'))
            print(f"[Success] Updated ECH: {ech_value[:8]}...")
    except Exception as err:
        print(f"[Error] Update failed: {err}")
def loop():
    """Regenerate the subscription forever, pausing 50 minutes between runs."""
    refresh_interval = 50 * 60  # seconds
    while True:
        generate_sub()
        time.sleep(refresh_interval)
class Handler(BaseHTTPRequestHandler):
    """HTTP handler that serves the subscription at ``SECRET_PATH`` only."""

    def do_GET(self):
        """Answer a GET request.

        * any path other than ``SECRET_PATH``  -> 404
        * subscription not generated yet       -> 503 (instead of an empty 200)
        * otherwise                            -> 200 with the subscription body
        """
        if self.path != SECRET_PATH:
            self.send_response(404)
            self.end_headers()
            return

        # Read the global once so the length header and the body come from
        # the same object even if it is rebound while we respond.
        body = CURRENT_SUB_CONTENT
        if not body:
            # Nothing has been generated successfully yet; signal a temporary
            # failure rather than handing the client an empty subscription.
            self.send_response(503)
            self.end_headers()
            return

        self.send_response(200)
        self.send_header('Content-type', 'text/plain; charset=utf-8')
        self.send_header('Cache-Control', 'no-store')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
if __name__ == "__main__":
    # Generate once synchronously so content is available before the server
    # starts accepting requests.
    # NOTE(review): loop() also regenerates immediately when its thread
    # starts, so the first fetch happens twice at startup.
    generate_sub()
    # Daemon thread: the periodic refresher must not keep the process alive.
    t = threading.Thread(target=loop, daemon=True)
    t.start()
    # The context manager closes the listening socket on the way out.
    with HTTPServer((SERVER_IP, SERVER_PORT), Handler) as server:
        print(f"Serving at http://{SERVER_IP}:{SERVER_PORT}{SECRET_PATH}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the script; exit quietly.
            print("Shutting down.")