-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpiece.py
More file actions
283 lines (228 loc) · 11.4 KB
/
piece.py
File metadata and controls
283 lines (228 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import math
import hashlib
import time
import logging
from block import Block, BLOCK_SIZE, State
class Piece:
    """One torrent piece.

    A piece is downloaded as a sequence of BLOCK_SIZE-sized blocks (the
    last block may be shorter).  Blocks are requested peer-by-peer,
    filled via :meth:`set_block`, merged, and the merged bytes are
    verified against the expected 20-byte SHA-1 ``piece_hash`` before
    the piece is marked complete.
    """

    def __init__(self, piece_index: int, piece_size: int, piece_hash: bytes):
        """Create an empty, not-yet-downloaded piece.

        :param piece_index: zero-based index of this piece in the torrent
        :param piece_size: total size of the piece in bytes (must be > 0)
        :param piece_hash: expected 20-byte SHA-1 digest of the piece data
        :raises ValueError: if ``piece_size`` is non-positive or
            ``piece_hash`` is not exactly 20 bytes
        """
        # Validate inputs BEFORE deriving anything from them, so we never
        # compute a block count from a nonsensical size.
        if piece_size <= 0:
            raise ValueError(f"Invalid piece size: {piece_size}")
        if len(piece_hash) != 20:
            raise ValueError(f"Invalid piece hash length: {len(piece_hash)}")
        self.piece_index: int = piece_index
        self.piece_size: int = piece_size
        self.piece_hash: bytes = piece_hash
        self.is_full: bool = False
        self.files = []  # file-mapping entries; populated elsewhere
        self.raw_data: bytes = b''
        # Ceiling division: the last block may be smaller than BLOCK_SIZE.
        self.number_of_blocks: int = math.ceil(piece_size / BLOCK_SIZE)
        self.blocks: list[Block] = []
        self.creation_time = time.time()
        self.completion_time = None  # set when the piece passes verification
        self.hash_verification_count = 0  # failed SHA-1 attempts so far
        self._init_blocks()
        logging.debug(f"Created piece {piece_index} with {self.number_of_blocks} blocks, size: {piece_size}")

    def update_block_status(self):
        """If block is pending for too long, set it free"""
        current_time = time.time()
        reset_count = 0
        for i, block in enumerate(self.blocks):
            if (block.state == State.PENDING and
                    (current_time - block.last_seen) > 10):  # 10 s pending timeout
                # Only reset if the piece is not already complete.
                if not self.is_full:
                    # NOTE: any partial data in the stale block is discarded —
                    # a fresh Block is created so the block is re-requested
                    # from scratch.
                    if block.data:
                        logging.debug(f"Resetting pending block {i} in piece {self.piece_index} with data")
                    self.blocks[i] = Block(block_size=block.block_size)
                    reset_count += 1
        if reset_count > 0:
            logging.debug(f"Reset {reset_count} pending blocks in piece {self.piece_index}")

    def set_block(self, offset: int, data: bytes):
        """Store block ``data`` at byte ``offset`` within this piece.

        Invalid offsets, empty payloads, and writes to an already
        completed piece or an already full block are ignored (logged).
        Oversized payloads are truncated to the block size; undersized
        payloads are REJECTED so the block stays free and is
        re-requested — zero-padding them (the old behavior) silently
        corrupted the piece and guaranteed a later SHA-1 failure.
        """
        if not data:
            logging.warning(f"Attempted to set empty block data in piece {self.piece_index}")
            return
        if self.is_full:
            logging.debug(f"Ignoring block data for completed piece {self.piece_index}")
            return
        block_index = offset // BLOCK_SIZE  # integer division; offsets are block-aligned
        # Validate block index
        if block_index < 0 or block_index >= len(self.blocks):
            logging.error(f"Invalid block index {block_index} for piece {self.piece_index} with {len(self.blocks)} blocks")
            return
        block = self.blocks[block_index]
        # Validate offset matches block boundary
        expected_offset = block_index * BLOCK_SIZE
        if offset != expected_offset:
            logging.warning(f"Block offset mismatch: expected {expected_offset}, got {offset}")
        if block.state == State.FULL:
            logging.debug(f"Block {block_index} in piece {self.piece_index} already full")
            return
        # Validate data size matches block size (last block may be smaller,
        # which block.block_size already accounts for).
        expected_size = block.block_size
        if len(data) != expected_size:
            logging.warning(f"Block size mismatch in piece {self.piece_index}: expected {expected_size}, got {len(data)}")
            if len(data) > expected_size:
                # Keep only the bytes that belong to this block.
                data = data[:expected_size]
            else:
                # Too little data: do NOT pad with zeros (that corrupts the
                # piece). Leave the block free so it gets re-requested.
                return
        block.data = data
        block.state = State.FULL
        block.last_seen = time.time()
        logging.debug(f"Set block {block_index} in piece {self.piece_index}, size: {len(data)}")

    def get_block(self, block_offset: int, block_length: int) -> bytes:
        """Get block data from piece with bounds checking.

        Returns ``b''`` when the piece is incomplete or the request is
        out of range; a request running past the end of the piece is
        clamped to the available data.
        """
        if not self.is_full:
            return b''
        if block_offset < 0 or block_length <= 0:
            return b''
        # Ensure we don't read beyond piece boundaries
        end_offset = block_offset + block_length
        if end_offset > len(self.raw_data):
            block_length = len(self.raw_data) - block_offset
            if block_length <= 0:
                return b''
        return self.raw_data[block_offset:block_offset + block_length]

    def get_empty_block(self):
        """Get next empty block to download with prioritization.

        Free blocks are served first; if none exist, a PENDING block
        that has been stale for >15 s is re-issued.  Returns a
        ``(piece_index, block_offset, block_size)`` tuple, or ``None``
        when nothing needs requesting.
        """
        if self.is_full:
            return None
        # First, try to find any free block
        for block_index, block in enumerate(self.blocks):
            if block.state == State.FREE:
                return self._prepare_block_request(block_index, block)
        # If no free blocks, check for stale pending blocks
        current_time = time.time()
        for block_index, block in enumerate(self.blocks):
            if (block.state == State.PENDING and
                    (current_time - block.last_seen) > 15):  # longer timeout for re-request
                logging.debug(f"Re-requesting stale block {block_index} in piece {self.piece_index}")
                return self._prepare_block_request(block_index, block)
        return None

    def _prepare_block_request(self, block_index: int, block: Block):
        """Mark ``block`` PENDING and return its request tuple."""
        block_offset = block_index * BLOCK_SIZE
        block.state = State.PENDING
        block.last_seen = time.time()
        logging.debug(f"Requesting block {block_index} in piece {self.piece_index}, size: {block.block_size}")
        return self.piece_index, block_offset, block.block_size

    def are_all_blocks_full(self) -> bool:
        """Check if all blocks in this piece are full."""
        # Quick check - if we're already marked full, return True
        if self.is_full:
            return True
        return all(block.state == State.FULL for block in self.blocks)

    def get_completion_percentage(self) -> float:
        """Get completion percentage of this piece (0.0 - 100.0)."""
        if self.is_full:
            return 100.0
        if not self.blocks:  # guard against division by zero
            return 0.0
        full_blocks = sum(1 for block in self.blocks if block.state == State.FULL)
        return (full_blocks / len(self.blocks)) * 100.0

    def set_to_full(self) -> bool:
        """Merge blocks and verify piece hash with comprehensive validation.

        Returns ``True`` and marks the piece complete only when all
        blocks are full, the merged size matches ``piece_size`` and the
        SHA-1 digest matches ``piece_hash``.  One hash-mismatch retry is
        allowed (blocks are reset for re-download); a second failure is
        treated as corruption.
        """
        if not self.are_all_blocks_full():
            logging.debug(f"Piece {self.piece_index} not ready for completion: blocks not all full")
            return False
        # Merge all blocks into single piece data
        data = self._merge_blocks()
        # Validate piece size
        if len(data) != self.piece_size:
            logging.error(f"Piece {self.piece_index} size mismatch: expected {self.piece_size}, got {len(data)}")
            self._handle_corruption()
            return False
        # Verify piece hash
        if not self._valid_blocks(data):
            self.hash_verification_count += 1
            # Allow one retry in case of temporary corruption
            if self.hash_verification_count <= 1:
                logging.warning(f"Piece {self.piece_index} hash mismatch, attempt {self.hash_verification_count}")
                self._init_blocks()  # Reset blocks for re-download
                return False
            logging.error(f"Piece {self.piece_index} failed hash verification {self.hash_verification_count} times")
            self._handle_corruption()
            return False
        # Mark as complete
        self.is_full = True
        self.raw_data = data
        self.completion_time = time.time()
        download_time = self.completion_time - self.creation_time
        logging.info(f"✅ Piece {self.piece_index} verified and completed in {download_time:.2f}s "
                     f"({self.piece_size} bytes, {len(self.blocks)} blocks)")
        return True

    def _handle_corruption(self):
        """Handle corrupted piece data: discard everything and restart."""
        logging.error(f"Piece {self.piece_index} is corrupted, resetting all blocks")
        self._init_blocks()
        self.raw_data = b''
        self.is_full = False

    def _init_blocks(self):
        """(Re-)initialize the block list with proper per-block sizing."""
        self.blocks = []
        if self.number_of_blocks > 1:
            for i in range(self.number_of_blocks):
                block_size = BLOCK_SIZE
                # Last block might be smaller
                if i == self.number_of_blocks - 1:
                    remaining_size = self.piece_size - (i * BLOCK_SIZE)
                    block_size = remaining_size if remaining_size > 0 else BLOCK_SIZE
                self.blocks.append(Block(block_size=block_size))
        else:
            # Single block piece
            self.blocks.append(Block(block_size=self.piece_size))

    def _merge_blocks(self) -> bytes:
        """Merge all blocks into piece data efficiently."""
        # b''.join copies each block's payload exactly once at C speed.
        return b''.join(block.data for block in self.blocks)

    def _valid_blocks(self, piece_raw_data: bytes) -> bool:
        """Verify piece hash matches expected hash with detailed logging."""
        if not piece_raw_data:
            logging.error(f"Piece {self.piece_index} has no data for hash verification")
            return False
        if len(piece_raw_data) != self.piece_size:
            logging.error(f"Piece {self.piece_index} data size mismatch for hash verification")
            return False
        hashed_piece_raw_data = hashlib.sha1(piece_raw_data).digest()
        if hashed_piece_raw_data == self.piece_hash:
            return True
        # Log hash mismatch details for debugging
        logging.warning(f"Piece {self.piece_index} hash mismatch")
        logging.warning(f"  Expected: {self.piece_hash.hex()}")
        logging.warning(f"  Actual: {hashed_piece_raw_data.hex()}")
        logging.warning(f"  Data size: {len(piece_raw_data)} bytes")
        return False

    def get_stats(self):
        """Get statistics about this piece's download progress."""
        full_blocks = sum(1 for block in self.blocks if block.state == State.FULL)
        pending_blocks = sum(1 for block in self.blocks if block.state == State.PENDING)
        free_blocks = sum(1 for block in self.blocks if block.state == State.FREE)
        return {
            'piece_index': self.piece_index,
            'is_full': self.is_full,
            'completion_percentage': self.get_completion_percentage(),
            'blocks_total': len(self.blocks),
            'blocks_full': full_blocks,
            'blocks_pending': pending_blocks,
            'blocks_free': free_blocks,
            'piece_size': self.piece_size,
            'hash_verification_count': self.hash_verification_count
        }