diff --git a/CHANGELOG.md b/CHANGELOG.md index e6f10a7..9615954 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,10 +84,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 linear-time SA-IS suffix-array construction. - **mtf** encode ~2.3× faster (single-pass scan-and-shift); **rangecoder** encode/decode ~+15% (tightened hot loops). - - Note: the xz/lzma2 and zstd encoders now buffer the whole input to drive a - single continuous match-finder (same memory profile the `.lzma` path already - had); higher levels trade encode time for ratio, decode speed is unchanged. +- **Bounded-memory LZMA encoders.** The `xz`, raw `lzma2`, and `.lzma` + encoders now stream with a sliding window whose match finder, history, and + hash chains are all `O(dict_size)` (default 4 MiB) instead of buffering the + whole input — peak memory is now independent of input length (≈45 MB RSS + encoding a 600 MB file; previously O(input) and OOM-prone). The continuous + dictionary, and therefore the ratio, is unchanged (matches already could not + reach past `dict_size`): `xz`/`.lzma` on the 2.9 MB corpus stay within 0.03%. + Output continues to decode byte-for-byte with system `xz`. (`zstd` already + streamed within its bounded window.) ## [0.6.3](https://github.com/KarpelesLab/compcol/compare/v0.6.2...v0.6.3) - 2026-06-15 diff --git a/src/lzma/encoder.rs b/src/lzma/encoder.rs index 32345ee..e347627 100644 --- a/src/lzma/encoder.rs +++ b/src/lzma/encoder.rs @@ -189,26 +189,6 @@ impl LevelParams { }, } } - - /// Compute the dict size actually written into the header for an input of - /// `input_len` bytes. We never claim more than what could possibly be - /// referenced, so a 1 KiB input doesn't force the decoder to allocate a - /// 64 MiB window. The advertised size is also clamped to the decoder's - /// minimum of 4 KiB. - fn effective_dict_size(&self, input_len: usize) -> u32 { - // Round `input_len` up to a power of two (clamped at u32::MAX). 
Empty - // input gets the minimum dict; `next_power_of_two` would panic on 0. - let needed = if input_len == 0 { - MIN_DICT_SIZE - } else { - let np2 = (input_len as u64) - .checked_next_power_of_two() - .unwrap_or(u32::MAX as u64); - np2.min(u32::MAX as u64) as u32 - }; - let needed = needed.max(MIN_DICT_SIZE); - needed.min(self.dict_size) - } } fn hash3(b0: u8, b1: u8, b2: u8) -> u32 { @@ -841,28 +821,30 @@ impl LzmaEncCore { } } - /// Emit a literal packet for the byte at index `pos` in the input. - fn emit_literal(&mut self, input: &[u8], pos: usize) { + /// Emit a literal packet for the byte at absolute position `pos`, reading + /// from the sliding window `win` (whose first byte is absolute `base`). + fn emit_literal(&mut self, win: &[u8], base: usize, pos: usize) { let pos_state = self.pos_state(); let idx = self.state * POS_STATES_MAX + pos_state as usize; rc_encode_bit(&mut self.rc, &mut self.is_match[idx], 0); - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if self.state < LIT_STATES { None } else { - // The byte at distance rep0; we always have it available in the - // input buffer because the encoder buffered everything. + // The byte at distance rep0; always retained in the sliding window + // (distance ≤ dict_size ≤ retained history). let d = self.rep0 as usize + 1; if d <= pos { - Some(input[pos - d]) + Some(win[i - d]) } else { // Shouldn't happen at a literal-after-match state, but be // safe — fall back to plain literal coding. None } }; - self.encode_literal_full(input[pos], prev_byte, match_byte); + self.encode_literal_full(win[i], prev_byte, match_byte); self.state = state_after_literal(self.state); self.output_pos += 1; @@ -1014,46 +996,67 @@ fn get_dist_slot(distance: u32) -> u32 { // all input before encoding, there's no sliding window to maintain; the head // array and prev links cover the whole buffer in one shot. 
+/// Sliding-window 3-byte hash-chain match finder. +/// +/// Positions in `head`/`prev` are **absolute** input offsets; `prev` is a ring +/// of `prev.len()` slots indexed `pos & prev_mask`, sized larger than +/// `dict_size + MAX_MATCH_LEN` so every legally-reachable link (distance ≤ +/// `dict_size`) is still intact. Byte reads go through the sliding window `win` +/// whose first byte is absolute `base` (absolute `p` reads `win[p - base]`). +/// This keeps peak memory `O(dict_size)` regardless of input length. struct HashChain { head: Box<[u32; HASH_SIZE]>, prev: Vec, + prev_mask: usize, } impl HashChain { - fn new(buf_len: usize) -> Self { + /// Build a finder whose `prev` ring covers at least `dict_size + + /// MAX_MATCH_LEN` positions (rounded up to a power of two), capped by + /// `cap_hint` when the total input is known smaller. + fn new(dict_size: u32, cap_hint: usize) -> Self { + let needed = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(2); + let want = needed.min(cap_hint.max(1)); + let cap = want.max(1).next_power_of_two(); Self { head: Box::new([NIL; HASH_SIZE]), - prev: vec![NIL; buf_len], + prev: vec![NIL; cap], + prev_mask: cap - 1, } } - /// Splice position `pos` into the hash chain. No-op if there aren't - /// three bytes available. - fn insert(&mut self, input: &[u8], pos: usize) { - if pos + 3 > input.len() { + /// Splice absolute position `pos` into the hash chain. No-op if fewer than + /// three bytes follow in the window. 
+ fn insert(&mut self, win: &[u8], base: usize, pos: usize) { + let i = pos - base; + if i + 3 > win.len() { return; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - self.prev[pos] = self.head[h]; + let h = hash3(win[i], win[i + 1], win[i + 2]) as usize; + self.prev[pos & self.prev_mask] = self.head[h]; self.head[h] = pos as u32; } - /// Find the longest match for `input[pos..]` against earlier positions, - /// bounded by `MAX_MATCH_LEN`, the per-level chain budget, and the - /// per-level "nice match" early-exit. + /// Find the longest match for the window position at absolute `pos` against + /// earlier positions, bounded by `MAX_MATCH_LEN`, the per-level chain + /// budget, and the per-level "nice match" early-exit. fn find_longest( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, max_chain: usize, nice_match: u32, ) -> Option<(u32, u32)> { - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return None; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = 0; let mut best_dist: u32 = 0; @@ -1062,7 +1065,7 @@ impl HashChain { while cur != NIL && steps < max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1070,17 +1073,18 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; // Cheap rejection by the (best_len)-th byte. 
if best_len > 2 - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1091,7 +1095,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1105,9 +1109,11 @@ impl HashChain { /// achievable length `>= MATCH_LEN_MIN`, the *shortest* distance that /// achieves it. `out` is filled with `(len, dist0based)` pairs in /// strictly increasing length order. Returns the longest length found. + #[allow(clippy::too_many_arguments)] fn find_matches( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, max_chain: usize, @@ -1115,11 +1121,12 @@ impl HashChain { out: &mut Vec<(u32, u32)>, ) -> u32 { out.clear(); - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return 0; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = MATCH_LEN_MIN - 1; let mut cur = self.head[h]; @@ -1127,7 +1134,7 @@ impl HashChain { while cur != NIL && steps < max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1135,16 +1142,17 @@ impl HashChain { if dist > max_dist { break; 
} + let ci = cur_pos - base; if best_len >= MATCH_LEN_MIN - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1156,7 +1164,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1169,18 +1177,18 @@ impl HashChain { // ─── rep-match helpers ─────────────────────────────────────────────────── -/// Try to extend a match starting at `pos` using a 0-based LZ distance -/// `dist`. Returns the match length (0 if not even 1 byte matches), capped -/// at `MAX_MATCH_LEN`. The byte at index `pos - dist - 1` is the first -/// candidate. -fn rep_match_len(input: &[u8], pos: usize, dist: u32) -> u32 { +/// Length of a repeat match at absolute `pos` against 0-based LZ distance +/// `dist`, reading from the sliding window `win` (first byte = absolute `base`). +/// Capped at `MAX_MATCH_LEN`. 
+fn rep_match_len(win: &[u8], base: usize, pos: usize, dist: u32) -> u32 { let d = dist as usize + 1; if d > pos { return 0; } - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32) as usize; + let pi = pos - base; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32) as usize; let mut len = 0usize; - while len < max_len && input[pos - d + len] == input[pos + len] { + while len < max_len && win[pi - d + len] == win[pi + len] { len += 1; } len as u32 @@ -1280,7 +1288,8 @@ fn literal_price_at( core: &LzmaEncCore, prices: &[u32; PRICE_TABLE_SIZE], snap: &PriceSnapshot, - input: &[u8], + win: &[u8], + base: usize, pos: usize, out_pos: u64, state: usize, @@ -1288,85 +1297,322 @@ fn literal_price_at( ) -> u32 { let pos_state = (out_pos as u32) & core.pos_mask; let im_idx = state * POS_STATES_MAX + pos_state as usize; - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if state < LIT_STATES { None } else { let d = rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - snap.is_match[im_idx][0] - + core.literal_price(prices, out_pos, input[pos], prev_byte, match_byte) + snap.is_match[im_idx][0] + core.literal_price(prices, out_pos, win[i], prev_byte, match_byte) } // ─── full encode pass ──────────────────────────────────────────────────── -fn encode_all(input: &[u8], params: LevelParams) -> Vec { - let dict_size = params.effective_dict_size(input.len()); - - // Threshold below which we also run a greedy pass and keep the smaller - // body. The optimal parser's cold-start price model can briefly lose to - // greedy on small, highly-repetitive inputs; the absolute loss is bounded - // by the first few price-refresh segments, so on larger inputs the optimal - // parse always wins overall and the extra greedy pass is pure waste. We - // therefore only run the guard pass on small inputs. 
- const GUARD_LIMIT: usize = 64 * 1024; - - let body = if params.opt_window == 0 { - encode_body(input, dict_size, params, false) - } else if input.len() <= GUARD_LIMIT { - let opt = encode_body(input, dict_size, params, true); - let greedy = encode_body(input, dict_size, params, false); - if greedy.len() < opt.len() { - greedy +/// Extra window history retained behind the current parse position, on top of +/// `dict_size`, so an insertion never overwrites a still-reachable older ring +/// slot and the longest match / literal look-back can always be read. +const WINDOW_SLOP: usize = MAX_MATCH_LEN as usize + 16; + +/// How many bytes of forward lookahead must be buffered past the parse position +/// before a streaming `push` will encode them, so the optimal parser always has +/// its full look-ahead window plus a longest-match's worth of context. On +/// `finish` the remaining tail is encoded without this margin. +fn lookahead_margin(params: LevelParams) -> usize { + params.opt_window as usize + MAX_MATCH_LEN as usize + 1 +} + +/// Streaming `.lzma` body encoder with **bounded memory**. +/// +/// The `.lzma` range coder is a single continuous stream (no per-chunk reset); +/// this struct drives the parse incrementally over a sliding `~dict_size` +/// window and forwards range-coded output bytes as they are produced. Peak +/// memory is `O(dict_size)`: +/// +/// - the match finder's `prev` ring is `O(dict_size)` (see [`HashChain`]); +/// - only `dict_size + WINDOW_SLOP` history plus one lookahead margin of bytes +/// is retained in `win` — older bytes are dropped once the parse position has +/// moved past them. +/// +/// The 13-byte header (props + dict size + `u64::MAX` length) is emitted before +/// the first body bytes; the EOS marker + range-coder flush are appended by +/// [`finish`](Self::finish). 
+struct LzmaStreamEncoder { + core: LzmaEncCore, + hc: HashChain, + params: LevelParams, + dict_size: u32, + /// Reusable optimal-parser scratch (only used at levels with `opt_window`). + opt: Optimizer, + prob_prices: [u32; PRICE_TABLE_SIZE], + /// Retained window bytes; `win[0]` is absolute offset `win_base`. + win: Vec, + win_base: usize, + /// Absolute offset of the next byte to encode. + pos: usize, + /// Absolute count of bytes appended so far. + appended: usize, + /// Range-coder output bytes already forwarded to the caller. + drained: usize, + /// Set once the header has been emitted (prepended to the first push). + header_done: bool, + /// Set once the EOS marker + flush have been emitted. + finished: bool, + /// `true` until we commit to true incremental streaming. While `false` and + /// the total stays `<= GUARD_LIMIT`, input is only buffered (no output) so + /// `finish` can run the greedy-vs-optimal guard pass over the whole small + /// input and keep the smaller body — matching the old one-shot behaviour and + /// preventing the optimal parser's cold start from ever losing to greedy on + /// small inputs. Once the total exceeds `GUARD_LIMIT` we commit and stream. + committed: bool, +} + +/// Inputs at or below this size run the greedy-vs-optimal guard pass at +/// `finish` (and are therefore buffered whole — bounded, since this is far +/// below `dict_size`). Larger inputs stream incrementally with the optimal +/// parse only. Mirrors the old `encode_all` GUARD_LIMIT. +const GUARD_LIMIT: usize = 64 * 1024; + +impl LzmaStreamEncoder { + fn new(params: LevelParams) -> Self { + // Advertise (and cap matches at) the level's dictionary size, clamped to + // the decoder's 4 KiB minimum. Independent of input length so the header + // can be emitted up front without buffering the whole stream. 
+ let dict_size = params.dict_size.max(MIN_DICT_SIZE); + let cap_hint = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(WINDOW_SLOP + lookahead_margin(params)); + Self { + core: LzmaEncCore::new(), + hc: HashChain::new(dict_size, cap_hint), + params, + dict_size, + opt: Optimizer::new(params.opt_window as usize), + prob_prices: build_prob_prices(), + win: Vec::new(), + win_base: 0, + pos: 0, + appended: 0, + drained: 0, + header_done: false, + finished: false, + // Greedy-only levels (`opt_window == 0`) have no cold-start + // regression, so they commit to streaming immediately and skip the + // guard buffering. + committed: params.opt_window == 0, + } + } + + /// Append `data`, encode any bytes now safely buffered (keeping a forward + /// lookahead margin), and return all output produced so far (header on the + /// first commit, then range-coded body bytes). + fn push(&mut self, data: &[u8]) -> Vec { + self.win.extend_from_slice(data); + self.appended += data.len(); + // While still small and uncommitted, only buffer — `finish` will run the + // greedy-vs-optimal guard. Commit (and start streaming) once the total + // crosses GUARD_LIMIT. + if !self.committed { + if self.appended <= GUARD_LIMIT { + return Vec::new(); + } + self.committed = true; + } + let margin = lookahead_margin(self.params); + // Only encode up to `appended - margin` so the parser keeps full + // forward context; the tail is encoded at `finish`. + let encode_to = self.appended.saturating_sub(margin); + if encode_to > self.pos { + self.encode_range(encode_to); + self.trim_window(); + } + self.take_output() + } + + /// Encode the remaining tail, emit the EOS marker + flush, and return all + /// remaining output bytes. + fn finish(&mut self) -> Vec { + if !self.finished { + if !self.committed { + // Small input: run the greedy-vs-optimal guard over the whole + // buffered window and keep the smaller body, exactly as the old + // one-shot encoder did. 
`win` holds the entire input (≤ GUARD). + return self.finish_small(); + } + if self.pos < self.appended { + self.encode_range(self.appended); + } + self.core.emit_eos_marker(); + self.core.rc.flush(); + self.finished = true; + } + self.take_output() + } + + /// Guard path for small inputs: encode the whole buffered `win` twice + /// (greedy + optimal) from a fresh core and keep the smaller body, then + /// emit header + body. Memory is bounded (input ≤ GUARD_LIMIT). + fn finish_small(&mut self) -> Vec { + let input = &self.win; + let dict_size = self.dict_size; + let params = self.params; + + let opt_body = { + let mut core = LzmaEncCore::new(); + let mut hc = HashChain::new(dict_size, input.len().max(1)); + let mut opt = Optimizer::new(params.opt_window as usize); + let prices = build_prob_prices(); + encode_optimal( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + &prices, + &mut opt, + ); + core.emit_eos_marker(); + core.rc.flush(); + core.rc.out + }; + let greedy_body = { + let mut core = LzmaEncCore::new(); + let mut hc = HashChain::new(dict_size, input.len().max(1)); + encode_greedy( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + ); + core.emit_eos_marker(); + core.rc.flush(); + core.rc.out + }; + let body = if greedy_body.len() < opt_body.len() { + greedy_body } else { - opt + opt_body + }; + + self.finished = true; + let mut out = Vec::with_capacity(13 + body.len()); + out.push(ENC_PROPS_BYTE); + out.extend_from_slice(&self.dict_size.to_le_bytes()); + out.extend_from_slice(&u64::MAX.to_le_bytes()); + out.extend_from_slice(&body); + self.header_done = true; + out + } + + /// Encode absolute positions `[self.pos, end)` into the continuous range + /// coder, advancing `self.pos`. 
+ fn encode_range(&mut self, end: usize) { + let base = self.win_base; + let start = self.pos; + if self.params.opt_window == 0 { + encode_greedy( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + end, + self.dict_size, + self.params, + ); + } else { + encode_optimal( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + end, + self.dict_size, + self.params, + &self.prob_prices, + &mut self.opt, + ); } - } else { - encode_body(input, dict_size, params, true) - }; + self.pos = end; + } - let mut out = Vec::with_capacity(13 + body.len()); - out.push(ENC_PROPS_BYTE); - out.extend_from_slice(&dict_size.to_le_bytes()); - out.extend_from_slice(&u64::MAX.to_le_bytes()); - out.extend_from_slice(&body); - out -} + /// Drop window history older than `dict_size + WINDOW_SLOP` before `pos`. + /// Only shifts once the droppable prefix exceeds a whole `dict_size + + /// WINDOW_SLOP`, amortising the `drain` memmove to `O(1)` per input byte. + fn trim_window(&mut self) { + let keep_from = self + .pos + .saturating_sub(self.dict_size as usize + WINDOW_SLOP); + let droppable = keep_from.saturating_sub(self.win_base); + if droppable >= self.dict_size as usize + WINDOW_SLOP { + self.win.drain(..droppable); + self.win_base = keep_from; + } + } -/// Encode the range-coded body (with EOS marker + flush, no 13-byte header) -/// using the greedy or optimal parse. Returns the raw body bytes. -fn encode_body(input: &[u8], dict_size: u32, params: LevelParams, optimal: bool) -> Vec { - let mut core = LzmaEncCore::new(); - let mut hc = HashChain::new(input.len()); - if optimal { - encode_optimal(&mut core, &mut hc, input, dict_size, params); - } else { - encode_greedy(&mut core, &mut hc, input, dict_size, params); + /// Pull any newly-produced output: the 13-byte header on first call, then + /// the range coder's freshly-flushed bytes. 
+ fn take_output(&mut self) -> Vec { + let mut out = Vec::new(); + if !self.header_done { + out.push(ENC_PROPS_BYTE); + out.extend_from_slice(&self.dict_size.to_le_bytes()); + out.extend_from_slice(&u64::MAX.to_le_bytes()); + self.header_done = true; + } + let body = &self.core.rc.out; + if self.drained < body.len() { + out.extend_from_slice(&body[self.drained..]); + self.drained = body.len(); + } + out } - core.emit_eos_marker(); - core.rc.flush(); - core.rc.out } -/// Greedy/lazy parse — used by the lowest levels. +/// Greedy/lazy parse over the sliding window. Encodes absolute positions +/// `[pos_start, pos_end)`; match lengths are clamped to `pos_end` so a streaming +/// driver can stop at a lookahead boundary without crossing it. +#[allow(clippy::too_many_arguments)] fn encode_greedy( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, + pos_start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, ) { - let mut pos = 0usize; - while pos < input.len() { + let win_end = base + win.len(); + let mut pos = pos_start; + while pos < pos_end { + let cap = (pos_end - pos) as u32; let rep_lens = [ - rep_match_len(input, pos, core.rep0), - rep_match_len(input, pos, core.rep1), - rep_match_len(input, pos, core.rep2), - rep_match_len(input, pos, core.rep3), + rep_match_len(win, base, pos, core.rep0).min(cap), + rep_match_len(win, base, pos, core.rep1).min(cap), + rep_match_len(win, base, pos, core.rep2).min(cap), + rep_match_len(win, base, pos, core.rep3).min(cap), ]; - let new_match = hc.find_longest(input, pos, dict_size, params.max_chain, params.nice_match); + let new_match = hc + .find_longest( + win, + base, + pos, + dict_size, + params.max_chain, + params.nice_match, + ) + .map(|(l, d)| (l.min(cap), d)); let best_rep_len = rep_lens.iter().copied().max().unwrap_or(0); let best_rep_idx = rep_lens @@ -1382,14 +1628,14 @@ fn encode_greedy( let emit_rep_long = !emit_new && best_rep_len >= MATCH_LEN_MIN; let emit_short_rep 
= !emit_new && !emit_rep_long && rep_lens[0] >= 1; - hc.insert(input, pos); + hc.insert(win, base, pos); if emit_new { let (len, dist) = new_match.unwrap(); for j in 1..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1397,8 +1643,8 @@ fn encode_greedy( } else if emit_rep_long { for j in 1..(best_rep_len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(best_rep_idx, best_rep_len); @@ -1407,41 +1653,48 @@ fn encode_greedy( core.emit_short_rep(); pos += 1; } else { - core.emit_literal(input, pos); + core.emit_literal(win, base, pos); pos += 1; } } } -/// Cost-based optimal parse over a look-ahead window. +/// Cost-based optimal parse over a look-ahead window, encoding absolute +/// positions `[pos_start, pos_end)`. +#[allow(clippy::too_many_arguments)] fn encode_optimal( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, + pos_start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, + prob_prices: &[u32; PRICE_TABLE_SIZE], + opt: &mut Optimizer, ) { - let prob_prices = build_prob_prices(); let window = params.opt_window as usize; - let mut opt = Optimizer::new(window); - let mut pos = 0usize; - while pos < input.len() { - let snap = core.price_snapshot(&prob_prices); + let mut pos = pos_start; + while pos < pos_end { + let snap = core.price_snapshot(prob_prices); let parsed = parse_window( core, hc, - input, + win, + base, pos, + pos_end, dict_size, params, window, - &prob_prices, + prob_prices, &snap, - &mut opt, + opt, ); debug_assert!(parsed > 0); - replay(core, hc, input, pos, &opt.decisions); + replay(core, hc, win, base, pos, &opt.decisions); pos += parsed; } } @@ -1452,8 +1705,10 @@ fn encode_optimal( fn parse_window( core: &LzmaEncCore, hc: &HashChain, - input: &[u8], + win: &[u8], + 
base: usize, start: usize, + pos_end: usize, dict_size: u32, params: LevelParams, window: usize, @@ -1461,7 +1716,9 @@ fn parse_window( snap: &PriceSnapshot, opt: &mut Optimizer, ) -> usize { - let avail = input.len() - start; + // `avail` is bounded by the encode boundary `pos_end`, not the end of the + // window, so the DP never produces a decision that carries past `pos_end`. + let avail = pos_end - start; let limit = window.min(avail); opt.opt[0] = OptNode { @@ -1511,7 +1768,7 @@ fn parse_window( // ── literal ────────────────────────────────────────────────────── { - let lp = literal_price_at(core, prices, snap, input, pos, out_pos, state, reps[0]); + let lp = literal_price_at(core, prices, snap, win, base, pos, out_pos, state, reps[0]); let np = node.price.saturating_add(lp); let to = cur + 1; if to <= limit && np < opt.opt[to].price { @@ -1532,7 +1789,7 @@ fn parse_window( // ── rep matches ────────────────────────────────────────────────── for rep_idx in 0..4u32 { - let rlen = rep_match_len(input, pos, reps[rep_idx as usize]); + let rlen = rep_match_len(win, base, pos, reps[rep_idx as usize]); if rlen < 1 { continue; } @@ -1598,7 +1855,8 @@ fn parse_window( let longest = { let opt_matches = &mut opt.matches; hc.find_matches( - input, + win, + base, pos, dict_size, params.max_chain, @@ -1650,6 +1908,13 @@ fn parse_window( // `nice_len` cut-off in GetOptimum. if commit_end.is_none() && best_here >= params.nice_len { commit_end = Some((cur + best_here as usize).min(limit)); + // The long match from this node already records the cheapest arrival + // at the commit boundary; stop extending the DP rather than grinding + // through every position the match spans (otherwise the band fill is + // O(nice..273) per covered byte and the parse goes quadratic on + // highly-repetitive input). Matches the SDK's greedy `nice_len` + // acceptance and leaves ratio essentially unchanged. 
+ break; } cur += 1; @@ -1678,28 +1943,30 @@ fn trace_back(opt: &mut Optimizer, end: usize) { fn replay( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, decisions: &[Decision], ) { + let win_end = base + win.len(); let mut pos = start; for &d in decisions { match d { Decision::Literal => { - hc.insert(input, pos); - core.emit_literal(input, pos); + hc.insert(win, base, pos); + core.emit_literal(win, base, pos); pos += 1; } Decision::ShortRep => { - hc.insert(input, pos); + hc.insert(win, base, pos); core.emit_short_rep(); pos += 1; } Decision::Match(dist, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1708,8 +1975,8 @@ fn replay( Decision::Rep(idx, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(idx, len); @@ -1721,19 +1988,21 @@ fn replay( // ─── public streaming Encoder ──────────────────────────────────────────── -/// Streaming `.lzma` (alone) encoder. +/// Streaming `.lzma` (alone) encoder with **bounded memory**. /// -/// Implementation note: LZMA's range coder operates on the entire stream, so -/// the simplest correct approach is to accumulate input into a `Vec` and -/// produce the compressed output in one shot on `finish`. The streaming -/// `encode` calls append to the buffer and never write output; `finish` -/// builds the full output and then drains it across however many calls the -/// caller's output buffer requires. +/// Drives an `LzmaStreamEncoder` whose match finder, sliding window, and LZ +/// history are all `O(dict_size)` — so peak memory is independent of the input +/// length. 
`encode` feeds input incrementally and emits range-coded output as +/// it is produced (the header up front, then body bytes); `finish` emits the +/// EOS marker + flush. Output the codec produces faster than the caller drains +/// it is held in a small `pending` buffer (bounded by one push's worth). pub struct Encoder { - input_buf: Vec, - output_buf: Vec, - output_pos: usize, - finished: bool, + stream: Option, + /// Produced output bytes not yet handed to the caller. + pending: Vec, + pending_idx: usize, + /// Set once `stream.finish()` has run. + stream_finished: bool, /// Match-finder tuning derived from [`EncoderConfig::level`]. Persisted /// across `reset` since configuration is meant to survive resets. params: LevelParams, @@ -1756,52 +2025,118 @@ impl Encoder { /// the nearest valid level rather than rejected. pub fn with_config(config: EncoderConfig) -> Self { Self { - input_buf: Vec::new(), - output_buf: Vec::new(), - output_pos: 0, - finished: false, + stream: None, + pending: Vec::new(), + pending_idx: 0, + stream_finished: false, params: LevelParams::from_level(config.level), } } + + fn stream(&mut self) -> &mut LzmaStreamEncoder { + let params = self.params; + self.stream + .get_or_insert_with(|| LzmaStreamEncoder::new(params)) + } + + /// Push staged output bytes from `pending[pending_idx..]` into `output`. + /// Returns true once the buffer is fully drained. + fn drain_pending(&mut self, output: &mut [u8], written: &mut usize) -> bool { + while self.pending_idx < self.pending.len() && *written < output.len() { + output[*written] = self.pending[self.pending_idx]; + *written += 1; + self.pending_idx += 1; + } + if self.pending_idx >= self.pending.len() { + self.pending.clear(); + self.pending_idx = 0; + true + } else { + false + } + } } +/// Bytes of input pulled into the stream encoder per `raw_encode` step, so +/// `pending` (produced output) stays bounded between drains. 
+const ENC_PUSH_MAX: usize = 1 << 16; + impl RawEncoder for Encoder { - fn raw_encode(&mut self, input: &[u8], _output: &mut [u8]) -> Result { - if self.finished { + fn raw_encode(&mut self, input: &[u8], output: &mut [u8]) -> Result { + if self.stream_finished { return Err(Error::Corrupt); } - self.input_buf.extend_from_slice(input); - Ok(RawProgress { - consumed: input.len(), - written: 0, - done: false, - }) + let mut consumed = 0usize; + let mut written = 0usize; + loop { + // Drain any output we already produced first. + if self.pending_idx < self.pending.len() { + if !self.drain_pending(output, &mut written) { + return Ok(RawProgress { + consumed, + written, + done: false, + }); + } + continue; + } + if consumed < input.len() { + let take = (input.len() - consumed).min(ENC_PUSH_MAX); + let slice = &input[consumed..consumed + take]; + let produced = self.stream().push(slice); + consumed += take; + if !produced.is_empty() { + self.pending = produced; + self.pending_idx = 0; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); + } + } } fn raw_finish(&mut self, output: &mut [u8]) -> Result { - if !self.finished { - // One-shot encode of everything we've buffered. - self.output_buf = encode_all(&self.input_buf, self.params); - self.output_pos = 0; - self.finished = true; + let mut written = 0usize; + loop { + if self.pending_idx < self.pending.len() { + if !self.drain_pending(output, &mut written) { + return Ok(RawProgress { + consumed: 0, + written, + done: false, + }); + } + continue; + } + if !self.stream_finished { + // Finish the stream (emit EOS + flush) and stage its remaining + // output. An encoder that never saw input still emits a valid + // empty stream (header + EOS). 
+ let produced = self.stream().finish(); + self.stream_finished = true; + if !produced.is_empty() { + self.pending = produced; + self.pending_idx = 0; + } + } else { + return Ok(RawProgress { + consumed: 0, + written, + done: true, + }); + } } - let remaining = self.output_buf.len() - self.output_pos; - let n = remaining.min(output.len()); - output[..n].copy_from_slice(&self.output_buf[self.output_pos..self.output_pos + n]); - self.output_pos += n; - let done = self.output_pos >= self.output_buf.len(); - Ok(RawProgress { - consumed: 0, - written: n, - done, - }) } fn raw_reset(&mut self) { - self.input_buf.clear(); - self.output_buf.clear(); - self.output_pos = 0; - self.finished = false; + self.stream = None; + self.pending.clear(); + self.pending_idx = 0; + self.stream_finished = false; // Note: params is preserved per the trait contract. } } diff --git a/src/lzma2/mod.rs b/src/lzma2/mod.rs index 742a8e3..c7069fa 100644 --- a/src/lzma2/mod.rs +++ b/src/lzma2/mod.rs @@ -181,7 +181,9 @@ fn resolve_dict_size(cfg: &DecoderConfig) -> Result { // ─── encoder ────────────────────────────────────────────────────────────── -use crate::lzma2_internal::lzma2_encoder::{EncoderParams, LZMA2_PROPS_BYTE, encode_lzma2_stream}; +use crate::lzma2_internal::lzma2_encoder::{ + EncoderParams, LZMA2_PROPS_BYTE, Lzma2Chunk, Lzma2StreamEncoder, +}; /// Dictionary size (in bytes) the encoder advertises to the LZMA chunk /// coder as the match-distance ceiling. Fixed at 4 MiB — the [`crate::xz`] @@ -229,16 +231,22 @@ enum EncPhase { /// Note: unlike the former permanently-`Unsupported` stub (a unit struct), /// the working encoder buffers state, so it is a normal struct and is no /// longer `Copy` — construct it via [`Lzma2::encoder()`](crate::Algorithm). -#[derive(Debug, Clone)] pub struct Encoder { phase: EncPhase, /// Staged bytes for the current chunk (or end marker), drained to the /// caller from `pending[pending_idx..]`. 
pending: Vec, pending_idx: usize, - /// Input accumulated for the next chunk; flushed at `ENC_CHUNK_MAX` or on - /// `finish`. - in_buf: Vec, + /// Bounded-memory continuous-dictionary LZMA2 chunk encoder; emits framed + /// chunks incrementally so the whole input is never accumulated. `None` + /// until the first input byte arrives. + stream: Option, + /// Raw input pushed into `stream` but not yet consumed by an emitted chunk. + /// Bounded by one chunk's worth, used to frame uncompressed-fallback chunks. + staged_input: Vec, + /// Set once `stream.finish()` has run, so a multi-call `finish` doesn't + /// re-finish. + stream_finished: bool, /// Level-derived match-finder tuning; preserved across `reset`. params: EncoderParams, } @@ -256,11 +264,19 @@ impl Encoder { phase: EncPhase::Body, pending: Vec::new(), pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, params: EncoderParams::from_level(ENC_DEFAULT_LEVEL), } } + /// Lazily create the bounded-memory LZMA2 stream encoder on first use. + fn stream(&mut self) -> &mut Lzma2StreamEncoder { + self.stream + .get_or_insert_with(|| Lzma2StreamEncoder::new(ENC_DICT_SIZE, self.params)) + } + /// Push staged bytes from `pending[pending_idx..]` into `output`. Returns /// true once the buffer is fully drained. fn drain_pending(&mut self, output: &mut [u8], written: &mut usize) -> bool { @@ -278,24 +294,24 @@ impl Encoder { } } - /// Stage the whole stream's LZMA2 chunks from the fully-buffered `input`. + /// Frame each produced LZMA2 chunk into `pending`, consuming the matching + /// raw bytes from `staged_input` (needed for the uncompressed fallback). 
/// - /// The chunks come from a single continuous match-finder - /// ([`encode_lzma2_stream`]): the first chunk resets the dictionary - /// (`0xE0` compressed / `0x01` uncompressed) and every later chunk - /// continues it (`0xC0` compressed / `0x02` uncompressed), so cross-chunk - /// matches (up to the 4 MiB dictionary) are found. The caller appends the - /// `0x00` end marker afterwards. - fn stage_payload(&mut self, input: &[u8]) { - let chunks = encode_lzma2_stream(input, ENC_DICT_SIZE, self.params); - let mut pos = 0usize; + /// The chunks come from a single continuous, **bounded-memory** match-finder + /// ([`Lzma2StreamEncoder`]): the first chunk resets the dictionary (`0xE0` + /// compressed / `0x01` uncompressed) and every later chunk continues it + /// (`0xC0` compressed / `0x02` uncompressed), so cross-chunk matches (up to + /// the 4 MiB dictionary) are found, while peak memory stays `O(dict_size)`. + /// The caller appends the `0x00` end marker afterwards. + fn stage_chunks(&mut self, chunks: Vec<Lzma2Chunk>) { for chunk in chunks { - let data = &input[pos..pos + chunk.uncomp_len]; + let n = chunk.uncomp_len; + debug_assert!(self.staged_input.len() >= n); + let data: Vec<u8> = self.staged_input.drain(..n).collect(); match chunk.body { - Some(ref body) => self.stage_compressed_chunk(data, body, chunk.reset_dict), - None => self.stage_uncompressed_chunk(data, chunk.reset_dict), + Some(ref body) => self.stage_compressed_chunk(&data, body, chunk.reset_dict), + None => self.stage_uncompressed_chunk(&data, chunk.reset_dict), } - pos += chunk.uncomp_len; } } @@ -350,19 +366,32 @@ impl RawEncoder for Encoder { loop { match self.phase { EncPhase::Body => { - // Buffer the entire input so the chunk stream can be - // produced by a single continuous match-finder at `finish` - // — the dictionary then spans every chunk instead of - // resetting per chunk. No mid-stream flush.
- if consumed < input.len() { - self.in_buf.extend_from_slice(&input[consumed..]); - consumed = input.len(); + // Feed input into the bounded-memory streaming LZMA2 encoder + // and frame any chunks it emits, draining them to the caller + // as we go. The whole input is never accumulated — the + // dictionary is a sliding `~dict_size` window inside the + // stream encoder. + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else if consumed < input.len() { + let take = (input.len() - consumed).min(ENC_CHUNK_MAX); + let slice = &input[consumed..consumed + take]; + self.staged_input.extend_from_slice(slice); + let chunks = self.stream().push(slice); + consumed += take; + if !chunks.is_empty() { + self.stage_chunks(chunks); + } + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); } - return Ok(RawProgress { - consumed, - written, - done: false, - }); } EncPhase::DrainPending => { if self.drain_pending(output, &mut written) { @@ -401,7 +430,7 @@ impl RawEncoder for Encoder { loop { match self.phase { EncPhase::Finishing => { - if !self.pending.is_empty() { + if self.pending_idx < self.pending.len() { // Drain a chunk staged during `encode` first. if !self.drain_pending(output, &mut written) { return Ok(RawProgress { @@ -411,14 +440,17 @@ impl RawEncoder for Encoder { }); } } - if !self.in_buf.is_empty() { - let data = core::mem::take(&mut self.in_buf); - self.stage_payload(&data); - // Stay in `Finishing`; the loop drains the staged - // chunks then re-checks the (now empty) buffer. + if self.stream.is_some() && !self.stream_finished { + // Flush the remaining buffered bytes as the final + // chunk(s) from the bounded-memory stream encoder, then + // frame them. Memory stays `O(dict_size)`. 
+ let chunks = self.stream().finish(); + self.stream_finished = true; + self.stage_chunks(chunks); + // Stay in `Finishing`; the loop drains the staged chunks + // then emits the end marker. } else { - // Buffer empty and any staged chunk drained: emit the - // single 0x00 end marker. + // All chunks drained: emit the single 0x00 end marker. self.pending.push(0x00); self.pending_idx = 0; self.phase = EncPhase::DrainEnd; @@ -459,7 +491,9 @@ impl RawEncoder for Encoder { self.phase = EncPhase::Body; self.pending.clear(); self.pending_idx = 0; - self.in_buf.clear(); + self.stream = None; + self.staged_input.clear(); + self.stream_finished = false; self.params = params; } } diff --git a/src/lzma2_internal/lzma2_encoder.rs b/src/lzma2_internal/lzma2_encoder.rs index 23f19e9..4e365d3 100644 --- a/src/lzma2_internal/lzma2_encoder.rs +++ b/src/lzma2_internal/lzma2_encoder.rs @@ -800,19 +800,22 @@ impl LzmaEncCore { total } - fn emit_literal(&mut self, input: &[u8], pos: usize) { + /// Emit the literal at absolute position `pos`, reading bytes from the + /// sliding window `win` (whose first byte is absolute offset `base`). 
+ fn emit_literal(&mut self, win: &[u8], base: usize, pos: usize) { let pos_state = self.pos_state(); let idx = self.state * POS_STATES_MAX + pos_state as usize; rc_encode_bit(&mut self.rc, &mut self.is_match[idx], 0); - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if self.state < LIT_STATES { None } else { let d = self.rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - self.encode_literal_full(input[pos], prev_byte, match_byte); + self.encode_literal_full(win[i], prev_byte, match_byte); self.state = state_after_literal(self.state); self.output_pos += 1; @@ -980,41 +983,71 @@ fn get_dist_slot(distance: u32) -> u32 { // ─── match finder ──────────────────────────────────────────────────────── +/// Sliding-window 3-byte hash-chain match finder. +/// +/// Positions stored in `head`/`prev` are **absolute** input offsets, but `prev` +/// is a ring buffer of `prev.len()` slots indexed `pos & prev_mask`. The ring +/// is sized strictly larger than `dict_size + MAX_MATCH_LEN`, so every chain +/// link the finder can legally follow (distance ≤ `dict_size`) is still intact: +/// a slot is only overwritten after `prev.len()` further insertions, by which +/// point that older position is already out of dictionary range and the walk +/// has broken on `dist > max_dist`. This keeps peak memory `O(dict_size)` +/// regardless of total input length while finding exactly the same matches a +/// whole-buffer chain would. +/// +/// All byte reads go through the sliding window `win`, whose first byte is the +/// absolute offset `base`; an absolute position `p` reads `win[p - base]`. 
struct HashChain { head: Box<[u32; HASH_SIZE]>, prev: Vec<u32>, + prev_mask: usize, } impl HashChain { - fn new(buf_len: usize) -> Self { + /// Build a finder whose `prev` ring covers at least `dict_size + + /// MAX_MATCH_LEN` positions (rounded up to a power of two). `cap_hint` + /// caps the ring when the total input is known to be smaller, so small + /// inputs (and the unit tests) don't over-allocate. + fn new(dict_size: u32, cap_hint: usize) -> Self { + let needed = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(2); + let want = needed.min(cap_hint.max(1)); + let cap = want.max(1).next_power_of_two(); Self { head: Box::new([NIL; HASH_SIZE]), - prev: vec![NIL; buf_len], + prev: vec![NIL; cap], + prev_mask: cap - 1, } } - fn insert(&mut self, input: &[u8], pos: usize) { - if pos + 3 > input.len() { + /// Splice absolute position `pos` into the chain. No-op if fewer than three + /// bytes follow in the window. + fn insert(&mut self, win: &[u8], base: usize, pos: usize) { + let i = pos - base; + if i + 3 > win.len() { return; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - self.prev[pos] = self.head[h]; + let h = hash3(win[i], win[i + 1], win[i + 2]) as usize; + self.prev[pos & self.prev_mask] = self.head[h]; self.head[h] = pos as u32; } /// Find the single longest match (greedy use). Returns `(len, dist0based)`.
fn find_longest( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, params: EncoderParams, ) -> Option<(u32, u32)> { - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return None; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = 0; let mut best_dist: u32 = 0; @@ -1023,7 +1056,7 @@ impl HashChain { while cur != NIL && steps < params.max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1031,16 +1064,17 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; if best_len > 2 - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1050,7 +1084,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1066,18 +1100,20 @@ impl HashChain { /// strictly increasing length order. Returns the longest length found. 
fn find_matches( &self, - input: &[u8], + win: &[u8], + base: usize, pos: usize, dict_size: u32, params: EncoderParams, out: &mut Vec<(u32, u32)>, ) -> u32 { out.clear(); - if pos + 3 > input.len() { + let pi = pos - base; + if pi + 3 > win.len() { return 0; } - let h = hash3(input[pos], input[pos + 1], input[pos + 2]) as usize; - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32); + let h = hash3(win[pi], win[pi + 1], win[pi + 2]) as usize; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32); let max_dist = (dict_size as usize).min(pos); let mut best_len: u32 = MATCH_LEN_MIN - 1; let mut cur = self.head[h]; @@ -1085,7 +1121,7 @@ impl HashChain { while cur != NIL && steps < params.max_chain { let cur_pos = cur as usize; if cur_pos >= pos { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } @@ -1093,16 +1129,17 @@ impl HashChain { if dist > max_dist { break; } + let ci = cur_pos - base; if best_len >= MATCH_LEN_MIN - && (best_len as usize) < (input.len() - pos) - && input[cur_pos + best_len as usize] != input[pos + best_len as usize] + && (best_len as usize) < (win.len() - pi) + && win[ci + best_len as usize] != win[pi + best_len as usize] { - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; continue; } let mut len = 0u32; - while len < max_len && input[cur_pos + len as usize] == input[pos + len as usize] { + while len < max_len && win[ci + len as usize] == win[pi + len as usize] { len += 1; } if len >= MATCH_LEN_MIN && len > best_len { @@ -1115,7 +1152,7 @@ impl HashChain { break; } } - cur = self.prev[cur_pos]; + cur = self.prev[cur_pos & self.prev_mask]; steps += 1; } if best_len >= MATCH_LEN_MIN { @@ -1128,14 +1165,17 @@ impl HashChain { // ─── rep-match helpers ─────────────────────────────────────────────────── -fn rep_match_len(input: &[u8], pos: usize, dist: u32) -> u32 { +/// Length of a repeat match at absolute `pos` against 0-based LZ distance +/// `dist`, 
reading from the sliding window `win` (first byte = absolute `base`). +fn rep_match_len(win: &[u8], base: usize, pos: usize, dist: u32) -> u32 { let d = dist as usize + 1; if d > pos { return 0; } - let max_len = MAX_MATCH_LEN.min((input.len() - pos) as u32) as usize; + let pi = pos - base; + let max_len = MAX_MATCH_LEN.min((win.len() - pi) as u32) as usize; let mut len = 0usize; - while len < max_len && input[pos - d + len] == input[pos + len] { + while len < max_len && win[pi - d + len] == win[pi + len] { len += 1; } len as u32 @@ -1202,13 +1242,15 @@ impl Optimizer { } } -/// Compute the price of a literal at `pos` given the encoder's live state. +/// Compute the price of a literal at absolute `pos` given the encoder's live +/// state, reading bytes from the sliding window `win` (first byte = `base`). #[allow(clippy::too_many_arguments)] fn literal_price_at( core: &LzmaEncCore, prices: &[u32; PRICE_TABLE_SIZE], snap: &PriceSnapshot, - input: &[u8], + win: &[u8], + base: usize, pos: usize, out_pos: u64, state: usize, @@ -1216,15 +1258,15 @@ fn literal_price_at( ) -> u32 { let pos_state = (out_pos as u32) & core.pos_mask; let im_idx = state * POS_STATES_MAX + pos_state as usize; - let prev_byte = if pos > 0 { input[pos - 1] } else { 0 }; + let i = pos - base; + let prev_byte = if pos > 0 { win[i - 1] } else { 0 }; let match_byte = if state < LIT_STATES { None } else { let d = rep0 as usize + 1; - if d <= pos { Some(input[pos - d]) } else { None } + if d <= pos { Some(win[i - d]) } else { None } }; - snap.is_match[im_idx][0] - + core.literal_price(prices, out_pos, input[pos], prev_byte, match_byte) + snap.is_match[im_idx][0] + core.literal_price(prices, out_pos, win[i], prev_byte, match_byte) } // ─── public chunk encoder ──────────────────────────────────────────────── @@ -1244,8 +1286,8 @@ fn literal_price_at( /// [`EncoderParams::from_level`]. 
/// /// Single-chunk helper retained for the LZMA2 unit tests (which exercise the -/// chunk codec directly); the production encoders use [`encode_lzma2_stream`], -/// which keeps one continuous match-finder across chunks. +/// chunk codec directly); the production encoders use [`Lzma2StreamEncoder`], +/// which keeps one continuous, bounded-memory match-finder across chunks. #[cfg_attr(not(test), allow(dead_code))] pub(crate) fn encode_lzma_chunk(input: &[u8], dict_size: u32, params: EncoderParams) -> Vec { if params.opt_window == 0 { @@ -1264,7 +1306,14 @@ pub(crate) fn encode_lzma_chunk(input: &[u8], dict_size: u32, params: EncoderPar } } -/// One framed LZMA2 chunk produced by [`encode_lzma2_stream`]. +/// Extra window history retained behind the current parse position, on top of +/// `dict_size`, so an insertion at `pos` never overwrites a still-reachable +/// older ring slot and a match at the very back of the dictionary can still be +/// fully read from the window. `MAX_MATCH_LEN` covers the longest readable +/// match; a small constant covers the `prev_byte`/`match_byte` look-back. +const WINDOW_SLOP: usize = MAX_MATCH_LEN as usize + 16; + +/// One framed LZMA2 chunk produced by [`Lzma2StreamEncoder`]. /// /// The caller (xz Block payload or raw LZMA2 stream framing) turns this into /// the on-wire chunk header + body. `uncomp_len` is the number of input bytes @@ -1292,106 +1341,152 @@ pub(crate) struct Lzma2Chunk { /// slice size — only the output framing is chunked. const STREAM_CHUNK_UNCOMP_MAX: usize = 65_536; -/// Encode the *entire* `input` as a continuous LZMA2 chunk stream, returning -/// the framed chunks in order. +/// Streaming continuous-dictionary LZMA2 chunk encoder with **bounded memory**. 
/// -/// Unlike calling [`encode_lzma_chunk`] once per slice, this keeps a single LZ -/// match-finder (and a continuous `output_pos`) across all chunks: the first -/// chunk resets the dictionary, every later chunk *continues* it, so a match -/// in a later chunk can reference data from any earlier chunk up to -/// `dict_size`. This is what closes the ratio gap with the `.lzma` (single -/// continuous stream) path. The range coder and probability model still reset -/// per chunk (state reset) — only the dictionary/history is continuous. +/// Keeps a single LZ match-finder and a continuous `output_pos` across all +/// chunks — the first chunk resets the dictionary, every later chunk continues +/// it, so a match in a later chunk references data from any earlier chunk up to +/// `dict_size`. Memory is bounded to `O(dict_size)` regardless of input length: /// -/// The caller is responsible for the `0x00` end marker and any container -/// framing (xz Block/Index, raw-stream terminator). -pub(crate) fn encode_lzma2_stream( - input: &[u8], +/// - The match finder's `prev` ring is sized `O(dict_size)` (see [`HashChain`]). +/// - Only a sliding window of roughly `dict_size + WINDOW_SLOP` history plus one +/// pending chunk of lookahead is retained in `win`; older bytes are dropped +/// once the parse position has moved `> dict_size + WINDOW_SLOP` past them. +/// +/// Feed input with [`push`](Self::push) (which returns any chunks that became +/// fully buffered) and finish with [`finish`](Self::finish). The caller frames +/// each [`Lzma2Chunk`] and is responsible for the `0x00` end marker and any +/// container framing. +pub(crate) struct Lzma2StreamEncoder { + core: LzmaEncCore, + hc: HashChain, dict_size: u32, params: EncoderParams, -) -> Vec<Lzma2Chunk> { - let mut chunks = Vec::new(); - if input.is_empty() { - return chunks; + /// Retained window bytes; `win[0]` is absolute offset `win_base`. + win: Vec<u8>, + /// Absolute offset of `win[0]`.
+ win_base: usize, + /// Absolute offset of the next byte to encode (== bytes already framed). + pos: usize, + /// Absolute count of bytes appended via `push` (encodable extent). + appended: usize, + /// `true` until the first chunk is emitted. + first: bool, +} + +impl Lzma2StreamEncoder { + pub fn new(dict_size: u32, params: EncoderParams) -> Self { + // The ring need never exceed what a 32-bit input could address; the + // dict cap already keeps this `O(dict_size)`. + let cap_hint = (dict_size as usize) + .saturating_add(MAX_MATCH_LEN as usize) + .saturating_add(WINDOW_SLOP + STREAM_CHUNK_UNCOMP_MAX); + Self { + core: LzmaEncCore::new(), + hc: HashChain::new(dict_size, cap_hint), + dict_size, + params, + win: Vec::new(), + win_base: 0, + pos: 0, + appended: 0, + first: true, + } } - let mut core = LzmaEncCore::new(); - // One hash chain over the whole input: the LZ history is never cleared at - // a chunk boundary, so cross-chunk matches are found naturally. - let mut hc = HashChain::new(input.len()); - - let mut pos = 0usize; - let mut first = true; - while pos < input.len() { - let chunk_end = (pos + STREAM_CHUNK_UNCOMP_MAX).min(input.len()); - let uncomp_len = chunk_end - pos; - - // Range-code this slice. State/probabilities/range coder are reset for - // the chunk; `output_pos` and the hash chain continue. - let body = - encode_stream_chunk_body(&mut core, &mut hc, input, pos, chunk_end, dict_size, params); - - // Compressed only pays off when the body both shrinks the slice and - // fits the 16-bit (+1) compressed-size field; otherwise the - // uncompressed fallback is strictly smaller. - let use_compressed = !body.is_empty() && body.len() <= 65_536 && body.len() < uncomp_len; + /// Append `data` and emit every chunk that is now fully buffered. 
A chunk is + /// only encoded once a whole `STREAM_CHUNK_UNCOMP_MAX` slice (or the rest of + /// the stream, at `finish`) is available, so the optimal parser always sees + /// its full forward lookahead within the chunk. + pub fn push(&mut self, data: &[u8]) -> Vec<Lzma2Chunk> { + self.win.extend_from_slice(data); + self.appended += data.len(); + let mut out = Vec::new(); + // Encode while a full chunk's worth of bytes is buffered ahead of `pos`. + while self.appended - self.pos >= STREAM_CHUNK_UNCOMP_MAX { + out.push(self.encode_one_chunk(self.pos + STREAM_CHUNK_UNCOMP_MAX)); + self.trim_window(); + } + out + } - if use_compressed { - chunks.push(Lzma2Chunk { - uncomp_len, - reset_dict: first, - body: Some(body), - }); - } else { - // Uncompressed fallback. The range-coded attempt mutated `core` - // (output_pos advanced, hash chain populated for this slice), which - // is exactly the state we want for continuing the dictionary into - // later chunks — the bytes are already in the LZ history and - // `output_pos` already advanced by `uncomp_len`. So nothing extra - // to do here; just frame an uncompressed chunk. - chunks.push(Lzma2Chunk { - uncomp_len, - reset_dict: first, - body: None, - }); + /// Flush any remaining buffered bytes as a final chunk (or chunks). + pub fn finish(&mut self) -> Vec<Lzma2Chunk> { + let mut out = Vec::new(); + while self.pos < self.appended { + let end = (self.pos + STREAM_CHUNK_UNCOMP_MAX).min(self.appended); + out.push(self.encode_one_chunk(end)); + self.trim_window(); } + out + } - pos = chunk_end; - first = false; + /// Encode the chunk `[self.pos, chunk_end)` and advance `self.pos`.
+ fn encode_one_chunk(&mut self, chunk_end: usize) -> Lzma2Chunk { + let uncomp_len = chunk_end - self.pos; + let body = self.encode_chunk_body(chunk_end); + let use_compressed = !body.is_empty() && body.len() <= 65_536 && body.len() < uncomp_len; + let chunk = Lzma2Chunk { + uncomp_len, + reset_dict: self.first, + body: if use_compressed { Some(body) } else { None }, + }; + self.pos = chunk_end; + self.first = false; + chunk } - chunks -} + /// Range-code `[self.pos, chunk_end)` through the shared core/hash chain, + /// resetting LZMA state first (keeping `output_pos` and the LZ history). + fn encode_chunk_body(&mut self, chunk_end: usize) -> Vec { + self.core.reset_state_keep_pos(); + let base = self.win_base; + let start = self.pos; + if self.params.opt_window == 0 { + encode_greedy( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + chunk_end, + self.dict_size, + self.params, + ); + } else { + encode_optimal( + &mut self.core, + &mut self.hc, + &self.win, + base, + start, + chunk_end, + self.dict_size, + self.params, + ); + } + self.core.rc.flush(); + self.core.rc.out.clone() + } -/// Range-code `input[pos_start..pos_end]` through `core`/`hc`, performing a -/// per-chunk state reset first (keeping `output_pos` and the LZ history). -/// Returns the flushed range-coded body. -/// -/// Uses the optimal parse when the level enables it (`opt_window != 0`) and the -/// greedy parse otherwise — the same selection [`encode_lzma_chunk`] makes per -/// level. We do not additionally run both parses and keep the smaller body (as -/// the single-chunk path does): doing so would require snapshotting/replaying -/// the shared cross-chunk LZ history, and the optimal parse only loses to -/// greedy on pathological tiny inputs, which the uncompressed-chunk fallback -/// already covers for whole chunks. 
-#[allow(clippy::too_many_arguments)] -fn encode_stream_chunk_body( - core: &mut LzmaEncCore, - hc: &mut HashChain, - input: &[u8], - pos_start: usize, - pos_end: usize, - dict_size: u32, - params: EncoderParams, -) -> Vec { - core.reset_state_keep_pos(); - if params.opt_window == 0 { - encode_greedy(core, hc, input, pos_start, pos_end, dict_size, params); - } else { - encode_optimal(core, hc, input, pos_start, pos_end, dict_size, params); + /// Drop window history older than `dict_size + WINDOW_SLOP` before `pos`, so + /// peak `win` memory stays `O(dict_size)`. Never drops bytes a future match + /// could read (distance ≤ `dict_size`) or the parse look-back needs. + /// + /// The front-shift is only performed once the droppable prefix grows past a + /// whole `dict_size + WINDOW_SLOP` of waste, so the `drain` memmove cost is + /// amortised `O(1)` per input byte (rather than memmoving `~dict_size` bytes + /// every chunk, which would be quadratic over a large stream). + fn trim_window(&mut self) { + let keep_from = self + .pos + .saturating_sub(self.dict_size as usize + WINDOW_SLOP); + let droppable = keep_from.saturating_sub(self.win_base); + if droppable >= self.dict_size as usize + WINDOW_SLOP { + self.win.drain(..droppable); + self.win_base = keep_from; + } } - core.rc.flush(); - core.rc.out.clone() } /// Encode one chunk body (range-coded packets + 5-byte flush, no EOS marker) @@ -1405,12 +1500,30 @@ fn encode_chunk_body( optimal: bool, ) -> Vec { let mut core = LzmaEncCore::new(); - let mut hc = HashChain::new(input.len()); + let mut hc = HashChain::new(dict_size, input.len().max(1)); if optimal { - encode_optimal(&mut core, &mut hc, input, 0, input.len(), dict_size, params); + encode_optimal( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + params, + ); } else { - encode_greedy(&mut core, &mut hc, input, 0, input.len(), dict_size, params); + encode_greedy( + &mut core, + &mut hc, + input, + 0, + 0, + input.len(), + dict_size, + 
params, + ); } // Flush the range coder. NO EOS marker — LZMA2 frames the uncompressed @@ -1427,29 +1540,32 @@ fn encode_chunk_body( /// emitted match/rep lengths are clamped so the parse stops exactly at /// `pos_end` — this lets a continuous encoder slice the output into chunks /// without ever crossing a chunk's uncompressed-size boundary. +#[allow(clippy::too_many_arguments)] fn encode_greedy( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, pos_start: usize, pos_end: usize, dict_size: u32, params: EncoderParams, ) { + let win_end = base + win.len(); let mut pos = pos_start; while pos < pos_end { // Bytes left until this chunk's boundary; emitted lengths never exceed // it so the chunk ends exactly at `pos_end`. let cap = (pos_end - pos) as u32; let rep_lens = [ - rep_match_len(input, pos, core.rep0).min(cap), - rep_match_len(input, pos, core.rep1).min(cap), - rep_match_len(input, pos, core.rep2).min(cap), - rep_match_len(input, pos, core.rep3).min(cap), + rep_match_len(win, base, pos, core.rep0).min(cap), + rep_match_len(win, base, pos, core.rep1).min(cap), + rep_match_len(win, base, pos, core.rep2).min(cap), + rep_match_len(win, base, pos, core.rep3).min(cap), ]; let new_match = hc - .find_longest(input, pos, dict_size, params) + .find_longest(win, base, pos, dict_size, params) .map(|(l, d)| (l.min(cap), d)); let best_rep_len = rep_lens.iter().copied().max().unwrap_or(0); @@ -1466,14 +1582,14 @@ fn encode_greedy( let emit_rep_long = !emit_new && best_rep_len >= MATCH_LEN_MIN; let emit_short_rep = !emit_new && !emit_rep_long && rep_lens[0] >= 1; - hc.insert(input, pos); + hc.insert(win, base, pos); if emit_new { let (len, dist) = new_match.unwrap(); for j in 1..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1481,8 +1597,8 @@ fn encode_greedy( } else if emit_rep_long { for j in 1..(best_rep_len as 
usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(best_rep_idx, best_rep_len); @@ -1491,7 +1607,7 @@ fn encode_greedy( core.emit_short_rep(); pos += 1; } else { - core.emit_literal(input, pos); + core.emit_literal(win, base, pos); pos += 1; } } @@ -1504,10 +1620,12 @@ fn encode_greedy( /// Encodes `input[pos_start..pos_end]`; matches still reference the whole LZ /// history before `pos`, but the parse never advances past `pos_end`, so the /// chunk ends exactly there. +#[allow(clippy::too_many_arguments)] fn encode_optimal( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, pos_start: usize, pos_end: usize, dict_size: u32, @@ -1526,7 +1644,8 @@ fn encode_optimal( let parsed = parse_window( core, hc, - input, + win, + base, pos, pos_end, dict_size, @@ -1539,7 +1658,7 @@ fn encode_optimal( debug_assert!(parsed > 0); // Replay the chosen decisions through the real emit path. `pos` // advances by exactly `parsed` bytes. 
- replay(core, hc, input, pos, &opt.decisions); + replay(core, hc, win, base, pos, &opt.decisions); pos += parsed; } } @@ -1553,7 +1672,8 @@ fn encode_optimal( fn parse_window( core: &LzmaEncCore, hc: &HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, pos_end: usize, dict_size: u32, @@ -1622,7 +1742,7 @@ fn parse_window( // ── literal transition ────────────────────────────────────────── { - let lp = literal_price_at(core, prices, snap, input, pos, out_pos, state, reps[0]); + let lp = literal_price_at(core, prices, snap, win, base, pos, out_pos, state, reps[0]); let np = node.price.saturating_add(lp); let to = cur + 1; if to <= limit && np < opt.opt[to].price { @@ -1644,7 +1764,7 @@ fn parse_window( // ── rep matches (rep0..rep3) ──────────────────────────────────── for rep_idx in 0..4u32 { - let rlen = rep_match_len(input, pos, reps[rep_idx as usize]); + let rlen = rep_match_len(win, base, pos, reps[rep_idx as usize]); if rlen < 1 { continue; } @@ -1711,7 +1831,7 @@ fn parse_window( // ── new matches ───────────────────────────────────────────────── let longest = { let opt_matches = &mut opt.matches; - hc.find_matches(input, pos, dict_size, params, opt_matches) + hc.find_matches(win, base, pos, dict_size, params, opt_matches) }; if longest >= MATCH_LEN_MIN { if longest > best_here { @@ -1759,6 +1879,15 @@ fn parse_window( if commit_end.is_none() && best_here >= params.nice_len { let bounded = (cur + best_here as usize).min(limit); commit_end = Some(bounded); + // The long match from this node already writes the cheapest known + // arrival at `bounded` (a single match decision). Stop extending the + // DP now instead of grinding through every position the match spans + // — on long-match runs (highly repetitive input) that band would + // otherwise cost O(nice..273) work per covered byte, turning the + // parse quadratic. Committing the match here matches the SDK's + // greedy `nice_len` acceptance and leaves ratio essentially + // unchanged. 
+ break; } cur += 1; @@ -1804,28 +1933,30 @@ fn trace_back(opt: &mut Optimizer, end: usize) { fn replay( core: &mut LzmaEncCore, hc: &mut HashChain, - input: &[u8], + win: &[u8], + base: usize, start: usize, decisions: &[Decision], ) { + let win_end = base + win.len(); let mut pos = start; for &d in decisions { match d { Decision::Literal => { - hc.insert(input, pos); - core.emit_literal(input, pos); + hc.insert(win, base, pos); + core.emit_literal(win, base, pos); pos += 1; } Decision::ShortRep => { - hc.insert(input, pos); + hc.insert(win, base, pos); core.emit_short_rep(); pos += 1; } Decision::Match(dist, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_match(dist, len); @@ -1834,8 +1965,8 @@ fn replay( Decision::Rep(idx, len) => { for j in 0..(len as usize) { let p = pos + j; - if p + 3 <= input.len() { - hc.insert(input, p); + if p + 3 <= win_end { + hc.insert(win, base, p); } } core.emit_long_rep(idx, len); diff --git a/src/xz/mod.rs b/src/xz/mod.rs index 0bc2574..560280e 100644 --- a/src/xz/mod.rs +++ b/src/xz/mod.rs @@ -58,7 +58,9 @@ use crate::traits::{Algorithm, RawDecoder, RawEncoder, RawProgress}; // decoder exposed under the `lzma2` feature without pulling in this xz // container. See `lib.rs`. use crate::lzma2_internal::lzma2_decoder::{Lzma2Props, LzmaCore, lzma2_dict_size}; -use crate::lzma2_internal::lzma2_encoder::{EncoderParams, LZMA2_PROPS_BYTE, encode_lzma2_stream}; +use crate::lzma2_internal::lzma2_encoder::{ + EncoderParams, LZMA2_PROPS_BYTE, Lzma2Chunk, Lzma2StreamEncoder, +}; // ─── constants ───────────────────────────────────────────────────────────── @@ -394,9 +396,19 @@ pub struct Encoder { // the caller's `output`. We push from `pending[pending_idx..]`. pending: Vec<u8>, pending_idx: usize, - // Input buffer for the next LZMA2 chunk; flushed at LZMA2_CHUNK_MAX or - // on finish(). 
- in_buf: Vec<u8>, + // Bounded-memory continuous-dictionary LZMA2 chunk encoder. Fed input + // incrementally; emits framed chunks as they become fully buffered, so the + // whole input is never accumulated. `None` until the Body phase starts. + stream: Option<Lzma2StreamEncoder>, + // Raw input bytes pushed into `stream` but not yet consumed by an emitted + // chunk. Bounded by one chunk's worth (`stream` emits as soon as a full + // chunk is buffered), so this stays `O(LZMA2_CHUNK_MAX)`. Used to frame + // uncompressed-fallback chunks (raw bytes copied verbatim) and to feed the + // Block Check CRC32 in encode order. + staged_input: Vec<u8>, + // Set once `stream.finish()` has been called, so a multi-call `raw_finish` + // (draining across several small output buffers) doesn't re-finish. + stream_finished: bool, // CRC32 of all uncompressed input bytes — becomes the Block Check. check: Crc32, // Bookkeeping for the Index Record. @@ -427,7 +439,9 @@ impl Encoder { phase: EncPhase::StreamHeader, pending, pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, check: Crc32::new(), uncompressed_total: 0, compressed_payload_bytes: 0, @@ -453,30 +467,36 @@ impl Encoder { } } - /// Stage the whole block's LZMA2 payload from the fully-buffered input. - /// - /// The LZMA2 chunks are produced by a single continuous match-finder - /// ([`encode_lzma2_stream`]): the first chunk resets the dictionary - /// (`0xE0` compressed / `0x01` uncompressed) and every later chunk - /// *continues* it (`0xC0` compressed / `0x02` uncompressed), so a match in - /// a later chunk can reference data from any earlier chunk (up to the - /// dictionary size). The range coder still resets per chunk, but the - /// dictionary/history is continuous — this is what brings the ratio in - /// line with the single-stream `.lzma` path. 
+ /// Lazily create the bounded-memory LZMA2 stream encoder on first use. + fn stream(&mut self) -> &mut Lzma2StreamEncoder { + self.stream + .get_or_insert_with(|| Lzma2StreamEncoder::new(LZMA2_DICT_SIZE, self.enc_params)) + } + + /// Frame each produced LZMA2 chunk into `pending`, consuming the matching + /// raw bytes from `staged_input` (for CRC + the uncompressed fallback). /// - /// `input` is the entire block (`uncompressed_total` bytes). Each framed - /// chunk is appended to `pending`; the caller drains it before the block - /// trailer. - fn stage_payload(&mut self, input: &[u8]) { - let chunks = encode_lzma2_stream(input, LZMA2_DICT_SIZE, self.enc_params); - let mut pos = 0usize; + /// The chunks come from a single continuous, **bounded-memory** match-finder + /// ([`Lzma2StreamEncoder`]): the first chunk resets the dictionary (`0xE0` + /// compressed / `0x01` uncompressed) and every later chunk *continues* it + /// (`0xC0` compressed / `0x02` uncompressed), so a match in a later chunk + /// can reference data from any earlier chunk up to the dictionary size. The + /// range coder still resets per chunk, but the dictionary/history is + /// continuous — this is what brings the ratio in line with the single-stream + /// `.lzma` path, while peak memory stays `O(dict_size)`. + fn stage_chunks(&mut self, chunks: Vec<Lzma2Chunk>) { for chunk in chunks { - let data = &input[pos..pos + chunk.uncomp_len]; + let n = chunk.uncomp_len; + debug_assert!(self.staged_input.len() >= n); + // The bytes belonging to this chunk are the front `n` of the staged + // buffer (chunks are produced in encode order). 
+ let data: Vec<u8> = self.staged_input.drain(..n).collect(); + self.check.update(&data); + self.uncompressed_total += n as u64; match chunk.body { - Some(ref body) => self.stage_compressed_chunk(data, body, chunk.reset_dict), - None => self.stage_uncompressed_chunk(data, chunk.reset_dict), + Some(ref body) => self.stage_compressed_chunk(&data, body, chunk.reset_dict), + None => self.stage_uncompressed_chunk(&data, chunk.reset_dict), } - pos += chunk.uncomp_len; } } @@ -602,20 +622,35 @@ impl RawEncoder for Encoder { } } EncPhase::Body => { - // Buffer the entire block's input so the LZMA2 payload can - // be produced by a single continuous match-finder at - // `finish` (the dictionary spans the whole input rather - // than resetting per chunk). We never flush mid-stream; - // chunking happens only at the output-framing level. - if consumed < input.len() { - self.in_buf.extend_from_slice(&input[consumed..]); - consumed = input.len(); + // Feed input into the bounded-memory streaming LZMA2 encoder + // and frame any chunks it emits, draining them to the caller + // as we go. The whole input is never accumulated — the + // dictionary is a sliding `~dict_size` window inside the + // stream encoder. If a previous pass staged chunk bytes, + // drain those first. + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else if consumed < input.len() { + // Push a bounded slice so `staged_input` stays small. + let take = (input.len() - consumed).min(LZMA2_CHUNK_MAX); + let slice = &input[consumed..consumed + take]; + self.staged_input.extend_from_slice(slice); + let chunks = self.stream().push(slice); + consumed += take; + if !chunks.is_empty() { + self.stage_chunks(chunks); + } + // Drain whatever was staged before consuming more input. 
+ if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } + } else { + return Ok(RawProgress { + consumed, + written, + done: false, + }); } - return Ok(RawProgress { - consumed, - written, - done: false, - }); } EncPhase::DrainPending => { if self.drain_pending(output, &mut written) { @@ -684,19 +719,27 @@ impl RawEncoder for Encoder { } } EncPhase::Body => { - if !self.in_buf.is_empty() { - // Produce the whole block's LZMA2 payload at once from - // the fully-buffered input, using a single continuous - // match-finder so the dictionary spans every chunk. - let data = core::mem::take(&mut self.in_buf); - self.check.update(&data); - self.uncompressed_total += data.len() as u64; - self.stage_payload(&data); + if self.pending_idx < self.pending.len() { + // Drain bytes staged in a prior pass first. self.phase = EncPhase::DrainPending; } else { - // No pending input. Move to block trailer. - self.stage_block_trailer(); - self.phase = EncPhase::BlockTrailer; + // Flush the remaining buffered bytes as the final + // chunk(s) from the bounded-memory stream encoder (once), + // then frame them. Memory stays `O(dict_size)`. + if self.stream.is_some() && !self.stream_finished { + let chunks = self.stream().finish(); + self.stream_finished = true; + self.stage_chunks(chunks); + } + if self.pending_idx < self.pending.len() { + self.phase = EncPhase::DrainPending; + } else { + // No more chunk bytes. Move to the block trailer in + // this same iteration so the loop makes progress + // even when `finish` produced nothing. 
+ self.stage_block_trailer(); + self.phase = EncPhase::BlockTrailer; + } } } EncPhase::DrainPending => { @@ -787,7 +830,9 @@ impl RawEncoder for Encoder { phase: EncPhase::StreamHeader, pending, pending_idx: 0, - in_buf: Vec::new(), + stream: None, + staged_input: Vec::new(), + stream_finished: false, check: Crc32::new(), uncompressed_total: 0, compressed_payload_bytes: 0, diff --git a/tests/lzma.rs b/tests/lzma.rs index 87137bd..89750b2 100644 --- a/tests/lzma.rs +++ b/tests/lzma.rs @@ -235,27 +235,28 @@ fn encode_at_level(payload: &[u8], level: u8) -> Vec<u8> { } fn encode_with(enc: &mut Encoder, payload: &[u8]) -> Vec<u8> { - // The encoder buffers all input internally and emits nothing until - // `finish`, so a small scratch buffer is fine for the `encode` calls. + // The `.lzma` encoder is now a bounded-memory streaming encoder: it emits + // range-coded output during `encode` as the sliding-window parse produces + // it (header up front, then body), so we must drain `output` while feeding + // input. A small scratch buffer exercises the `OutputFull` path. + let mut out = Vec::new(); let mut scratch = [0u8; 64]; let mut consumed = 0; while consumed < payload.len() { let (p, status) = enc.encode(&payload[consumed..], &mut scratch).unwrap(); + out.extend_from_slice(&scratch[..p.written]); consumed += p.consumed; - // Output should always be empty from encode() for LZMA. - assert_eq!(p.written, 0); match status { Status::InputEmpty | Status::StreamEnd => break, Status::OutputFull => { - // Shouldn't happen — encode() doesn't write anything. - if p.consumed == 0 { + // Drain and retry; making no progress at all is a real stall. 
+ if p.consumed == 0 && p.written == 0 { panic!("encoder stalled mid-input"); } } } } - let mut out = Vec::new(); let mut buf = vec![0u8; 4096]; loop { let (p, status) = enc.finish(&mut buf).unwrap(); @@ -350,14 +351,23 @@ fn encode_streaming_one_byte_chunks_round_trip() { let mut enc = Encoder::new(); let mut scratch = [0u8; 4]; + let mut compressed = Vec::new(); + // Streaming `.lzma` may emit header/body bytes during `encode`; drain them + // and re-offer a byte until it is consumed. Feed exactly one input byte at + // a time. for byte in payload { - let (p, _status) = enc - .encode(core::slice::from_ref(byte), &mut scratch) - .unwrap(); - assert_eq!(p.consumed, 1); - assert_eq!(p.written, 0); + let mut consumed = 0; + while consumed == 0 { + let (p, _status) = enc + .encode(core::slice::from_ref(byte), &mut scratch) + .unwrap(); + compressed.extend_from_slice(&scratch[..p.written]); + consumed += p.consumed; + if p.consumed == 0 && p.written == 0 { + panic!("encoder stalled feeding a single byte"); + } + } } - let mut compressed = Vec::new(); let mut buf = [0u8; 1]; loop { let (p, status) = enc.finish(&mut buf).unwrap();