-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: test_quant.py
More file actions
131 lines (114 loc) · 4.79 KB
/
test_quant.py
File metadata and controls
131 lines (114 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import struct, json, os, array, random, subprocess
random.seed(42)
tbm_path = 'D:/Dev/tensorbit_labs/tensorbit-quant/test_input.tbm'
quant_path = 'D:/Dev/tensorbit_labs/tensorbit-quant/test_output.tbm'
TB_HEADER_SIZE = 4096
TB_MAGIC = 0x31304254
TB_VERSION = 1
def make_tb_header(num_weights, nm_n, nm_m, num_mask_bytes):
weights_offset = TB_HEADER_SIZE
masks_offset = TB_HEADER_SIZE + num_weights * 4
hdr = struct.pack('<IIIIQQQ', TB_MAGIC, TB_VERSION, nm_n, nm_m,
num_weights, num_mask_bytes, weights_offset)
hdr += struct.pack('<Q', masks_offset)
hdr += b'\x00'
hdr += b'\x00' * (TB_HEADER_SIZE - len(hdr))
return hdr
tensors = [
('model.embed_tokens.weight', [32, 64], 2, 4),
('model.layers.0.self_attn.q_proj.weight', [64, 64], 2, 4),
('model.layers.0.self_attn.k_proj.weight', [64, 32], 2, 4),
('model.layers.0.mlp.gate_proj.weight', [128, 64], 2, 4),
('model.layers.0.mlp.down_proj.weight', [64, 128], 2, 4),
('model.layers.0.input_layernorm.weight', [64], 1, 4),
('model.layers.1.self_attn.q_proj.weight', [64, 64], 2, 4),
('lm_head.weight', [64, 32], 2, 4),
]
print('=' * 60)
print('Tensorbit Quant — End-to-End Test')
print('=' * 60)
# Build test .tbm
with open(tbm_path, 'wb') as f:
entries = []
for name, shape, nm_n, nm_m in tensors:
offset = f.tell()
if len(shape) == 1: nw = shape[0]
else: nw = shape[0] * shape[1]
nmb = nw // nm_m
hdr = make_tb_header(nw, nm_n, nm_m, nmb)
f.write(hdr)
wts = array.array('f', [random.uniform(-1.0, 1.0) for _ in range(nw)])
f.write(wts.tobytes())
f.write(b'\x05' * nmb)
entries.append({'name': name, 'offset': offset, 'shape': shape,
'nm_n': nm_n, 'nm_m': nm_m, 'dtype': 'fp32',
'num_weights': nw, 'num_mask_bytes': nmb})
index = {'architecture': 'test_model',
'config': {'num_layers': 2, 'hidden_size': 64, 'num_heads': 8},
'tensors': entries}
json_bytes = json.dumps(index, separators=(',', ':')).encode('utf-8')
f.write(json_bytes)
f.write(struct.pack('<I', len(json_bytes)))
print(f'Input: {tbm_path} ({os.path.getsize(tbm_path)} bytes, {len(entries)} tensors)')
# Run tb-quant
result = subprocess.run(
['D:/Dev/tensorbit_labs/tensorbit-quant/tb-quant.exe',
'--model', tbm_path, '--output', quant_path,
'--dtype', 'int4', '--scheme', 'symmetric', '--group-size', '64'],
capture_output=True, text=True)
print(result.stdout)
if result.returncode != 0:
print('STDERR:', result.stderr)
raise SystemExit('Quantization FAILED')
# Verify output
out_size = os.path.getsize(quant_path)
print(f'Output: {quant_path} ({out_size} bytes)')
with open(quant_path, 'rb') as f:
f.seek(-4, os.SEEK_END)
idx_len = struct.unpack('<I', f.read(4))[0]
f.seek(out_size - 4 - idx_len)
out_index = json.loads(f.read(idx_len))
assert len(out_index['tensors']) == len(entries), f"tensor count mismatch: {len(out_index['tensors'])} vs {len(entries)}"
print(f' Tensor count: {len(out_index["tensors"])} OK')
# Verify dtypes and dequantization
all_passed = 0
for t_in, t_out in zip(entries, out_index['tensors']):
name = t_out['name']
dtype = t_out.get('dtype', '?')
assert dtype == 'int4', f'{name}: expected int4, got {dtype}'
assert t_out.get('scale_count', 0) > 0, f'{name}: missing scale_count'
# Dequantize and compare
with open(tbm_path, 'rb') as f:
f.seek(t_in['offset'] + TB_HEADER_SIZE)
orig = array.array('f')
orig.frombytes(f.read(t_in['num_weights'] * 4))
with open(quant_path, 'rb') as f:
f.seek(t_out['offset'] + TB_HEADER_SIZE)
nw = t_out['num_weights']
qsize = (nw + 1) // 2
qbytes = f.read(qsize)
scales = array.array('f')
scales.frombytes(f.read(t_out['scale_count'] * 4))
gs = t_out.get('group_size', 64)
deq = array.array('f', [0.0] * nw)
for i in range(nw):
g = i // gs
if g >= len(scales): break
scl = scales[g]
pi = i // 2
b = qbytes[pi]
nib = (b >> 4) if (i % 2) else (b & 0xF)
deq[i] = (nib - 16) * scl if (nib & 0x8) else nib * scl
max_abs_orig = max(abs(v) for v in orig) or 1.0
max_abs_err = max(abs(deq[i] - orig[i]) for i in range(nw))
rel_err = max_abs_err / max_abs_orig
max_err_bound = max_abs_orig / 7.0 * 0.5 # half a quantization bin
assert rel_err < max_err_bound * 1.5 / max_abs_orig + 0.1, \
f'{name}: dequant error {rel_err:.3%} exceeds bound'
all_passed += 1
print(f' [{all_passed}/{len(entries)}] {name}: dtype={dtype}, '
f'scales={t_out["scale_count"]}, max_dequant_err={rel_err:.3%}')
print(f'\nAll {all_passed}/{len(entries)} tensors passed.')
print('=' * 60)
print('TEST PASSED')
print('=' * 60)