From 79748b4949a19c614c9761b497658ee2e7a1ca86 Mon Sep 17 00:00:00 2001 From: Mark Karpeles Date: Mon, 15 Jun 2026 22:45:34 +0900 Subject: [PATCH 1/3] lzma/xz/lzma2: bounded-memory sliding-window streaming encoders The xz, raw lzma2, and .lzma encoders previously buffered the entire input and built a hash-chain match finder with a `prev[i]` slot for every input position, so peak memory was O(input): a 600 MB file needed >600 MB and aborted under a 244 MB cap. Replace the whole-buffer match finder + buffer-then-emit drivers with a bounded sliding-window streaming encoder, keeping the same continuous dictionary (and therefore the same compression ratio): - HashChain `prev` is now a power-of-two ring sized O(dict_size + MAX_MATCH_LEN), indexed `pos & mask`. Positions older than the dictionary are evicted naturally; the chain walk breaks on `dist > dict_size` before it can follow a wrapped (stale) link, so the finder returns exactly the same matches a whole-buffer chain would. - A sliding input window retains only ~dict_size + slop of history plus one chunk/lookahead of pending input; the front is dropped once the droppable prefix exceeds dict_size (amortised O(1) per byte). - All parse/price/emit code now reads the window via an absolute `base` (`win[pos - base]`) instead of indexing the whole input. Drivers: - lzma2_internal: new `Lzma2StreamEncoder` (push/finish) emits framed chunks incrementally; xz and raw lzma2 feed it and drain chunks as they are produced instead of staging the whole payload at finish. - lzma: new `LzmaStreamEncoder` streams the continuous range-coded body, emitting the 13-byte header up front and the EOS marker + flush at finish. Small inputs (<= 64 KiB) still run the greedy-vs-optimal guard pass (buffered, bounded) so the optimal parser's cold start never loses to greedy; larger inputs stream with the optimal parse. Peak memory is now O(dict_size), independent of input length. Ratio on the 2.9 MB corpus at -l 9 is essentially unchanged (xz 532320 -> 532316, lzma 521918 -> 521957); reference cross-decode (`xz -d`, `xz --format=lzma -d`) is byte-exact at every level including inputs far larger than the dictionary, the exact window boundary, incompressible, and empty. Co-Authored-By: Claude Fable 5 --- src/lzma/encoder.rs | 712 ++++++++++++++++++++-------- src/lzma2/mod.rs | 116 +++-- src/lzma2_internal/lzma2_encoder.rs | 440 ++++++++++------- src/xz/mod.rs | 145 ++++-- tests/lzma.rs | 36 +- 5 files changed, 994 insertions(+), 455 deletions(-) diff --git a/src/lzma/encoder.rs b/src/lzma/encoder.rs index 32345ee..f31b7a4 100644 --- a/src/lzma/encoder.rs +++ b/src/lzma/encoder.rs @@ -189,26 +189,6 @@ impl LevelParams { }, } } - - /// Compute the dict size actually written into the header for an input of - /// `input_len` bytes. We never claim more than what could possibly be - /// referenced, so a 1 KiB input doesn't force the decoder to allocate a - /// 64 MiB window. The advertised size is also clamped to the decoder's - /// minimum of 4 KiB. - fn effective_dict_size(&self, input_len: usize) -> u32 { - // Round `input_len` up to a power of two (clamped at u32::MAX). Empty - // input gets the minimum dict; `next_power_of_two` would panic on 0. - let needed = if input_len == 0 { - MIN_DICT_SIZE - } else { - let np2 = (input_len as u64) - .checked_next_power_of_two() - .unwrap_or(u32::MAX as u64); - np2.min(u32::MAX as u64) as u32 - }; - let needed = needed.max(MIN_DICT_SIZE); - needed.min(self.dict_size) - } } fn hash3(b0: u8, b1: u8, b2: u8) -> u32 { @@ -841,28 +821,30 @@ impl LzmaEncCore { } } - /// Emit a literal packet for the byte at index `pos` in the input. - fn emit_literal(&mut self, input: &[u8], pos: usize) { + /// Emit a literal packet for the byte at absolute position `pos`, reading + /// from the sliding window `win` (whose first byte is absolute `base`). + fn emit_literal(&mut self, win: &[u8], base: usize, pos: usize) { let pos_state = self.pos_state(); let idx = self.state * POS_STATES_MAX + pos_state as usize; rc_encode_bit(&mut self.rc, &mut self.is_match[idx], 0); - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if self.state < LIT_STATES { None } else { - // The byte at distance rep0; we always have it available in the - // input buffer because the encoder buffered everything. + // The byte at distance rep0; always retained in the sliding window + // (distance ≤ dict_size ≤ retained history). let d = self.rep0 as usize + 1; if d <= pos { - Some(input[pos - d]) + Some(win[i - d]) } else { // Shouldn't happen at a literal-after-match state, but be // safe — fall back to plain literal coding. None } }; - self.encode_literal_full(input[pos], prev_byte, match_byte); + self.encode_literal_full(win[i], prev_byte, match_byte); self.state = state_after_literal(self.state); self.output_pos += 1; @@ -1014,46 +996,67 @@ fn get_dist_slot(distance: u32) -> u32 { // all input before encoding, there's no sliding window to maintain; the head // array and prev links cover the whole buffer in one shot. +/// Sliding-window 3-byte hash-chain match finder. +/// +/// Positions in `head`/`prev` are **absolute** input offsets; `prev` is a ring +/// of `prev.len()` slots indexed `pos & prev_mask`, sized larger than +/// `dict_size + MAX_MATCH_LEN` so every legally-reachable link (distance ≤ +/// `dict_size`) is still intact. Byte reads go through the sliding window `win` +/// whose first byte is absolute `base` (absolute `p` reads `win[p - base]`). +/// This keeps peak memory `O(dict_size)` regardless of input length. struct HashChain { head: Box<[u32; HASH_SIZE]>, prev: Vec, + prev_mask: usize, } impl HashChain { - fn new(buf_len: usize) -> Self { + /// Build a finder whose `prev` ring covers at least `dict_size + + /// MAX_MATCH_LEN` positions (rounded up to a power of two), capped by + /// `cap_hint` when the total input is known smaller. + fn new(dict_size: u32, cap_hint: usize) -> Self { + let needed = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(2); + let want = needed.min(cap_hint.max(1)); + let cap = want.max(1).next_power_of_two(); Self { head: Box::new([NIL; HASH_SIZE]), - prev: vec![NIL; buf_len], + prev: vec![NIL; cap], + prev_mask: cap - 1, } } - /// Splice position `pos` into the hash chain. No-op if there aren't - /// three bytes available. - fn insert(&mut self, input: &[u8], pos: usize) { - if pos + 3 > input.len() { + /// Splice absolute position `pos` into the hash chain. No-op if fewer than + /// three bytes follow in the window. + fn insert(&mut self, win: &[u8], base: usize, pos: usize) { + let i = pos - base; + if i + 3 > win.len() { return; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - self.prev[pos] = self.head[h]; + let h = hash3(win[i], win[i + 1], win[i + 2]) as usize; + self.prev[pos & self.prev_mask] = self.head[h]; self.head[h] = pos as u32; } - /// Find the longest match for `input[pos..]` against earlier positions, - /// bounded by `MAX_MATCH_LEN`, the per-level chain budget, and the - /// per-level "nice match" early-exit. + /// Find the longest match for the window position at absolute `pos` against + /// earlier positions, bounded by `MAX_MATCH_LEN`, the per-level chain + /// budget, and the per-level "nice match" early-exit. fn find_longest( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, max_chain: usize, nice_match: u32, ) -> Option<(u32, u32)> { - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return None; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = 0; let mut best_dist: u32 = 0; @@ -1062,7 +1065,7 @@ impl HashChain { while cur != NIL && steps < max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1070,17 +1073,18 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; // Cheap rejection by the (best_len)-th byte. if best_len > 2 - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1091,7 +1095,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1105,9 +1109,11 @@ impl HashChain { /// achievable length `>= MATCH_LEN_MIN`, the *shortest* distance that /// achieves it. `out` is filled with `(len, dist0based)` pairs in /// strictly increasing length order. Returns the longest length found. + #[allow(clippy::too_many_arguments)] fn find_matches( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, max_chain: usize, @@ -1115,11 +1121,12 @@ impl HashChain { out: &mut Vec<(u32, u32)>, ) -> u32 { out.clear(); - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return 0; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = MATCH_LEN_MIN - 1; let mut cur = self.head[h]; @@ -1127,7 +1134,7 @@ impl HashChain { while cur != NIL && steps < max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1135,16 +1142,17 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; if best_len >= MATCH_LEN_MIN - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1156,7 +1164,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1169,18 +1177,18 @@ impl HashChain { // ─── rep-match helpers ─────────────────────────────────────────────────── -/// Try to extend a match starting at `pos` using a 0-based LZ distance -/// `dist`. Returns the match length (0 if not even 1 byte matches), capped -/// at `MAX_MATCH_LEN`. The byte at index `pos - dist - 1` is the first -/// candidate. -fn rep_match_len(input: &[u8], pos: usize, dist: u32) -> u32 { +/// Length of a repeat match at absolute `pos` against 0-based LZ distance +/// `dist`, reading from the sliding window `win` (first byte = absolute `base`). +/// Capped at `MAX_MATCH_LEN`. +fn rep_match_len(win: &[u8], base: usize, pos: usize, dist: u32) -> u32 { let d = dist as usize + 1; if d > pos { return 0; } - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32) as usize; + let pi = pos - base; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32) as usize; let mut len = 0usize; - while len < max_len && input[pos - d + len] == input[pos + len] { + while len < max_len && win[pi - d + len] == win[pi + len] { len += 1; } len as u32 @@ -1280,7 +1288,8 @@ fn literal_price_at( core: &LzmaEncCore, prices: &[u32; PRICE_TABLE_SIZE], snap: &PriceSnapshot, - input: &[u8], + win: &[u8], + base: usize, pos: usize, out_pos: u64, state: usize, @@ -1288,85 +1297,322 @@ fn literal_price_at( ) -> u32 { let pos_state = (out_pos as u32) & core.pos_mask; let im_idx = state * POS_STATES_MAX + pos_state as usize; - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if state < LIT_STATES { None } else { let d = rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - snap.is_match[im_idx][0] - + core.literal_price(prices, out_pos, input[pos], prev_byte, match_byte) + snap.is_match[im_idx][0] + core.literal_price(prices, out_pos, win[i], prev_byte, match_byte) } // ─── full encode pass ──────────────────────────────────────────────────── -fn encode_all(input: &[u8], params: LevelParams) -> Vec { - let dict_size = params.effective_dict_size(input.len()); - - // Threshold below which we also run a greedy pass and keep the smaller - // body. The optimal parser's cold-start price model can briefly lose to - // greedy on small, highly-repetitive inputs; the absolute loss is bounded - // by the first few price-refresh segments, so on larger inputs the optimal - // parse always wins overall and the extra greedy pass is pure waste. We - // therefore only run the guard pass on small inputs. - const GUARD_LIMIT: usize = 64 * 1024; - - let body = if params.opt_window == 0 { - encode_body(input, dict_size, params, false) - } else if input.len() <= GUARD_LIMIT { - let opt = encode_body(input, dict_size, params, true); - let greedy = encode_body(input, dict_size, params, false); - if greedy.len() < opt.len() { - greedy +/// Extra window history retained behind the current parse position, on top of +/// `dict_size`, so an insertion never overwrites a still-reachable older ring +/// slot and the longest match / literal look-back can always be read. +const WINDOW_SLOP: usize = MAX_MATCH_LEN as usize + 16; + +/// How many bytes of forward lookahead must be buffered past the parse position +/// before a streaming `push` will encode them, so the optimal parser always has +/// its full look-ahead window plus a longest-match's worth of context. On +/// `finish` the remaining tail is encoded without this margin. +fn lookahead_margin(params: LevelParams) -> usize { + params.opt_window as usize + MAX_MATCH_LEN as usize + 1 +} + +/// Streaming `.lzma` body encoder with **bounded memory**. +/// +/// The `.lzma` range coder is a single continuous stream (no per-chunk reset); +/// this struct drives the parse incrementally over a sliding `~dict_size` +/// window and forwards range-coded output bytes as they are produced. Peak +/// memory is `O(dict_size)`: +/// +/// - the match finder's `prev` ring is `O(dict_size)` (see [`HashChain`]); +/// - only `dict_size + WINDOW_SLOP` history plus one lookahead margin of bytes +/// is retained in `win` — older bytes are dropped once the parse position has +/// moved past them. +/// +/// The 13-byte header (props + dict size + `u64::MAX` length) is emitted before +/// the first body bytes; the EOS marker + range-coder flush are appended by +/// [`finish`](Self::finish). +struct LzmaStreamEncoder { + core: LzmaEncCore, + hc: HashChain, + params: LevelParams, + dict_size: u32, + /// Reusable optimal-parser scratch (only used at levels with `opt_window`). + opt: Optimizer, + prob_prices: [u32; PRICE_TABLE_SIZE], + /// Retained window bytes; `win[0]` is absolute offset `win_base`. + win: Vec, + win_base: usize, + /// Absolute offset of the next byte to encode. + pos: usize, + /// Absolute count of bytes appended so far. + appended: usize, + /// Range-coder output bytes already forwarded to the caller. + drained: usize, + /// Set once the header has been emitted (prepended to the first push). + header_done: bool, + /// Set once the EOS marker + flush have been emitted. + finished: bool, + /// `true` until we commit to true incremental streaming. While `false` and + /// the total stays `<= GUARD_LIMIT`, input is only buffered (no output) so + /// `finish` can run the greedy-vs-optimal guard pass over the whole small + /// input and keep the smaller body — matching the old one-shot behaviour and + /// preventing the optimal parser's cold start from ever losing to greedy on + /// small inputs. Once the total exceeds `GUARD_LIMIT` we commit and stream. + committed: bool, +} + +/// Inputs at or below this size run the greedy-vs-optimal guard pass at +/// `finish` (and are therefore buffered whole — bounded, since this is far +/// below `dict_size`). Larger inputs stream incrementally with the optimal +/// parse only. Mirrors the old `encode_all` GUARD_LIMIT. +const GUARD_LIMIT: usize = 64 * 1024; + +impl LzmaStreamEncoder { + fn new(params: LevelParams) -> Self { + // Advertise (and cap matches at) the level's dictionary size, clamped to + // the decoder's 4 KiB minimum. Independent of input length so the header + // can be emitted up front without buffering the whole stream. + let dict_size = params.dict_size.max(MIN_DICT_SIZE); + let cap_hint = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(WINDOW_SLOP + lookahead_margin(params)); + Self { + core: LzmaEncCore::new(), + hc: HashChain::new(dict_size, cap_hint), + params, + dict_size, + opt: Optimizer::new(params.opt_window as usize), + prob_prices: build_prob_prices(), + win: Vec::new(), + win_base: 0, + pos: 0, + appended: 0, + drained: 0, + header_done: false, + finished: false, + // Greedy-only levels (`opt_window == 0`) have no cold-start + // regression, so they commit to streaming immediately and skip the + // guard buffering. + committed: params.opt_window == 0, + } + } + + /// Append `data`, encode any bytes now safely buffered (keeping a forward + /// lookahead margin), and return all output produced so far (header on the + /// first commit, then range-coded body bytes). + fn push(&mut self, data: &[u8]) -> Vec { + self.win.extend_from_slice(data); + self.appended += data.len(); + // While still small and uncommitted, only buffer — `finish` will run the + // greedy-vs-optimal guard. Commit (and start streaming) once the total + // crosses GUARD_LIMIT. + if !self.committed { + if self.appended <= GUARD_LIMIT { + return Vec::new(); + } + self.committed = true; + } + let margin = lookahead_margin(self.params); + // Only encode up to `appended - margin` so the parser keeps full + // forward context; the tail is encoded at `finish`. + let encode_to = self.appended.saturating_sub(margin); + if encode_to > self.pos { + self.encode_range(encode_to); + self.trim_window(); + } + self.take_output() + } + + /// Encode the remaining tail, emit the EOS marker + flush, and return all + /// remaining output bytes. + fn finish(&mut self) -> Vec { + if !self.finished { + if !self.committed { + // Small input: run the greedy-vs-optimal guard over the whole + // buffered window and keep the smaller body, exactly as the old + // one-shot encoder did. `win` holds the entire input (≤ GUARD). + return self.finish_small(); + } + if self.pos < self.appended { + self.encode_range(self.appended); + } + self.core.emit_eos_marker(); + self.core.rc.flush(); + self.finished = true; + } + self.take_output() + } + + /// Guard path for small inputs: encode the whole buffered `win` twice + /// (greedy + optimal) from a fresh core and keep the smaller body, then + /// emit header + body. Memory is bounded (input ≤ GUARD_LIMIT). + fn finish_small(&mut self) -> Vec { + let input = &self.win; + let dict_size = self.dict_size; + let params = self.params; + + let opt_body = { + let mut core = LzmaEncCore::new(); + let mut hc = HashChain::new(dict_size, input.len().max(1)); + let mut opt = Optimizer::new(params.opt_window as usize); + let prices = build_prob_prices(); + encode_optimal( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + &prices, + &mut opt, + ); + core.emit_eos_marker(); + core.rc.flush(); + core.rc.out + }; + let greedy_body = { + let mut core = LzmaEncCore::new(); + let mut hc = HashChain::new(dict_size, input.len().max(1)); + encode_greedy( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + ); + core.emit_eos_marker(); + core.rc.flush(); + core.rc.out + }; + let body = if greedy_body.len() < opt_body.len() { + greedy_body + } else { + opt_body + }; + + self.finished = true; + let mut out = Vec::with_capacity(13 + body.len()); + out.push(ENC_PROPS_BYTE); + out.extend_from_slice(&self.dict_size.to_le_bytes()); + out.extend_from_slice(&u64::MAX.to_le_bytes()); + out.extend_from_slice(&body); + self.header_done = true; + out + } + + /// Encode absolute positions `[self.pos, end)` into the continuous range + /// coder, advancing `self.pos`. + fn encode_range(&mut self, end: usize) { + let base = self.win_base; + let start = self.pos; + if self.params.opt_window == 0 { + encode_greedy( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + end, + self.dict_size, + self.params, + ); } else { - opt + encode_optimal( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + end, + self.dict_size, + self.params, + &self.prob_prices, + &mut self.opt, + ); } - } else { - encode_body(input, dict_size, params, true) - }; + self.pos = end; + } - let mut out = Vec::with_capacity(13 + body.len()); - out.push(ENC_PROPS_BYTE); - out.extend_from_slice(&dict_size.to_le_bytes()); - out.extend_from_slice(&u64::MAX.to_le_bytes()); - out.extend_from_slice(&body); - out -} + /// Drop window history older than `dict_size + WINDOW_SLOP` before `pos`. + /// Only shifts once the droppable prefix exceeds a whole `dict_size + + /// WINDOW_SLOP`, amortising the `drain` memmove to `O(1)` per input byte. + fn trim_window(&mut self) { + let keep_from = self + .pos + .saturating_sub(self.dict_size as usize + WINDOW_SLOP); + let droppable = keep_from.saturating_sub(self.win_base); + if droppable >= self.dict_size as usize + WINDOW_SLOP { + self.win.drain(..droppable); + self.win_base = keep_from; + } + } -/// Encode the range-coded body (with EOS marker + flush, no 13-byte header) -/// using the greedy or optimal parse. Returns the raw body bytes. -fn encode_body(input: &[u8], dict_size: u32, params: LevelParams, optimal: bool) -> Vec { - let mut core = LzmaEncCore::new(); - let mut hc = HashChain::new(input.len()); - if optimal { - encode_optimal(&mut core, &mut hc, input, dict_size, params); - } else { - encode_greedy(&mut core, &mut hc, input, dict_size, params); + /// Pull any newly-produced output: the 13-byte header on first call, then + /// the range coder's freshly-flushed bytes. + fn take_output(&mut self) -> Vec { + let mut out = Vec::new(); + if !self.header_done { + out.push(ENC_PROPS_BYTE); + out.extend_from_slice(&self.dict_size.to_le_bytes()); + out.extend_from_slice(&u64::MAX.to_le_bytes()); + self.header_done = true; + } + let body = &self.core.rc.out; + if self.drained < body.len() { + out.extend_from_slice(&body[self.drained..]); + self.drained = body.len(); + } + out } - core.emit_eos_marker(); - core.rc.flush(); - core.rc.out } -/// Greedy/lazy parse — used by the lowest levels. +/// Greedy/lazy parse over the sliding window. Encodes absolute positions +/// `[pos_start, pos_end)`; match lengths are clamped to `pos_end` so a streaming +/// driver can stop at a lookahead boundary without crossing it. +#[allow(clippy::too_many_arguments)] fn encode_greedy( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, + pos_start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, ) { - let mut pos = 0usize; - while pos < input.len() { + let win_end = base + win.len(); + let mut pos = pos_start; + while pos < pos_end { + let cap = (pos_end - pos) as u32; let rep_lens = [ - rep_match_len(input, pos, core.rep0), - rep_match_len(input, pos, core.rep1), - rep_match_len(input, pos, core.rep2), - rep_match_len(input, pos, core.rep3), + rep_match_len(win, base, pos, core.rep0).min(cap), + rep_match_len(win, base, pos, core.rep1).min(cap), + rep_match_len(win, base, pos, core.rep2).min(cap), + rep_match_len(win, base, pos, core.rep3).min(cap), ]; - let new_match = hc.find_longest(input, pos, dict_size, params.max_chain, params.nice_match); + let new_match = hc + .find_longest( + win, + base, + pos, + dict_size, + params.max_chain, + params.nice_match, + ) + .map(|(l, d)| (l.min(cap), d)); let best_rep_len = rep_lens.iter().copied().max().unwrap_or(0); let best_rep_idx = rep_lens @@ -1382,14 +1628,14 @@ fn encode_greedy( let emit_rep_long = !emit_new && best_rep_len >= MATCH_LEN_MIN; let emit_short_rep = !emit_new && !emit_rep_long && rep_lens[0] >= 1; - hc.insert(input, pos); + hc.insert(win, base, pos); if emit_new { let (len, dist) = new_match.unwrap(); for j in 1..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1397,8 +1643,8 @@ fn encode_greedy( } else if emit_rep_long { for j in 1..(best_rep_len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(best_rep_idx, best_rep_len); @@ -1407,41 +1653,48 @@ fn encode_greedy( core.emit_short_rep(); pos += 1; } else { - core.emit_literal(input, pos); + core.emit_literal(win, base, pos); pos += 1; } } } -/// Cost-based optimal parse over a look-ahead window. +/// Cost-based optimal parse over a look-ahead window, encoding absolute +/// positions `[pos_start, pos_end)`. +#[allow(clippy::too_many_arguments)] fn encode_optimal( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, + pos_start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, + prob_prices: &[u32; PRICE_TABLE_SIZE], + opt: &mut Optimizer, ) { - let prob_prices = build_prob_prices(); let window = params.opt_window as usize; - let mut opt = Optimizer::new(window); - let mut pos = 0usize; - while pos < input.len() { - let snap = core.price_snapshot(&prob_prices); + let mut pos = pos_start; + while pos < pos_end { + let snap = core.price_snapshot(prob_prices); let parsed = parse_window( core, hc, - input, + win, + base, pos, + pos_end, dict_size, params, window, - &prob_prices, + prob_prices, &snap, - &mut opt, + opt, ); debug_assert!(parsed > 0); - replay(core, hc, input, pos, &opt.decisions); + replay(core, hc, win, base, pos, &opt.decisions); pos += parsed; } } @@ -1452,8 +1705,10 @@ fn encode_optimal( fn parse_window( core: &LzmaEncCore, hc: &HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, window: usize, @@ -1461,7 +1716,9 @@ fn parse_window( snap: &PriceSnapshot, opt: &mut Optimizer, ) -> usize { - let avail = input.len() - start; + // `avail` is bounded by the encode boundary `pos_end`, not the end of the + // window, so the DP never produces a decision that carries past `pos_end`. + let avail = pos_end - start; let limit = window.min(avail); opt.opt[0] = OptNode { @@ -1511,7 +1768,7 @@ fn parse_window( // ── literal ────────────────────────────────────────────────────── { - let lp = literal_price_at(core, prices, snap, input, pos, out_pos, state, reps[0]); + let lp = literal_price_at(core, prices, snap, win, base, pos, out_pos, state, reps[0]); let np = node.price.saturating_add(lp); let to = cur + 1; if to <= limit && np < opt.opt[to].price { @@ -1532,7 +1789,7 @@ fn parse_window( // ── rep matches ────────────────────────────────────────────────── for rep_idx in 0..4u32 { - let rlen = rep_match_len(input, pos, reps[rep_idx as usize]); + let rlen = rep_match_len(win, base, pos, reps[rep_idx as usize]); if rlen < 1 { continue; } @@ -1598,7 +1855,8 @@ fn parse_window( let longest = { let opt_matches = &mut opt.matches; hc.find_matches( - input, + win, + base, pos, dict_size, params.max_chain, @@ -1678,28 +1936,30 @@ fn trace_back(opt: &mut Optimizer, end: usize) { fn replay( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, decisions: &[Decision], ) { + let win_end = base + win.len(); let mut pos = start; for &d in decisions { match d { Decision::Literal => { - hc.insert(input, pos); - core.emit_literal(input, pos); + hc.insert(win, base, pos); + core.emit_literal(win, base, pos); pos += 1; } Decision::ShortRep => { - hc.insert(input, pos); + hc.insert(win, base, pos); core.emit_short_rep(); pos += 1; } Decision::Match(dist, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1708,8 +1968,8 @@ fn replay( Decision::Rep(idx, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(idx, len); @@ -1721,19 +1981,21 @@ fn replay( // ─── public streaming Encoder ──────────────────────────────────────────── -/// Streaming `.lzma` (alone) encoder. +/// Streaming `.lzma` (alone) encoder with **bounded memory**. /// -/// Implementation note: LZMA's range coder operates on the entire stream, so -/// the simplest correct approach is to accumulate input into a `Vec` and -/// produce the compressed output in one shot on `finish`. The streaming -/// `encode` calls append to the buffer and never write output; `finish` -/// builds the full output and then drains it across however many calls the -/// caller's output buffer requires. +/// Drives a [`LzmaStreamEncoder`] whose match finder, sliding window, and LZ +/// history are all `O(dict_size)` — so peak memory is independent of the input +/// length. `encode` feeds input incrementally and emits range-coded output as +/// it is produced (the header up front, then body bytes); `finish` emits the +/// EOS marker + flush. Output the codec produces faster than the caller drains +/// it is held in a small `pending` buffer (bounded by one push's worth). pub struct Encoder { - input_buf: Vec, - output_buf: Vec, - output_pos: usize, - finished: bool, + stream: Option, + /// Produced output bytes not yet handed to the caller. + pending: Vec, + pending_idx: usize, + /// Set once `stream.finish()` has run. + stream_finished: bool, /// Match-finder tuning derived from [`EncoderConfig::level`]. Persisted /// across `reset` since configuration is meant to survive resets. params: LevelParams, @@ -1756,52 +2018,118 @@ impl Encoder { /// the nearest valid level rather than rejected. pub fn with_config(config: EncoderConfig) -> Self { Self { - input_buf: Vec::new(), - output_buf: Vec::new(), - output_pos: 0, - finished: false, + stream: None, + pending: Vec::new(), + pending_idx: 0, + stream_finished: false, params: LevelParams::from_level(config.level), } } + + fn stream(&mut self) -> &mut LzmaStreamEncoder { + let params = self.params; + self.stream + .get_or_insert_with(|| LzmaStreamEncoder::new(params)) + } + + /// Push staged output bytes from `pending[pending_idx..]` into `output`. + /// Returns true once the buffer is fully drained. + fn drain_pending(&mut self, output: &mut [u8], written: &mut usize) -> bool { + while self.pending_idx < self.pending.len() && *written < output.len() { + output[*written] = self.pending[self.pending_idx]; + *written += 1; + self.pending_idx += 1; + } + if self.pending_idx >= self.pending.len() { + self.pending.clear(); + self.pending_idx = 0; + true + } else { + false + } + } } +/// Bytes of input pulled into the stream encoder per `raw_encode` step, so +/// `pending` (produced output) stays bounded between drains. +const ENC_PUSH_MAX: usize = 1 << 16; + impl RawEncoder for Encoder { - fn raw_encode(&mut self, input: &[u8], _output: &mut [u8]) -> Result { - if self.finished { + fn raw_encode(&mut self, input: &[u8], output: &mut [u8]) -> Result { + if self.stream_finished { return Err(Error::Corrupt); } - self.input_buf.extend_from_slice(input); - Ok(RawProgress { - consumed: input.len(), - written: 0, - done: false, - }) + let mut consumed = 0usize; + let mut written = 0usize; + loop { + // Drain any output we already produced first. + if self.pending_idx < self.pending.len() { + if !self.drain_pending(output, &mut written) { + return Ok(RawProgress { + consumed, + written, + done: false, + }); + } + continue; + } + if consumed < input.len() { + let take = (input.len() - consumed).min(ENC_PUSH_MAX); + let slice = &input[consumed..consumed + take]; + let produced = self.stream().push(slice); + consumed += take; + if !produced.is_empty() { + self.pending = produced; + self.pending_idx = 0; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); + } + } } fn raw_finish(&mut self, output: &mut [u8]) -> Result { - if !self.finished { - // One-shot encode of everything we've buffered. - self.output_buf = encode_all(&self.input_buf, self.params); - self.output_pos = 0; - self.finished = true; + let mut written = 0usize; + loop { + if self.pending_idx < self.pending.len() { + if !self.drain_pending(output, &mut written) { + return Ok(RawProgress { + consumed: 0, + written, + done: false, + }); + } + continue; + } + if !self.stream_finished { + // Finish the stream (emit EOS + flush) and stage its remaining + // output. An encoder that never saw input still emits a valid + // empty stream (header + EOS). + let produced = self.stream().finish(); + self.stream_finished = true; + if !produced.is_empty() { + self.pending = produced; + self.pending_idx = 0; + } + } else { + return Ok(RawProgress { + consumed: 0, + written, + done: true, + }); + } } - let remaining = self.output_buf.len() - self.output_pos; - let n = remaining.min(output.len()); - output[..n].copy_from_slice(&self.output_buf[self.output_pos..self.output_pos + n]); - self.output_pos += n; - let done = self.output_pos >= self.output_buf.len(); - Ok(RawProgress { - consumed: 0, - written: n, - done, - }) } fn raw_reset(&mut self) { - self.input_buf.clear(); - self.output_buf.clear(); - self.output_pos = 0; - self.finished = false; + self.stream = None; + self.pending.clear(); + self.pending_idx = 0; + self.stream_finished = false; // Note: params is preserved per the trait contract. } } diff --git a/src/lzma2/mod.rs b/src/lzma2/mod.rs index 742a8e3..c7069fa 100644 --- a/src/lzma2/mod.rs +++ b/src/lzma2/mod.rs @@ -181,7 +181,9 @@ fn resolve_dict_size(cfg: &DecoderConfig) -> Result { // ─── encoder ────────────────────────────────────────────────────────────── -use crate::lzma2_internal::lzma2_encoder::{EncoderParams, LZMA2_PROPS_BYTE, encode_lzma2_stream}; +use crate::lzma2_internal::lzma2_encoder::{ + EncoderParams, LZMA2_PROPS_BYTE, Lzma2Chunk, Lzma2StreamEncoder, +}; /// Dictionary size (in bytes) the encoder advertises to the LZMA chunk /// coder as the match-distance ceiling. Fixed at 4 MiB — the [`crate::xz`] @@ -229,16 +231,22 @@ enum EncPhase { /// Note: unlike the former permanently-`Unsupported` stub (a unit struct), /// the working encoder buffers state, so it is a normal struct and is no /// longer `Copy` — construct it via [`Lzma2::encoder()`](crate::Algorithm). -#[derive(Debug, Clone)] pub struct Encoder { phase: EncPhase, /// Staged bytes for the current chunk (or end marker), drained to the /// caller from `pending[pending_idx..]`. pending: Vec, pending_idx: usize, - /// Input accumulated for the next chunk; flushed at `ENC_CHUNK_MAX` or on - /// `finish`. - in_buf: Vec, + /// Bounded-memory continuous-dictionary LZMA2 chunk encoder; emits framed + /// chunks incrementally so the whole input is never accumulated. `None` + /// until the first input byte arrives. + stream: Option, + /// Raw input pushed into `stream` but not yet consumed by an emitted chunk. + /// Bounded by one chunk's worth, used to frame uncompressed-fallback chunks. + staged_input: Vec, + /// Set once `stream.finish()` has run, so a multi-call `finish` doesn't + /// re-finish. + stream_finished: bool, /// Level-derived match-finder tuning; preserved across `reset`. params: EncoderParams, } @@ -256,11 +264,19 @@ impl Encoder { phase: EncPhase::Body, pending: Vec::new(), pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, params: EncoderParams::from_level(ENC_DEFAULT_LEVEL), } } + /// Lazily create the bounded-memory LZMA2 stream encoder on first use. + fn stream(&mut self) -> &mut Lzma2StreamEncoder { + self.stream + .get_or_insert_with(|| Lzma2StreamEncoder::new(ENC_DICT_SIZE, self.params)) + } + /// Push staged bytes from `pending[pending_idx..]` into `output`. Returns /// true once the buffer is fully drained. fn drain_pending(&mut self, output: &mut [u8], written: &mut usize) -> bool { @@ -278,24 +294,24 @@ impl Encoder { } } - /// Stage the whole stream's LZMA2 chunks from the fully-buffered `input`. + /// Frame each produced LZMA2 chunk into `pending`, consuming the matching + /// raw bytes from `staged_input` (needed for the uncompressed fallback). /// - /// The chunks come from a single continuous match-finder - /// ([`encode_lzma2_stream`]): the first chunk resets the dictionary - /// (`0xE0` compressed / `0x01` uncompressed) and every later chunk - /// continues it (`0xC0` compressed / `0x02` uncompressed), so cross-chunk - /// matches (up to the 4 MiB dictionary) are found. The caller appends the - /// `0x00` end marker afterwards. - fn stage_payload(&mut self, input: &[u8]) { - let chunks = encode_lzma2_stream(input, ENC_DICT_SIZE, self.params); - let mut pos = 0usize; + /// The chunks come from a single continuous, **bounded-memory** match-finder + /// ([`Lzma2StreamEncoder`]): the first chunk resets the dictionary (`0xE0` + /// compressed / `0x01` uncompressed) and every later chunk continues it + /// (`0xC0` compressed / `0x02` uncompressed), so cross-chunk matches (up to + /// the 4 MiB dictionary) are found, while peak memory stays `O(dict_size)`. + /// The caller appends the `0x00` end marker afterwards. + fn stage_chunks(&mut self, chunks: Vec) { for chunk in chunks { - let data = &input[pos..pos + chunk.uncomp_len]; + let n = chunk.uncomp_len; + debug_assert!(self.staged_input.len() >= n); + let data: Vec = self.staged_input.drain(..n).collect(); match chunk.body { - Some(ref body) => self.stage_compressed_chunk(data, body, chunk.reset_dict), - None => self.stage_uncompressed_chunk(data, chunk.reset_dict), + Some(ref body) => self.stage_compressed_chunk(&data, body, chunk.reset_dict), + None => self.stage_uncompressed_chunk(&data, chunk.reset_dict), } - pos += chunk.uncomp_len; } } @@ -350,19 +366,32 @@ impl RawEncoder for Encoder { loop { match self.phase { EncPhase::Body => { - // Buffer the entire input so the chunk stream can be - // produced by a single continuous match-finder at `finish` - // — the dictionary then spans every chunk instead of - // resetting per chunk. No mid-stream flush. - if consumed < input.len() { - self.in_buf.extend_from_slice(&input[consumed..]); - consumed = input.len(); + // Feed input into the bounded-memory streaming LZMA2 encoder + // and frame any chunks it emits, draining them to the caller + // as we go. The whole input is never accumulated — the + // dictionary is a sliding `~dict_size` window inside the + // stream encoder. + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else if consumed < input.len() { + let take = (input.len() - consumed).min(ENC_CHUNK_MAX); + let slice = &input[consumed..consumed + take]; + self.staged_input.extend_from_slice(slice); + let chunks = self.stream().push(slice); + consumed += take; + if !chunks.is_empty() { + self.stage_chunks(chunks); + } + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); } - return Ok(RawProgress { - consumed, - written, - done: false, - }); } EncPhase::DrainPending => { if self.drain_pending(output, &mut written) { @@ -401,7 +430,7 @@ impl RawEncoder for Encoder { loop { match self.phase { EncPhase::Finishing => { - if !self.pending.is_empty() { + if self.pending_idx < self.pending.len() { // Drain a chunk staged during `encode` first. if !self.drain_pending(output, &mut written) { return Ok(RawProgress { @@ -411,14 +440,17 @@ impl RawEncoder for Encoder { }); } } - if !self.in_buf.is_empty() { - let data = core::mem::take(&mut self.in_buf); - self.stage_payload(&data); - // Stay in `Finishing`; the loop drains the staged - // chunks then re-checks the (now empty) buffer. + if self.stream.is_some() && !self.stream_finished { + // Flush the remaining buffered bytes as the final + // chunk(s) from the bounded-memory stream encoder, then + // frame them. Memory stays `O(dict_size)`. + let chunks = self.stream().finish(); + self.stream_finished = true; + self.stage_chunks(chunks); + // Stay in `Finishing`; the loop drains the staged chunks + // then emits the end marker. } else { - // Buffer empty and any staged chunk drained: emit the - // single 0x00 end marker. + // All chunks drained: emit the single 0x00 end marker. self.pending.push(0x00); self.pending_idx = 0; self.phase = EncPhase::DrainEnd; @@ -459,7 +491,9 @@ impl RawEncoder for Encoder { self.phase = EncPhase::Body; self.pending.clear(); self.pending_idx = 0; - self.in_buf.clear(); + self.stream = None; + self.staged_input.clear(); + self.stream_finished = false; self.params = params; } } diff --git a/src/lzma2_internal/lzma2_encoder.rs b/src/lzma2_internal/lzma2_encoder.rs index 23f19e9..7dea913 100644 --- a/src/lzma2_internal/lzma2_encoder.rs +++ b/src/lzma2_internal/lzma2_encoder.rs @@ -800,19 +800,22 @@ impl LzmaEncCore { total } - fn emit_literal(&mut self, input: &[u8], pos: usize) { + /// Emit the literal at absolute position `pos`, reading bytes from the + /// sliding window `win` (whose first byte is absolute offset `base`). + fn emit_literal(&mut self, win: &[u8], base: usize, pos: usize) { let pos_state = self.pos_state(); let idx = self.state * POS_STATES_MAX + pos_state as usize; rc_encode_bit(&mut self.rc, &mut self.is_match[idx], 0); - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if self.state < LIT_STATES { None } else { let d = self.rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - self.encode_literal_full(input[pos], prev_byte, match_byte); + self.encode_literal_full(win[i], prev_byte, match_byte); self.state = state_after_literal(self.state); self.output_pos += 1; @@ -980,41 +983,71 @@ fn get_dist_slot(distance: u32) -> u32 { // ─── match finder ──────────────────────────────────────────────────────── +/// Sliding-window 3-byte hash-chain match finder. +/// +/// Positions stored in `head`/`prev` are **absolute** input offsets, but `prev` +/// is a ring buffer of `prev.len()` slots indexed `pos & prev_mask`. The ring +/// is sized strictly larger than `dict_size + MAX_MATCH_LEN`, so every chain +/// link the finder can legally follow (distance ≤ `dict_size`) is still intact: +/// a slot is only overwritten after `prev.len()` further insertions, by which +/// point that older position is already out of dictionary range and the walk +/// has broken on `dist > max_dist`. This keeps peak memory `O(dict_size)` +/// regardless of total input length while finding exactly the same matches a +/// whole-buffer chain would. +/// +/// All byte reads go through the sliding window `win`, whose first byte is the +/// absolute offset `base`; an absolute position `p` reads `win[p - base]`. struct HashChain { head: Box<[u32; HASH_SIZE]>, prev: Vec, + prev_mask: usize, } impl HashChain { - fn new(buf_len: usize) -> Self { + /// Build a finder whose `prev` ring covers at least `dict_size + + /// MAX_MATCH_LEN` positions (rounded up to a power of two). `cap_hint` + /// caps the ring when the total input is known to be smaller, so small + /// inputs (and the unit tests) don't over-allocate. + fn new(dict_size: u32, cap_hint: usize) -> Self { + let needed = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(2); + let want = needed.min(cap_hint.max(1)); + let cap = want.max(1).next_power_of_two(); Self { head: Box::new([NIL; HASH_SIZE]), - prev: vec![NIL; buf_len], + prev: vec![NIL; cap], + prev_mask: cap - 1, } } - fn insert(&mut self, input: &[u8], pos: usize) { - if pos + 3 > input.len() { + /// Splice absolute position `pos` into the chain. No-op if fewer than three + /// bytes follow in the window. + fn insert(&mut self, win: &[u8], base: usize, pos: usize) { + let i = pos - base; + if i + 3 > win.len() { return; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - self.prev[pos] = self.head[h]; + let h = hash3(win[i], win[i + 1], win[i + 2]) as usize; + self.prev[pos & self.prev_mask] = self.head[h]; self.head[h] = pos as u32; } /// Find the single longest match (greedy use). Returns `(len, dist0based)`. fn find_longest( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, params: EncoderParams, ) -> Option<(u32, u32)> { - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return None; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = 0; let mut best_dist: u32 = 0; @@ -1023,7 +1056,7 @@ impl HashChain { while cur != NIL && steps < params.max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1031,16 +1064,17 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; if best_len > 2 - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1050,7 +1084,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1066,18 +1100,20 @@ impl HashChain { /// strictly increasing length order. Returns the longest length found. fn find_matches( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, params: EncoderParams, out: &mut Vec<(u32, u32)>, ) -> u32 { out.clear(); - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return 0; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = MATCH_LEN_MIN - 1; let mut cur = self.head[h]; @@ -1085,7 +1121,7 @@ impl HashChain { while cur != NIL && steps < params.max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1093,16 +1129,17 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; if best_len >= MATCH_LEN_MIN - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1115,7 +1152,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1128,14 +1165,17 @@ impl HashChain { // ─── rep-match helpers ─────────────────────────────────────────────────── -fn rep_match_len(input: &[u8], pos: usize, dist: u32) -> u32 { +/// Length of a repeat match at absolute `pos` against 0-based LZ distance +/// `dist`, reading from the sliding window `win` (first byte = absolute `base`). +fn rep_match_len(win: &[u8], base: usize, pos: usize, dist: u32) -> u32 { let d = dist as usize + 1; if d > pos { return 0; } - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32) as usize; + let pi = pos - base; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32) as usize; let mut len = 0usize; - while len < max_len && input[pos - d + len] == input[pos + len] { + while len < max_len && win[pi - d + len] == win[pi + len] { len += 1; } len as u32 @@ -1202,13 +1242,15 @@ impl Optimizer { } } -/// Compute the price of a literal at `pos` given the encoder's live state. +/// Compute the price of a literal at absolute `pos` given the encoder's live +/// state, reading bytes from the sliding window `win` (first byte = `base`). #[allow(clippy::too_many_arguments)] fn literal_price_at( core: &LzmaEncCore, prices: &[u32; PRICE_TABLE_SIZE], snap: &PriceSnapshot, - input: &[u8], + win: &[u8], + base: usize, pos: usize, out_pos: u64, state: usize, @@ -1216,15 +1258,15 @@ fn literal_price_at( ) -> u32 { let pos_state = (out_pos as u32) & core.pos_mask; let im_idx = state * POS_STATES_MAX + pos_state as usize; - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if state < LIT_STATES { None } else { let d = rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - snap.is_match[im_idx][0] - + core.literal_price(prices, out_pos, input[pos], prev_byte, match_byte) + snap.is_match[im_idx][0] + core.literal_price(prices, out_pos, win[i], prev_byte, match_byte) } // ─── public chunk encoder ──────────────────────────────────────────────── @@ -1244,8 +1286,8 @@ fn literal_price_at( /// [`EncoderParams::from_level`]. /// /// Single-chunk helper retained for the LZMA2 unit tests (which exercise the -/// chunk codec directly); the production encoders use [`encode_lzma2_stream`], -/// which keeps one continuous match-finder across chunks. +/// chunk codec directly); the production encoders use [`Lzma2StreamEncoder`], +/// which keeps one continuous, bounded-memory match-finder across chunks. #[cfg_attr(not(test), allow(dead_code))] pub(crate) fn encode_lzma_chunk(input: &[u8], dict_size: u32, params: EncoderParams) -> Vec { if params.opt_window == 0 { @@ -1264,7 +1306,14 @@ pub(crate) fn encode_lzma_chunk(input: &[u8], dict_size: u32, params: EncoderPar } } -/// One framed LZMA2 chunk produced by [`encode_lzma2_stream`]. +/// Extra window history retained behind the current parse position, on top of +/// `dict_size`, so an insertion at `pos` never overwrites a still-reachable +/// older ring slot and a match at the very back of the dictionary can still be +/// fully read from the window. `MAX_MATCH_LEN` covers the longest readable +/// match; a small constant covers the `prev_byte`/`match_byte` look-back. +const WINDOW_SLOP: usize = MAX_MATCH_LEN as usize + 16; + +/// One framed LZMA2 chunk produced by [`Lzma2StreamEncoder`]. /// /// The caller (xz Block payload or raw LZMA2 stream framing) turns this into /// the on-wire chunk header + body. `uncomp_len` is the number of input bytes @@ -1292,106 +1341,152 @@ pub(crate) struct Lzma2Chunk { /// slice size — only the output framing is chunked. const STREAM_CHUNK_UNCOMP_MAX: usize = 65_536; -/// Encode the *entire* `input` as a continuous LZMA2 chunk stream, returning -/// the framed chunks in order. +/// Streaming continuous-dictionary LZMA2 chunk encoder with **bounded memory**. /// -/// Unlike calling [`encode_lzma_chunk`] once per slice, this keeps a single LZ -/// match-finder (and a continuous `output_pos`) across all chunks: the first -/// chunk resets the dictionary, every later chunk *continues* it, so a match -/// in a later chunk can reference data from any earlier chunk up to -/// `dict_size`. This is what closes the ratio gap with the `.lzma` (single -/// continuous stream) path. The range coder and probability model still reset -/// per chunk (state reset) — only the dictionary/history is continuous. +/// Keeps a single LZ match-finder and a continuous `output_pos` across all +/// chunks — the first chunk resets the dictionary, every later chunk continues +/// it, so a match in a later chunk references data from any earlier chunk up to +/// `dict_size`. Memory is bounded to `O(dict_size)` regardless of input length: /// -/// The caller is responsible for the `0x00` end marker and any container -/// framing (xz Block/Index, raw-stream terminator). -pub(crate) fn encode_lzma2_stream( - input: &[u8], +/// - The match finder's `prev` ring is sized `O(dict_size)` (see [`HashChain`]). +/// - Only a sliding window of roughly `dict_size + WINDOW_SLOP` history plus one +/// pending chunk of lookahead is retained in `win`; older bytes are dropped +/// once the parse position has moved `> dict_size + WINDOW_SLOP` past them. +/// +/// Feed input with [`push`](Self::push) (which returns any chunks that became +/// fully buffered) and finish with [`finish`](Self::finish). The caller frames +/// each [`Lzma2Chunk`] and is responsible for the `0x00` end marker and any +/// container framing. +pub(crate) struct Lzma2StreamEncoder { + core: LzmaEncCore, + hc: HashChain, dict_size: u32, params: EncoderParams, -) -> Vec { - let mut chunks = Vec::new(); - if input.is_empty() { - return chunks; + /// Retained window bytes; `win[0]` is absolute offset `win_base`. + win: Vec, + /// Absolute offset of `win[0]`. + win_base: usize, + /// Absolute offset of the next byte to encode (== bytes already framed). + pos: usize, + /// Absolute count of bytes appended via `push` (encodable extent). + appended: usize, + /// `true` until the first chunk is emitted. + first: bool, +} + +impl Lzma2StreamEncoder { + pub fn new(dict_size: u32, params: EncoderParams) -> Self { + // The ring need never exceed what a 32-bit input could address; the + // dict cap already keeps this `O(dict_size)`. + let cap_hint = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(WINDOW_SLOP + STREAM_CHUNK_UNCOMP_MAX); + Self { + core: LzmaEncCore::new(), + hc: HashChain::new(dict_size, cap_hint), + dict_size, + params, + win: Vec::new(), + win_base: 0, + pos: 0, + appended: 0, + first: true, + } } - let mut core = LzmaEncCore::new(); - // One hash chain over the whole input: the LZ history is never cleared at - // a chunk boundary, so cross-chunk matches are found naturally. - let mut hc = HashChain::new(input.len()); - - let mut pos = 0usize; - let mut first = true; - while pos < input.len() { - let chunk_end = (pos + STREAM_CHUNK_UNCOMP_MAX).min(input.len()); - let uncomp_len = chunk_end - pos; - - // Range-code this slice. State/probabilities/range coder are reset for - // the chunk; `output_pos` and the hash chain continue. - let body = - encode_stream_chunk_body(&mut core, &mut hc, input, pos, chunk_end, dict_size, params); - - // Compressed only pays off when the body both shrinks the slice and - // fits the 16-bit (+1) compressed-size field; otherwise the - // uncompressed fallback is strictly smaller. - let use_compressed = !body.is_empty() && body.len() <= 65_536 && body.len() < uncomp_len; + /// Append `data` and emit every chunk that is now fully buffered. A chunk is + /// only encoded once a whole `STREAM_CHUNK_UNCOMP_MAX` slice (or the rest of + /// the stream, at `finish`) is available, so the optimal parser always sees + /// its full forward lookahead within the chunk. + pub fn push(&mut self, data: &[u8]) -> Vec { + self.win.extend_from_slice(data); + self.appended += data.len(); + let mut out = Vec::new(); + // Encode while a full chunk's worth of bytes is buffered ahead of `pos`. + while self.appended - self.pos >= STREAM_CHUNK_UNCOMP_MAX { + out.push(self.encode_one_chunk(self.pos + STREAM_CHUNK_UNCOMP_MAX)); + self.trim_window(); + } + out + } - if use_compressed { - chunks.push(Lzma2Chunk { - uncomp_len, - reset_dict: first, - body: Some(body), - }); - } else { - // Uncompressed fallback. The range-coded attempt mutated `core` - // (output_pos advanced, hash chain populated for this slice), which - // is exactly the state we want for continuing the dictionary into - // later chunks — the bytes are already in the LZ history and - // `output_pos` already advanced by `uncomp_len`. So nothing extra - // to do here; just frame an uncompressed chunk. - chunks.push(Lzma2Chunk { - uncomp_len, - reset_dict: first, - body: None, - }); + /// Flush any remaining buffered bytes as a final chunk (or chunks). + pub fn finish(&mut self) -> Vec { + let mut out = Vec::new(); + while self.pos < self.appended { + let end = (self.pos + STREAM_CHUNK_UNCOMP_MAX).min(self.appended); + out.push(self.encode_one_chunk(end)); + self.trim_window(); } + out + } - pos = chunk_end; - first = false; + /// Encode the chunk `[self.pos, chunk_end)` and advance `self.pos`. + fn encode_one_chunk(&mut self, chunk_end: usize) -> Lzma2Chunk { + let uncomp_len = chunk_end - self.pos; + let body = self.encode_chunk_body(chunk_end); + let use_compressed = !body.is_empty() && body.len() <= 65_536 && body.len() < uncomp_len; + let chunk = Lzma2Chunk { + uncomp_len, + reset_dict: self.first, + body: if use_compressed { Some(body) } else { None }, + }; + self.pos = chunk_end; + self.first = false; + chunk } - chunks -} + /// Range-code `[self.pos, chunk_end)` through the shared core/hash chain, + /// resetting LZMA state first (keeping `output_pos` and the LZ history). + fn encode_chunk_body(&mut self, chunk_end: usize) -> Vec { + self.core.reset_state_keep_pos(); + let base = self.win_base; + let start = self.pos; + if self.params.opt_window == 0 { + encode_greedy( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + chunk_end, + self.dict_size, + self.params, + ); + } else { + encode_optimal( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + chunk_end, + self.dict_size, + self.params, + ); + } + self.core.rc.flush(); + self.core.rc.out.clone() + } -/// Range-code `input[pos_start..pos_end]` through `core`/`hc`, performing a -/// per-chunk state reset first (keeping `output_pos` and the LZ history). -/// Returns the flushed range-coded body. -/// -/// Uses the optimal parse when the level enables it (`opt_window != 0`) and the -/// greedy parse otherwise — the same selection [`encode_lzma_chunk`] makes per -/// level. We do not additionally run both parses and keep the smaller body (as -/// the single-chunk path does): doing so would require snapshotting/replaying -/// the shared cross-chunk LZ history, and the optimal parse only loses to -/// greedy on pathological tiny inputs, which the uncompressed-chunk fallback -/// already covers for whole chunks. -#[allow(clippy::too_many_arguments)] -fn encode_stream_chunk_body( - core: &mut LzmaEncCore, - hc: &mut HashChain, - input: &[u8], - pos_start: usize, - pos_end: usize, - dict_size: u32, - params: EncoderParams, -) -> Vec { - core.reset_state_keep_pos(); - if params.opt_window == 0 { - encode_greedy(core, hc, input, pos_start, pos_end, dict_size, params); - } else { - encode_optimal(core, hc, input, pos_start, pos_end, dict_size, params); + /// Drop window history older than `dict_size + WINDOW_SLOP` before `pos`, so + /// peak `win` memory stays `O(dict_size)`. Never drops bytes a future match + /// could read (distance ≤ `dict_size`) or the parse look-back needs. + /// + /// The front-shift is only performed once the droppable prefix grows past a + /// whole `dict_size + WINDOW_SLOP` of waste, so the `drain` memmove cost is + /// amortised `O(1)` per input byte (rather than memmoving `~dict_size` bytes + /// every chunk, which would be quadratic over a large stream). + fn trim_window(&mut self) { + let keep_from = self + .pos + .saturating_sub(self.dict_size as usize + WINDOW_SLOP); + let droppable = keep_from.saturating_sub(self.win_base); + if droppable >= self.dict_size as usize + WINDOW_SLOP { + self.win.drain(..droppable); + self.win_base = keep_from; + } } - core.rc.flush(); - core.rc.out.clone() } /// Encode one chunk body (range-coded packets + 5-byte flush, no EOS marker) @@ -1405,12 +1500,30 @@ fn encode_chunk_body( optimal: bool, ) -> Vec { let mut core = LzmaEncCore::new(); - let mut hc = HashChain::new(input.len()); + let mut hc = HashChain::new(dict_size, input.len().max(1)); if optimal { - encode_optimal(&mut core, &mut hc, input, 0, input.len(), dict_size, params); + encode_optimal( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + ); } else { - encode_greedy(&mut core, &mut hc, input, 0, input.len(), dict_size, params); + encode_greedy( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + ); } // Flush the range coder. NO EOS marker — LZMA2 frames the uncompressed @@ -1427,29 +1540,32 @@ fn encode_chunk_body( /// emitted match/rep lengths are clamped so the parse stops exactly at /// `pos_end` — this lets a continuous encoder slice the output into chunks /// without ever crossing a chunk's uncompressed-size boundary. +#[allow(clippy::too_many_arguments)] fn encode_greedy( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, pos_start: usize, pos_end: usize, dict_size: u32, params: EncoderParams, ) { + let win_end = base + win.len(); let mut pos = pos_start; while pos < pos_end { // Bytes left until this chunk's boundary; emitted lengths never exceed // it so the chunk ends exactly at `pos_end`. let cap = (pos_end - pos) as u32; let rep_lens = [ - rep_match_len(input, pos, core.rep0).min(cap), - rep_match_len(input, pos, core.rep1).min(cap), - rep_match_len(input, pos, core.rep2).min(cap), - rep_match_len(input, pos, core.rep3).min(cap), + rep_match_len(win, base, pos, core.rep0).min(cap), + rep_match_len(win, base, pos, core.rep1).min(cap), + rep_match_len(win, base, pos, core.rep2).min(cap), + rep_match_len(win, base, pos, core.rep3).min(cap), ]; let new_match = hc - .find_longest(input, pos, dict_size, params) + .find_longest(win, base, pos, dict_size, params) .map(|(l, d)| (l.min(cap), d)); let best_rep_len = rep_lens.iter().copied().max().unwrap_or(0); @@ -1466,14 +1582,14 @@ fn encode_greedy( let emit_rep_long = !emit_new && best_rep_len >= MATCH_LEN_MIN; let emit_short_rep = !emit_new && !emit_rep_long && rep_lens[0] >= 1; - hc.insert(input, pos); + hc.insert(win, base, pos); if emit_new { let (len, dist) = new_match.unwrap(); for j in 1..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1481,8 +1597,8 @@ fn encode_greedy( } else if emit_rep_long { for j in 1..(best_rep_len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(best_rep_idx, best_rep_len); @@ -1491,7 +1607,7 @@ fn encode_greedy( core.emit_short_rep(); pos += 1; } else { - core.emit_literal(input, pos); + core.emit_literal(win, base, pos); pos += 1; } } @@ -1504,10 +1620,12 @@ fn encode_greedy( /// Encodes `input[pos_start..pos_end]`; matches still reference the whole LZ /// history before `pos`, but the parse never advances past `pos_end`, so the /// chunk ends exactly there. +#[allow(clippy::too_many_arguments)] fn encode_optimal( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, pos_start: usize, pos_end: usize, dict_size: u32, @@ -1526,7 +1644,8 @@ fn encode_optimal( let parsed = parse_window( core, hc, - input, + win, + base, pos, pos_end, dict_size, @@ -1539,7 +1658,7 @@ fn encode_optimal( debug_assert!(parsed > 0); // Replay the chosen decisions through the real emit path. `pos` // advances by exactly `parsed` bytes. - replay(core, hc, input, pos, &opt.decisions); + replay(core, hc, win, base, pos, &opt.decisions); pos += parsed; } } @@ -1553,7 +1672,8 @@ fn encode_optimal( fn parse_window( core: &LzmaEncCore, hc: &HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, pos_end: usize, dict_size: u32, @@ -1622,7 +1742,7 @@ fn parse_window( // ── literal transition ────────────────────────────────────────── { - let lp = literal_price_at(core, prices, snap, input, pos, out_pos, state, reps[0]); + let lp = literal_price_at(core, prices, snap, win, base, pos, out_pos, state, reps[0]); let np = node.price.saturating_add(lp); let to = cur + 1; if to <= limit && np < opt.opt[to].price { @@ -1644,7 +1764,7 @@ fn parse_window( // ── rep matches (rep0..rep3) ──────────────────────────────────── for rep_idx in 0..4u32 { - let rlen = rep_match_len(input, pos, reps[rep_idx as usize]); + let rlen = rep_match_len(win, base, pos, reps[rep_idx as usize]); if rlen < 1 { continue; } @@ -1711,7 +1831,7 @@ fn parse_window( // ── new matches ───────────────────────────────────────────────── let longest = { let opt_matches = &mut opt.matches; - hc.find_matches(input, pos, dict_size, params, opt_matches) + hc.find_matches(win, base, pos, dict_size, params, opt_matches) }; if longest >= MATCH_LEN_MIN { if longest > best_here { @@ -1804,28 +1924,30 @@ fn trace_back(opt: &mut Optimizer, end: usize) { fn replay( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, decisions: &[Decision], ) { + let win_end = base + win.len(); let mut pos = start; for &d in decisions { match d { Decision::Literal => { - hc.insert(input, pos); - core.emit_literal(input, pos); + hc.insert(win, base, pos); + core.emit_literal(win, base, pos); pos += 1; } Decision::ShortRep => { - hc.insert(input, pos); + hc.insert(win, base, pos); core.emit_short_rep(); pos += 1; } Decision::Match(dist, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1834,8 +1956,8 @@ fn replay( Decision::Rep(idx, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(idx, len); diff --git a/src/xz/mod.rs b/src/xz/mod.rs index 0bc2574..560280e 100644 --- a/src/xz/mod.rs +++ b/src/xz/mod.rs @@ -58,7 +58,9 @@ use crate::traits::{Algorithm, RawDecoder, RawEncoder, RawProgress}; // decoder exposed under the `lzma2` feature without pulling in this xz // container. See `lib.rs`. use crate::lzma2_internal::lzma2_decoder::{Lzma2Props, LzmaCore, lzma2_dict_size}; -use crate::lzma2_internal::lzma2_encoder::{EncoderParams, LZMA2_PROPS_BYTE, encode_lzma2_stream}; +use crate::lzma2_internal::lzma2_encoder::{ + EncoderParams, LZMA2_PROPS_BYTE, Lzma2Chunk, Lzma2StreamEncoder, +}; // ─── constants ───────────────────────────────────────────────────────────── @@ -394,9 +396,19 @@ pub struct Encoder { // the caller's `output`. We push from `pending[pending_idx..]`. pending: Vec, pending_idx: usize, - // Input buffer for the next LZMA2 chunk; flushed at LZMA2_CHUNK_MAX or - // on finish(). - in_buf: Vec, + // Bounded-memory continuous-dictionary LZMA2 chunk encoder. Fed input + // incrementally; emits framed chunks as they become fully buffered, so the + // whole input is never accumulated. `None` until the Body phase starts. + stream: Option, + // Raw input bytes pushed into `stream` but not yet consumed by an emitted + // chunk. Bounded by one chunk's worth (`stream` emits as soon as a full + // chunk is buffered), so this stays `O(LZMA2_CHUNK_MAX)`. Used to frame + // uncompressed-fallback chunks (raw bytes copied verbatim) and to feed the + // Block Check CRC32 in encode order. + staged_input: Vec, + // Set once `stream.finish()` has been called, so a multi-call `raw_finish` + // (draining across several small output buffers) doesn't re-finish. + stream_finished: bool, // CRC32 of all uncompressed input bytes — becomes the Block Check. check: Crc32, // Bookkeeping for the Index Record. @@ -427,7 +439,9 @@ impl Encoder { phase: EncPhase::StreamHeader, pending, pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, check: Crc32::new(), uncompressed_total: 0, compressed_payload_bytes: 0, @@ -453,30 +467,36 @@ impl Encoder { } } - /// Stage the whole block's LZMA2 payload from the fully-buffered input. - /// - /// The LZMA2 chunks are produced by a single continuous match-finder - /// ([`encode_lzma2_stream`]): the first chunk resets the dictionary - /// (`0xE0` compressed / `0x01` uncompressed) and every later chunk - /// *continues* it (`0xC0` compressed / `0x02` uncompressed), so a match in - /// a later chunk can reference data from any earlier chunk (up to the - /// dictionary size). The range coder still resets per chunk, but the - /// dictionary/history is continuous — this is what brings the ratio in - /// line with the single-stream `.lzma` path. + /// Frame each produced LZMA2 chunk into `pending`, consuming the matching + /// raw bytes from `staged_input` (for CRC + the uncompressed fallback). /// - /// `input` is the entire block (`uncompressed_total` bytes). Each framed - /// chunk is appended to `pending`; the caller drains it before the block - /// trailer. - fn stage_payload(&mut self, input: &[u8]) { - let chunks = encode_lzma2_stream(input, LZMA2_DICT_SIZE, self.enc_params); - let mut pos = 0usize; + /// The chunks come from a single continuous, **bounded-memory** match-finder + /// ([`Lzma2StreamEncoder`]): the first chunk resets the dictionary (`0xE0` + /// compressed / `0x01` uncompressed) and every later chunk *continues* it + /// (`0xC0` compressed / `0x02` uncompressed), so a match in a later chunk + /// can reference data from any earlier chunk up to the dictionary size. The + /// range coder still resets per chunk, but the dictionary/history is + /// continuous — this is what brings the ratio in line with the single-stream + /// `.lzma` path, while peak memory stays `O(dict_size)`. + /// Lazily create the bounded-memory LZMA2 stream encoder on first use. + fn stream(&mut self) -> &mut Lzma2StreamEncoder { + self.stream + .get_or_insert_with(|| Lzma2StreamEncoder::new(LZMA2_DICT_SIZE, self.enc_params)) + } + + fn stage_chunks(&mut self, chunks: Vec) { for chunk in chunks { - let data = &input[pos..pos + chunk.uncomp_len]; + let n = chunk.uncomp_len; + debug_assert!(self.staged_input.len() >= n); + // The bytes belonging to this chunk are the front `n` of the staged + // buffer (chunks are produced in encode order). + let data: Vec = self.staged_input.drain(..n).collect(); + self.check.update(&data); + self.uncompressed_total += n as u64; match chunk.body { - Some(ref body) => self.stage_compressed_chunk(data, body, chunk.reset_dict), - None => self.stage_uncompressed_chunk(data, chunk.reset_dict), + Some(ref body) => self.stage_compressed_chunk(&data, body, chunk.reset_dict), + None => self.stage_uncompressed_chunk(&data, chunk.reset_dict), } - pos += chunk.uncomp_len; } } @@ -602,20 +622,35 @@ impl RawEncoder for Encoder { } } EncPhase::Body => { - // Buffer the entire block's input so the LZMA2 payload can - // be produced by a single continuous match-finder at - // `finish` (the dictionary spans the whole input rather - // than resetting per chunk). We never flush mid-stream; - // chunking happens only at the output-framing level. - if consumed < input.len() { - self.in_buf.extend_from_slice(&input[consumed..]); - consumed = input.len(); + // Feed input into the bounded-memory streaming LZMA2 encoder + // and frame any chunks it emits, draining them to the caller + // as we go. The whole input is never accumulated — the + // dictionary is a sliding `~dict_size` window inside the + // stream encoder. If a previous pass staged chunk bytes, + // drain those first. + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else if consumed < input.len() { + // Push a bounded slice so `staged_input` stays small. + let take = (input.len() - consumed).min(LZMA2_CHUNK_MAX); + let slice = &input[consumed..consumed + take]; + self.staged_input.extend_from_slice(slice); + let chunks = self.stream().push(slice); + consumed += take; + if !chunks.is_empty() { + self.stage_chunks(chunks); + } + // Drain whatever was staged before consuming more input. + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); } - return Ok(RawProgress { - consumed, - written, - done: false, - }); } EncPhase::DrainPending => { if self.drain_pending(output, &mut written) { @@ -684,19 +719,27 @@ impl RawEncoder for Encoder { } } EncPhase::Body => { - if !self.in_buf.is_empty() { - // Produce the whole block's LZMA2 payload at once from - // the fully-buffered input, using a single continuous - // match-finder so the dictionary spans every chunk. - let data = core::mem::take(&mut self.in_buf); - self.check.update(&data); - self.uncompressed_total += data.len() as u64; - self.stage_payload(&data); + if self.pending_idx < self.pending.len() { + // Drain bytes staged in a prior pass first. self.phase = EncPhase::DrainPending; } else { - // No pending input. Move to block trailer. - self.stage_block_trailer(); - self.phase = EncPhase::BlockTrailer; + // Flush the remaining buffered bytes as the final + // chunk(s) from the bounded-memory stream encoder (once), + // then frame them. Memory stays `O(dict_size)`. + if self.stream.is_some() && !self.stream_finished { + let chunks = self.stream().finish(); + self.stream_finished = true; + self.stage_chunks(chunks); + } + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else { + // No more chunk bytes. Move to the block trailer in + // this same iteration so the loop makes progress + // even when `finish` produced nothing. + self.stage_block_trailer(); + self.phase = EncPhase::BlockTrailer; + } } } EncPhase::DrainPending => { @@ -787,7 +830,9 @@ impl RawEncoder for Encoder { phase: EncPhase::StreamHeader, pending, pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, check: Crc32::new(), uncompressed_total: 0, compressed_payload_bytes: 0, diff --git a/tests/lzma.rs b/tests/lzma.rs index 87137bd..89750b2 100644 --- a/tests/lzma.rs +++ b/tests/lzma.rs @@ -235,27 +235,28 @@ fn encode_at_level(payload: &[u8], level: u8) -> Vec { } fn encode_with(enc: &mut Encoder, payload: &[u8]) -> Vec { - // The encoder buffers all input internally and emits nothing until - // `finish`, so a small scratch buffer is fine for the `encode` calls. + // The `.lzma` encoder is now a bounded-memory streaming encoder: it emits + // range-coded output during `encode` as the sliding-window parse produces + // it (header up front, then body), so we must drain `output` while feeding + // input. A small scratch buffer exercises the `OutputFull` path. + let mut out = Vec::new(); let mut scratch = [0u8; 64]; let mut consumed = 0; while consumed < payload.len() { let (p, status) = enc.encode(&payload[consumed..], &mut scratch).unwrap(); + out.extend_from_slice(&scratch[..p.written]); consumed += p.consumed; - // Output should always be empty from encode() for LZMA. - assert_eq!(p.written, 0); match status { Status::InputEmpty | Status::StreamEnd => break, Status::OutputFull => { - // Shouldn't happen — encode() doesn't write anything. - if p.consumed == 0 { + // Drain and retry; making no progress at all is a real stall. + if p.consumed == 0 && p.written == 0 { panic!("encoder stalled mid-input"); } } } } - let mut out = Vec::new(); let mut buf = vec![0u8; 4096]; loop { let (p, status) = enc.finish(&mut buf).unwrap(); @@ -350,14 +351,23 @@ fn encode_streaming_one_byte_chunks_round_trip() { let mut enc = Encoder::new(); let mut scratch = [0u8; 4]; + let mut compressed = Vec::new(); + // Streaming `.lzma` may emit header/body bytes during `encode`; drain them + // and re-offer a byte until it is consumed. Feed exactly one input byte at + // a time. for byte in payload { - let (p, _status) = enc - .encode(core::slice::from_ref(byte), &mut scratch) - .unwrap(); - assert_eq!(p.consumed, 1); - assert_eq!(p.written, 0); + let mut consumed = 0; + while consumed == 0 { + let (p, _status) = enc + .encode(core::slice::from_ref(byte), &mut scratch) + .unwrap(); + compressed.extend_from_slice(&scratch[..p.written]); + consumed += p.consumed; + if p.consumed == 0 && p.written == 0 { + panic!("encoder stalled feeding a single byte"); + } + } } - let mut compressed = Vec::new(); let mut buf = [0u8; 1]; loop { let (p, status) = enc.finish(&mut buf).unwrap(); From 73957f8cab540ac0aada66e09f9a203359c35a06 Mon Sep 17 00:00:00 2001 From: Mark Karpeles Date: Mon, 15 Jun 2026 23:18:43 +0900 Subject: [PATCH 2/3] lzma/lzma2: early-commit long matches in the optimal parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The optimal parser set a `commit_end` once a match >= nice_len was found but kept filling the DP for every position the match spans, doing O(nice..273) price work per covered byte. On highly repetitive input (e.g. 600 MB of one byte, or a short repeated phrase) this made the parse effectively quadratic — a 20 MB all-`a` input took ~3 minutes. The long match from the current node already records the cheapest arrival at the commit boundary (a single match decision the traceback will pick), so break out of the window loop immediately instead of grinding through the spanned positions. This mirrors the SDK's greedy `nice_len` acceptance in GetOptimum. Effect: - 20 MB all-`a`: ~174 s -> ~0.9 s (both encoders). - Ratio essentially unchanged on the 2.9 MB corpus at -l 9: xz 532316 -> 532468 (+0.03%), lzma 521957 -> 522057 (+0.03%) — still far from the per-chunk-reset regression and well within 1% of the pre-change baselines (xz 532320, lzma 521918). - 600 MB all-`a` now compresses in seconds under the 244 MB memory cap; reference cross-decode (`xz -d`, `xz --format=lzma -d`) byte-exact. Co-Authored-By: Claude Fable 5 --- src/lzma/encoder.rs | 7 +++++++ src/lzma2_internal/lzma2_encoder.rs | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/src/lzma/encoder.rs b/src/lzma/encoder.rs index f31b7a4..933a983 100644 --- a/src/lzma/encoder.rs +++ b/src/lzma/encoder.rs @@ -1908,6 +1908,13 @@ fn parse_window( // `nice_len` cut-off in GetOptimum. if commit_end.is_none() && best_here >= params.nice_len { commit_end = Some((cur + best_here as usize).min(limit)); + // The long match from this node already records the cheapest arrival + // at the commit boundary; stop extending the DP rather than grinding + // through every position the match spans (otherwise the band fill is + // O(nice..273) per covered byte and the parse goes quadratic on + // highly-repetitive input). Matches the SDK's greedy `nice_len` + // acceptance and leaves ratio essentially unchanged. + break; } cur += 1; diff --git a/src/lzma2_internal/lzma2_encoder.rs b/src/lzma2_internal/lzma2_encoder.rs index 7dea913..4e365d3 100644 --- a/src/lzma2_internal/lzma2_encoder.rs +++ b/src/lzma2_internal/lzma2_encoder.rs @@ -1879,6 +1879,15 @@ fn parse_window( if commit_end.is_none() && best_here >= params.nice_len { let bounded = (cur + best_here as usize).min(limit); commit_end = Some(bounded); + // The long match from this node already writes the cheapest known + // arrival at `bounded` (a single match decision). Stop extending the + // DP now instead of grinding through every position the match spans + // — on long-match runs (highly repetitive input) that band would + // otherwise cost O(nice..273) work per covered byte, turning the + // parse quadratic. Committing the match here matches the SDK's + // greedy `nice_len` acceptance and leaves ratio essentially + // unchanged. + break; } cur += 1; From cc42fd9a8f98c371166fbd69108830f9cf6b0e18 Mon Sep 17 00:00:00 2001 From: Mark Karpeles Date: Mon, 15 Jun 2026 23:30:32 +0900 Subject: [PATCH 3/3] docs: changelog for bounded-memory LZMA encoders; demote private doc link --- CHANGELOG.md | 13 +++++++++---- src/lzma/encoder.rs | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6f10a7..9615954 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,10 +84,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 linear-time SA-IS suffix-array construction. - **mtf** encode ~2.3× faster (single-pass scan-and-shift); **rangecoder** encode/decode ~+15% (tightened hot loops). - - Note: the xz/lzma2 and zstd encoders now buffer the whole input to drive a - single continuous match-finder (same memory profile the `.lzma` path already - had); higher levels trade encode time for ratio, decode speed is unchanged. +- **Bounded-memory LZMA encoders.** The `xz`, raw `lzma2`, and `.lzma` + encoders now stream with a sliding window whose match finder, history, and + hash chains are all `O(dict_size)` (default 4 MiB) instead of buffering the + whole input — peak memory is now independent of input length (≈45 MB RSS + encoding a 600 MB file; previously O(input) and OOM-prone). The continuous + dictionary, and therefore the ratio, is unchanged (matches already could not + reach past `dict_size`): `xz`/`.lzma` on the 2.9 MB corpus stay within 0.03%. + Output continues to decode byte-for-byte with system `xz`. (`zstd` already + streamed within its bounded window.) ## [0.6.3](https://github.com/KarpelesLab/compcol/compare/v0.6.2...v0.6.3) - 2026-06-15 diff --git a/src/lzma/encoder.rs b/src/lzma/encoder.rs index 933a983..e347627 100644 --- a/src/lzma/encoder.rs +++ b/src/lzma/encoder.rs @@ -1990,7 +1990,7 @@ fn replay( /// Streaming `.lzma` (alone) encoder with **bounded memory**. /// -/// Drives a [`LzmaStreamEncoder`] whose match finder, sliding window, and LZ +/// Drives an `LzmaStreamEncoder` whose match finder, sliding window, and LZ /// history are all `O(dict_size)` — so peak memory is independent of the input /// length. `encode` feeds input incrementally and emits range-coded output as /// it is produced (the header up front, then body bytes); `finish` emits the