-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_client.py
More file actions
313 lines (263 loc) · 9.86 KB
/
test_client.py
File metadata and controls
313 lines (263 loc) · 9.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python3
"""Test client for zs3 - uses only stdlib"""
import hashlib
import hmac
from datetime import datetime, timezone
import urllib.request
import urllib.parse
HOST = "localhost:9000"
ACCESS_KEY = "minioadmin"
SECRET_KEY = "minioadmin"
REGION = "us-east-1"
def sign_request(method, path, query="", headers=None, payload=b""):
"""AWS SigV4 signing"""
if headers is None:
headers = {}
t = datetime.now(timezone.utc)
amz_date = t.strftime("%Y%m%dT%H%M%SZ")
date_stamp = t.strftime("%Y%m%d")
payload_hash = hashlib.sha256(payload).hexdigest()
headers["x-amz-date"] = amz_date
headers["x-amz-content-sha256"] = payload_hash
headers["host"] = HOST
# Sort and format headers
signed_headers = ";".join(sorted(k.lower() for k in headers))
canonical_headers = "".join(f"{k.lower()}:{v}\n" for k, v in sorted(headers.items(), key=lambda x: x[0].lower()))
# Sort query string - normalize bare params (e.g. "delete") to "delete="
# to match server's sortQueryString behavior (required by SigV4)
if query:
pairs = [p if "=" in p else p + "=" for p in query.split("&")]
pairs.sort()
canonical_query = "&".join(pairs)
else:
canonical_query = ""
canonical_request = f"{method}\n{path}\n{canonical_query}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
credential_scope = f"{date_stamp}/{REGION}/s3/aws4_request"
string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode()).hexdigest()}"
def sign(key, msg):
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
k_date = sign(f"AWS4{SECRET_KEY}".encode(), date_stamp)
k_region = sign(k_date, REGION)
k_service = sign(k_region, "s3")
k_signing = sign(k_service, "aws4_request")
signature = hmac.new(k_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()
headers["Authorization"] = f"AWS4-HMAC-SHA256 Credential={ACCESS_KEY}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}"
return headers
def request(method, path, data=None, query=""):
payload = data if isinstance(data, bytes) else (data.encode() if data else b"")
headers = sign_request(method, path, query, {}, payload)
url = f"http://{HOST}{path}"
if query:
url += f"?{query}"
req = urllib.request.Request(url, data=payload if payload else None, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as resp:
return resp.status, resp.read().decode()
except urllib.error.HTTPError as e:
return e.code, e.read().decode()
except urllib.error.URLError as e:
return 0, f"Connection failed: {e.reason}"
def test(name, expected_status, actual_status, body=""):
status = "PASS" if actual_status == expected_status else "FAIL"
print(f" [{status}] {name}: {actual_status} (expected {expected_status})")
if status == "FAIL" and body:
print(f" Response: {body[:100]}")
return status == "PASS"
def run_tests():
print("=" * 60)
print("zs3 Test Suite")
print("=" * 60)
passed = 0
failed = 0
# Test 1: List buckets (empty)
print("\n[Bucket Operations]")
status, body = request("GET", "/")
if test("List buckets (empty)", 200, status, body):
passed += 1
else:
failed += 1
# Test 2: Create bucket
status, body = request("PUT", "/testbucket")
if test("Create bucket", 200, status, body):
passed += 1
else:
failed += 1
# Test 3: Create bucket again (idempotent)
status, body = request("PUT", "/testbucket")
if test("Create bucket (idempotent)", 200, status, body):
passed += 1
else:
failed += 1
# Test 4: List buckets (should have one)
status, body = request("GET", "/")
if test("List buckets (has testbucket)", 200, status, body) and "testbucket" in body:
passed += 1
else:
failed += 1
# Test 5: Invalid bucket name
status, body = request("PUT", "/ab") # too short
if test("Invalid bucket name (too short)", 400, status, body):
passed += 1
else:
failed += 1
# Test: Head bucket (exists)
status, body = request("HEAD", "/testbucket")
if test("Head bucket (exists)", 200, status, body):
passed += 1
else:
failed += 1
# Test: Head bucket (not exists)
status, body = request("HEAD", "/nonexistentbucket")
if test("Head bucket (not exists)", 404, status, body):
passed += 1
else:
failed += 1
# Object operations
print("\n[Object Operations]")
# Test 6: Put object
status, body = request("PUT", "/testbucket/hello.txt", "Hello, World!")
if test("Put object", 200, status, body):
passed += 1
else:
failed += 1
# Test 7: Get object
status, body = request("GET", "/testbucket/hello.txt")
if test("Get object", 200, status, body) and body == "Hello, World!":
passed += 1
else:
failed += 1
print(f" Got: {body}")
# Test 8: Head object
status, body = request("HEAD", "/testbucket/hello.txt")
if test("Head object", 200, status, body):
passed += 1
else:
failed += 1
# Test 9: Get non-existent object
status, body = request("GET", "/testbucket/nonexistent.txt")
if test("Get non-existent object", 404, status, body):
passed += 1
else:
failed += 1
# Test 10: Put nested object
status, body = request("PUT", "/testbucket/folder/nested.txt", "Nested content")
if test("Put nested object", 200, status, body):
passed += 1
else:
failed += 1
# Test 11: Get nested object
status, body = request("GET", "/testbucket/folder/nested.txt")
if test("Get nested object", 200, status, body) and body == "Nested content":
passed += 1
else:
failed += 1
# Test 12: Put binary data
binary_data = bytes(range(256))
status, body = request("PUT", "/testbucket/binary.bin", binary_data)
if test("Put binary data", 200, status, body):
passed += 1
else:
failed += 1
# List operations
print("\n[List Operations]")
# Test 13: List objects
status, body = request("GET", "/testbucket", query="list-type=2")
if test("List objects", 200, status, body) and "hello.txt" in body:
passed += 1
else:
failed += 1
# Test 14: List with prefix
status, body = request("GET", "/testbucket", query="list-type=2&prefix=folder/")
if test("List with prefix", 200, status, body) and "nested.txt" in body:
passed += 1
else:
failed += 1
# Test 15: List with delimiter
status, body = request("GET", "/testbucket", query="list-type=2&delimiter=/")
if test("List with delimiter", 200, status, body) and "CommonPrefixes" in body:
passed += 1
else:
failed += 1
# Range requests
print("\n[Range Requests]")
# Test 16: Range request
headers = sign_request("GET", "/testbucket/hello.txt", "", {"Range": "bytes=0-4"}, b"")
req = urllib.request.Request(f"http://{HOST}/testbucket/hello.txt", headers=headers, method="GET")
try:
with urllib.request.urlopen(req) as resp:
status = resp.status
body = resp.read().decode()
except urllib.error.HTTPError as e:
status = e.code
body = e.read().decode()
if test("Range request (bytes=0-4)", 206, status, body) and body == "Hello":
passed += 1
else:
failed += 1
print(f" Got: {body}")
# Batch Operations
print("\n[Batch Operations]")
# Create files for batch delete
request("PUT", "/testbucket/batch1.txt", "batch1")
request("PUT", "/testbucket/batch2.txt", "batch2")
request("PUT", "/testbucket/batch3.txt", "batch3")
# Test: DeleteObjects batch
delete_xml = '<Delete><Object><Key>batch1.txt</Key></Object><Object><Key>batch2.txt</Key></Object><Object><Key>batch3.txt</Key></Object></Delete>'
status, body = request("POST", "/testbucket", delete_xml, query="delete")
if test("DeleteObjects batch", 200, status, body) and "batch1.txt" in body and "batch2.txt" in body:
passed += 1
else:
failed += 1
# Verify files are deleted
status, body = request("GET", "/testbucket/batch1.txt")
if test("Verify batch delete (file gone)", 404, status, body):
passed += 1
else:
failed += 1
# Cleanup
print("\n[Cleanup]")
# Delete objects
status, body = request("DELETE", "/testbucket/hello.txt")
if test("Delete object", 204, status, body):
passed += 1
else:
failed += 1
status, body = request("DELETE", "/testbucket/folder/nested.txt")
if test("Delete nested object", 204, status, body):
passed += 1
else:
failed += 1
status, body = request("DELETE", "/testbucket/binary.bin")
if test("Delete binary object", 204, status, body):
passed += 1
else:
failed += 1
# Delete bucket (may have leftover folder/ dir from nested object)
# First, list and delete any remaining objects
status, body = request("GET", "/testbucket", query="list-type=2")
if "<Key>" in body:
import re
keys = re.findall(r"<Key>([^<]+)</Key>", body)
for key in keys:
request("DELETE", f"/testbucket/{key}")
print(f" [INFO] Cleaned up leftover: {key}")
# Delete bucket
status, body = request("DELETE", "/testbucket")
if test("Delete bucket", 204, status, body):
passed += 1
else:
failed += 1
# Summary
print("\n" + "=" * 60)
total = passed + failed
print(f"Results: {passed}/{total} tests passed")
if failed == 0:
print("All tests passed!")
else:
print(f"{failed} tests failed")
print("=" * 60)
return failed == 0
if __name__ == "__main__":
import sys
success = run_tests()
sys.exit(0 if success else 1)