Skip to content

Commit 14a226f

Browse files
authored
Update pyarchivefile_py3.py
1 parent 9090f83 commit 14a226f

1 file changed

Lines changed: 261 additions & 0 deletions

File tree

pyarchivefile_py3.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15070,6 +15070,267 @@ def send_from_fileobj(fileobj, host, port, proto="tcp", path_text=None, **kwargs
1507015070
return _udp_quic_send(fileobj, host, port, **kwargs)
1507115071
return _udp_seq_send(fileobj, host, port, **kwargs)
1507215072

15073+
def _udp_raw_send(fileobj, host, port, **kwargs):
15074+
logger = _logger_from_kwargs(kwargs)
15075+
addr = (host or "127.0.0.1", int(port))
15076+
15077+
# ---- normalize bool-ish flags (URL query values may be strings) ----
15078+
handshake = _kw_bool(kwargs.get("handshake", True), True)
15079+
raw_ack = _kw_bool(kwargs.get("raw_ack", False), False)
15080+
raw_meta = _kw_bool(kwargs.get("raw_meta", True), True)
15081+
raw_sha = _kw_bool(kwargs.get("raw_sha", False), False)
15082+
wait = _kw_bool(kwargs.get("wait", True), True) or _kw_bool(kwargs.get("connect_wait", False), False)
15083+
15084+
verbose = _kw_bool(kwargs.get("verbose", False), False)
15085+
15086+
def _log(msg):
15087+
_net_log(verbose, msg, logger=logger)
15088+
15089+
# ---- numeric params ----
15090+
try:
15091+
chunk = int(kwargs.get("chunk", 1200))
15092+
except Exception:
15093+
chunk = 1200
15094+
if chunk < 256:
15095+
chunk = 256
15096+
15097+
try:
15098+
wt = kwargs.get("wait_timeout", None)
15099+
wt = float(wt) if wt is not None else None
15100+
except Exception:
15101+
wt = None
15102+
15103+
try:
15104+
hello_iv = float(kwargs.get("hello_interval", 0.1) or 0.1)
15105+
except Exception:
15106+
hello_iv = 0.1
15107+
if hello_iv <= 0:
15108+
hello_iv = 0.1
15109+
15110+
# ---- compute total remaining length (for META and/or HASH) ----
15111+
total_len = None
15112+
pos = None
15113+
if raw_meta or raw_sha:
15114+
try:
15115+
pos = fileobj.tell()
15116+
fileobj.seek(0, os.SEEK_END)
15117+
end = fileobj.tell()
15118+
fileobj.seek(pos, os.SEEK_SET)
15119+
total_len = int(end - pos)
15120+
if total_len < 0:
15121+
total_len = None
15122+
except Exception:
15123+
total_len = None
15124+
try:
15125+
if pos is not None:
15126+
fileobj.seek(pos, os.SEEK_SET)
15127+
except Exception:
15128+
pass
15129+
15130+
# ---- precompute expected hash (optional) ----
15131+
expected_hex = None
15132+
raw_hash = (kwargs.get("raw_hash", "sha256") or "sha256").lower()
15133+
if raw_sha and total_len is not None:
15134+
try:
15135+
h = hashlib.sha256() if raw_hash != "md5" else hashlib.md5()
15136+
cur = fileobj.tell()
15137+
while True:
15138+
b = fileobj.read(65536)
15139+
if not b:
15140+
break
15141+
h.update(_to_bytes(b))
15142+
expected_hex = h.hexdigest()
15143+
fileobj.seek(cur, os.SEEK_SET)
15144+
except Exception:
15145+
expected_hex = None
15146+
try:
15147+
if pos is not None:
15148+
fileobj.seek(pos, os.SEEK_SET)
15149+
except Exception:
15150+
pass
15151+
15152+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15153+
15154+
try:
15155+
# ---- handshake / wait-for-receiver ----
15156+
tok = kwargs.get("token")
15157+
tok = _hs_token() if tok is None else _to_bytes(tok)
15158+
15159+
if wait:
15160+
start_t = time.time()
15161+
while True:
15162+
if wt is not None and wt >= 0 and (time.time() - start_t) >= wt:
15163+
_log("UDP raw: wait_timeout reached; no receiver READY")
15164+
try:
15165+
sock.close()
15166+
except Exception:
15167+
pass
15168+
return False
15169+
15170+
# announce
15171+
if handshake:
15172+
try:
15173+
sock.sendto(b"HELLO " + tok + b"\n", addr)
15174+
except Exception:
15175+
pass
15176+
15177+
if raw_meta and total_len is not None:
15178+
try:
15179+
sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15180+
except Exception:
15181+
pass
15182+
15183+
if raw_sha and expected_hex:
15184+
try:
15185+
sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15186+
except Exception:
15187+
pass
15188+
15189+
# wait briefly for READY
15190+
try:
15191+
sock.settimeout(hello_iv)
15192+
except Exception:
15193+
pass
15194+
15195+
try:
15196+
pkt, _a = sock.recvfrom(1024)
15197+
if pkt.startswith(b"READY"):
15198+
# READY or READY <token>
15199+
if b" " in pkt:
15200+
rt = pkt.split(None, 1)[1].strip()
15201+
if rt and rt != tok:
15202+
continue
15203+
_log("UDP raw: received READY from receiver")
15204+
break
15205+
except socket.timeout:
15206+
continue
15207+
except Exception:
15208+
continue
15209+
else:
15210+
# if not waiting, still send META/HASH once up front
15211+
if handshake:
15212+
try:
15213+
sock.sendto(b"HELLO " + tok + b"\n", addr)
15214+
except Exception:
15215+
pass
15216+
if raw_meta and total_len is not None:
15217+
try:
15218+
sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15219+
except Exception:
15220+
pass
15221+
if raw_sha and expected_hex:
15222+
try:
15223+
sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15224+
except Exception:
15225+
pass
15226+
15227+
# ---- send data ----
15228+
if raw_ack:
15229+
# sliding window retransmit
15230+
try:
15231+
ack_to = float(kwargs.get("raw_ack_timeout", 0.5) or 0.5)
15232+
except Exception:
15233+
ack_to = 0.5
15234+
try:
15235+
retries_max = int(kwargs.get("raw_ack_retries", 40) or 40)
15236+
except Exception:
15237+
retries_max = 40
15238+
try:
15239+
win = int(kwargs.get("raw_ack_window", 1) or 1)
15240+
except Exception:
15241+
win = 1
15242+
if win < 1:
15243+
win = 1
15244+
15245+
try:
15246+
sock.settimeout(ack_to)
15247+
except Exception:
15248+
pass
15249+
15250+
def _make_pkt(seq, data):
15251+
return b"PKT " + str(seq).encode("ascii") + b" " + _to_bytes(data)
15252+
15253+
base_seq = 0
15254+
next_seq = 0
15255+
pkts = {}
15256+
eof = False
15257+
timeout_tries = 0
15258+
15259+
while True:
15260+
# fill window
15261+
while (not eof) and next_seq < base_seq + win:
15262+
data = fileobj.read(chunk)
15263+
if not data:
15264+
eof = True
15265+
break
15266+
pkt = _make_pkt(next_seq, data)
15267+
pkts[next_seq] = pkt
15268+
try:
15269+
sock.sendto(pkt, addr)
15270+
except Exception:
15271+
pass
15272+
next_seq += 1
15273+
15274+
if eof and base_seq == next_seq:
15275+
break
15276+
15277+
try:
15278+
apkt, _a = sock.recvfrom(1024)
15279+
if apkt.startswith(b"ACK "):
15280+
try:
15281+
aseq = int(apkt.split()[1])
15282+
except Exception:
15283+
aseq = -1
15284+
new_base = aseq + 1
15285+
if new_base > base_seq:
15286+
for s in list(pkts.keys()):
15287+
if s < new_base:
15288+
pkts.pop(s, None)
15289+
base_seq = new_base
15290+
timeout_tries = 0
15291+
except socket.timeout:
15292+
timeout_tries += 1
15293+
if retries_max >= 0 and timeout_tries >= retries_max:
15294+
_log("UDP raw: too many ACK timeouts, giving up")
15295+
return False
15296+
# retransmit all in-flight
15297+
for s in range(base_seq, next_seq):
15298+
pkt = pkts.get(s)
15299+
if pkt is None:
15300+
continue
15301+
try:
15302+
sock.sendto(pkt, addr)
15303+
except Exception:
15304+
pass
15305+
except Exception:
15306+
# treat as timeout-ish
15307+
timeout_tries += 1
15308+
15309+
else:
15310+
# legacy raw: just send datagrams
15311+
while True:
15312+
data = fileobj.read(chunk)
15313+
if not data:
15314+
break
15315+
try:
15316+
sock.sendto(_to_bytes(data), addr)
15317+
except Exception:
15318+
pass
15319+
15320+
# ---- finish ----
15321+
try:
15322+
sock.sendto(b"DONE", addr)
15323+
except Exception:
15324+
pass
15325+
15326+
return True
15327+
15328+
finally:
15329+
try:
15330+
sock.close()
15331+
except Exception:
15332+
pass
15333+
1507315334
def _udp_raw_recv(fileobj, host, port, **kwargs):
1507415335
logger = _logger_from_kwargs(kwargs)
1507515336
addr = (host or "", int(port))

0 commit comments

Comments
 (0)