-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcoding.py
More file actions
96 lines (88 loc) · 3.14 KB
/
coding.py
File metadata and controls
96 lines (88 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def bitstream(data):
    """Yield the bits of ``data`` one character at a time.

    Every item of ``data`` is rendered with ``format(item, '08b')`` (binary,
    zero-padded to at least eight digits, most significant digit first) and
    the resulting '0'/'1' characters are yielded in order.

    :param data: iterable of integers, e.g. a ``bytes`` object
    :return: generator of single-character strings '0' or '1'
    """
    for value in data:
        yield from format(value, '08b')
def get_bytes_to_write(data):
    """Pack a sequence of '0'/'1' characters into a ``bytes`` object.

    Bits are grouped eight at a time, most significant bit first. The final
    group is right-padded with '0' up to eight bits and is always emitted,
    so an empty input produces a single zero byte.

    :param data: iterable of '0'/'1' characters (for example a string)
    :return: the packed bytes
    """
    packed = bytearray()
    pending = ''
    for symbol in data:
        # Flush a completed group before taking the next bit, so the last
        # (possibly full) group is still pending when the loop ends.
        if len(pending) == 8:
            packed.append(int(pending, 2))
            pending = ''
        pending += symbol
    # Zero-pad the trailing group to a whole byte and emit it.
    packed.append(int(pending.ljust(8, '0'), 2))
    return bytes(packed)
def write_to_file(file_name, data):
    """Pack a bit sequence into bytes and write it to ``file_name``.

    The payload is built with ``get_bytes_to_write`` *before* the file is
    opened, so a failure while packing (e.g. a non-binary character in
    ``data``) neither truncates an existing file nor leaves a handle open.

    :param file_name: path of the file to create or overwrite (binary mode)
    :param data: iterable of '0'/'1' characters to pack and write
    :return: None
    """
    payload = get_bytes_to_write(data)
    # The context manager guarantees the handle is closed even if the
    # write itself raises.
    with open(file_name, 'wb') as out_file:
        out_file.write(payload)
class Encoder:
    """Binary arithmetic encoder over an ``n_bits``-wide integer interval.

    The coder keeps a closed interval ``[low, high]``. Each encoded bit
    narrows the interval; whenever ``low`` and ``high`` agree on their top
    bit, that bit is appended to ``compressed_data`` (as a '0'/'1'
    character) and both bounds are shifted left by one position.
    """

    def __init__(self, n_bits=8):
        self.n_bits = n_bits
        self.low = 0
        self.high = (1 << n_bits) - 1
        # Emitted code bits, accumulated as a string of '0'/'1' characters.
        self.compressed_data = ''

    def encode(self, bit, prob):
        """Encode one bit, emitting any code bits that become settled.

        :param bit: the symbol to encode; the string '1' selects the upper
            part of the interval, any other value selects the lower part
        :param prob: probability of the bit being 0
        :return: None (output is appended to ``compressed_data``)
        """
        top_shift = self.n_bits - 1

        # Split the interval in proportion to the probability of a 0.
        split = self.low + int(prob * (self.high - self.low))
        if bit == '1':
            self.low = split + 1
        else:
            self.high = split

        # Renormalise: shift out every leading bit shared by both bounds.
        while True:
            msb = self.high >> top_shift
            if msb != (self.low >> top_shift):
                break
            # Drop the shared top bit (when it is 1) so the shifted bounds
            # stay within n_bits.
            wrap = (1 << self.n_bits) if msb == 1 else 0
            self.compressed_data += str(msb)
            self.high = (self.high << 1) - wrap + 1
            self.low = (self.low << 1) - wrap

        assert self.high > self.low, f"{self.high}, {self.low}"
class Decoder:
    """Binary arithmetic decoder using the same interval arithmetic as ``Encoder``.

    The decoder keeps the interval ``[low, high]`` together with ``num``, a
    window of ``n_bits`` code bits read from ``bit_stream``. Each call to
    ``decode`` recovers one bit, appends it to ``uncompressed_data`` and
    shifts fresh code bits into ``num`` as the interval is renormalised.
    """

    def __init__(self, bit_stream, n_bits=8):
        """
        :param bit_stream: iterable (or iterator) yielding '0'/'1' characters
        :param n_bits: width of the coding interval in bits
        """
        self.n_bits = n_bits
        self.low = 0
        self.high = (1 << self.n_bits) - 1
        # Code-bit window; filled lazily on the first decode() call.
        self.num = None
        self.uncompressed_data = ''
        # iter() returns an iterator unchanged, and additionally lets
        # callers hand in a plain string or list of bit characters.
        self.bit_stream = iter(bit_stream)

    def decode(self, prob):
        """Decode one bit.

        :param prob: probability of the bit being 0
        :return: the decoded bit as '0' or '1', or ``None`` when the bit
            stream is exhausted. Exhaustion is reported either before the
            initial window could be filled (nothing is decoded) or while
            renormalising (the bit decoded by this call has then already
            been appended to ``uncompressed_data``).
        """
        if self.num is None:
            # Fill the initial window. A stream shorter than n_bits is
            # reported as end-of-stream (None), the same signal used during
            # renormalisation, instead of leaking StopIteration.
            window = []
            for _ in range(self.n_bits):
                first_bit = next(self.bit_stream, None)
                if first_bit is None:
                    return None
                window.append(first_bit)
            self.num = int(''.join(window), 2)

        # Split the interval in proportion to the probability of a 0.
        r = self.high - self.low
        point = self.low + int(prob * r)
        if self.num > point:
            self.uncompressed_data += '1'
            self.low = point + 1
        else:
            self.uncompressed_data += '0'
            self.high = point

        top = self.n_bits - 1
        # Renormalise: while both bounds share their top bit, shift it out
        # of the bounds and shift one new code bit into the window.
        while (self.high >> top) == (self.low >> top):
            minus = (1 << self.n_bits) if self.high >> top == 1 else 0
            self.high = (self.high << 1) - minus + 1
            self.low = (self.low << 1) - minus

            # The window's own top bit decides what is dropped from it.
            minus = (1 << self.n_bits) if self.num >> top == 1 else 0
            next_bit = next(self.bit_stream, None)
            if next_bit is None:
                # Out of code bits: the window cannot be advanced, so
                # signal end-of-stream to the caller.
                return None
            self.num = (self.num << 1) - minus + int(next_bit)

        return self.uncompressed_data[-1]