Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit 4b31a61

Browse files
authored
Merge pull request #789 from evakhoni/qemu_image
Add compressed image support for QEMU driver
2 parents 8dd161a + fc6933e commit 4b31a61

3 files changed

Lines changed: 294 additions & 3 deletions

File tree

packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from qemu.qmp.protocol import ConnectError, Runstate
2626

2727
from jumpstarter.driver import Driver, export
28+
from jumpstarter.streams.encoding import AutoDecompressIterator
2829

2930

3031
def _vsock_available():
@@ -42,9 +43,15 @@ class QemuFlasher(FlasherInterface, Driver):
4243

4344
@export
4445
async def flash(self, source, partition: str | None = None):
46+
"""Flash an image to the specified partition.
47+
48+
Supports transparent decompression of gzip, xz, bz2, and zstd compressed images.
49+
Compression format is auto-detected from file signature.
50+
"""
4551
async with await FileWriteStream.from_path(self.parent.validate_partition(partition)) as stream:
4652
async with self.resource(source) as res:
47-
async for chunk in res:
53+
# Wrap with auto-decompression to handle .gz, .xz, .bz2, .zstd files
54+
async for chunk in AutoDecompressIterator(source=res):
4855
await stream.send(chunk)
4956

5057
@export

packages/jumpstarter/jumpstarter/streams/encoding.py

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import lzma
33
import sys
44
import zlib
5-
from dataclasses import dataclass
5+
from collections.abc import AsyncIterator
6+
from dataclasses import dataclass, field
67
from enum import StrEnum
78
from typing import Any, Callable, Mapping
89

@@ -22,6 +23,55 @@ class Compression(StrEnum):
2223
ZSTD = "zstd"
2324

2425

26+
@dataclass(frozen=True)
27+
class FileSignature:
28+
"""File signature (magic bytes) for a compression format."""
29+
30+
signature: bytes
31+
compression: Compression
32+
33+
34+
# File signatures for compression format detection
35+
# Reference: https://file-extension.net/seeker/
36+
COMPRESSION_SIGNATURES: tuple[FileSignature, ...] = (
37+
FileSignature(b"\x1f\x8b\x08", Compression.GZIP),
38+
FileSignature(b"\xfd\x37\x7a\x58\x5a\x00", Compression.XZ),
39+
FileSignature(b"\x42\x5a\x68", Compression.BZ2),
40+
FileSignature(b"\x28\xb5\x2f\xfd", Compression.ZSTD),
41+
)
42+
43+
# Standard buffer size for file signature detection (covers most formats)
44+
SIGNATURE_BUFFER_SIZE = 8
45+
46+
47+
def detect_compression_from_signature(data: bytes) -> Compression | None:
48+
"""Detect compression format from file signature bytes at the start of data.
49+
50+
Args:
51+
data: The first few bytes of the file/stream (at least SIGNATURE_BUFFER_SIZE bytes recommended)
52+
53+
Returns:
54+
The detected Compression type, or None if uncompressed/unknown
55+
"""
56+
for sig in COMPRESSION_SIGNATURES:
57+
if data.startswith(sig.signature):
58+
return sig.compression
59+
return None
60+
61+
62+
def create_decompressor(compression: Compression) -> Any:
63+
"""Create a decompressor object for the given compression type."""
64+
match compression:
65+
case Compression.GZIP:
66+
return zlib.decompressobj(wbits=47) # Auto-detect gzip/zlib
67+
case Compression.XZ:
68+
return lzma.LZMADecompressor()
69+
case Compression.BZ2:
70+
return bz2.BZ2Decompressor()
71+
case Compression.ZSTD:
72+
return zstd.ZstdDecompressor()
73+
74+
2575
@dataclass(kw_only=True)
2676
class CompressedStream(ObjectStream[bytes]):
2777
stream: AnyByteStream
@@ -99,3 +149,91 @@ def compress_stream(stream: AnyByteStream, compression: Compression | None) -> A
99149
compressor=zstd.ZstdCompressor(),
100150
decompressor=zstd.ZstdDecompressor(),
101151
)
152+
153+
154+
@dataclass(kw_only=True)
155+
class AutoDecompressIterator(AsyncIterator[bytes]):
156+
"""An async iterator that auto-detects and decompresses compressed data.
157+
158+
This wraps an async iterator of bytes and transparently decompresses
159+
gzip, xz, bz2, or zstd compressed data based on file signature detection.
160+
Uncompressed data passes through unchanged.
161+
"""
162+
163+
source: AsyncIterator[bytes]
164+
_decompressor: Any = field(init=False, default=None)
165+
_compression: Compression | None = field(init=False, default=None)
166+
_detected: bool = field(init=False, default=False)
167+
_buffer: bytes = field(init=False, default=b"")
168+
_exhausted: bool = field(init=False, default=False)
169+
170+
def _call_decompressor(self, method_name: str, *args) -> bytes:
171+
"""Call decompressor method with error handling.
172+
173+
Args:
174+
method_name: decompressor method to call
175+
*args: Arguments to the method
176+
"""
177+
try:
178+
method = getattr(self._decompressor, method_name)
179+
return method(*args)
180+
except (zlib.error, lzma.LZMAError, OSError, zstd.ZstdError) as e:
181+
raise RuntimeError(
182+
f"Failed to decompress {self._compression}: {e}"
183+
) from e
184+
185+
async def _detect_compression(self) -> None:
186+
"""Read enough bytes to detect compression format."""
187+
# Buffer data until we have enough for detection
188+
while len(self._buffer) < SIGNATURE_BUFFER_SIZE and not self._exhausted:
189+
try:
190+
chunk = await self.source.__anext__()
191+
self._buffer += chunk
192+
except StopAsyncIteration:
193+
self._exhausted = True
194+
break
195+
196+
# Detect compression from buffered data
197+
compression = detect_compression_from_signature(self._buffer)
198+
if compression is not None:
199+
self._compression = compression
200+
self._decompressor = create_decompressor(compression)
201+
202+
self._detected = True
203+
204+
async def __anext__(self) -> bytes:
205+
# First call: detect compression format
206+
if not self._detected:
207+
await self._detect_compression()
208+
209+
# Process buffered data first
210+
if self._buffer:
211+
data = self._buffer
212+
self._buffer = b""
213+
if self._decompressor is not None:
214+
return self._call_decompressor("decompress", data)
215+
return data
216+
217+
# Stream exhausted
218+
if self._exhausted:
219+
raise StopAsyncIteration
220+
221+
# Read and process next chunk
222+
try:
223+
chunk = await self.source.__anext__()
224+
except StopAsyncIteration:
225+
self._exhausted = True
226+
# Flush any remaining data from decompressor (gzip needs this)
227+
if self._decompressor is not None and hasattr(self._decompressor, "flush"):
228+
remaining = self._call_decompressor("flush")
229+
self._decompressor = None
230+
if remaining:
231+
return remaining
232+
raise
233+
234+
if self._decompressor is not None:
235+
return self._call_decompressor("decompress", chunk)
236+
return chunk
237+
238+
def __aiter__(self) -> AsyncIterator[bytes]:
239+
return self

packages/jumpstarter/jumpstarter/streams/encoding_test.py

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1+
import bz2
2+
import gzip
3+
import lzma
4+
import os
5+
import sys
16
from io import BytesIO
27

38
import pytest
49
from anyio import EndOfStream, create_memory_object_stream
510
from anyio.streams.stapled import StapledObjectStream
611

7-
from .encoding import compress_stream
12+
if sys.version_info >= (3, 14):
13+
from compression import zstd
14+
else:
15+
from backports import zstd
16+
17+
from .encoding import (
18+
COMPRESSION_SIGNATURES,
19+
AutoDecompressIterator,
20+
Compression,
21+
compress_stream,
22+
detect_compression_from_signature,
23+
)
824

925
pytestmark = pytest.mark.anyio
1026

@@ -28,3 +44,133 @@ async def test_compress_stream(compression):
2844
except EndOfStream:
2945
break
3046
assert result.getvalue() == b"hello"
47+
48+
49+
def _get_signature(compression: Compression) -> bytes:
50+
"""Helper to get signature bytes for a compression type."""
51+
for sig in COMPRESSION_SIGNATURES:
52+
if sig.compression == compression:
53+
return sig.signature
54+
raise ValueError(f"No signature found for {compression}")
55+
56+
57+
class TestDetectCompressionFromSignature:
58+
"""Tests for file signature detection."""
59+
60+
@pytest.mark.parametrize(
61+
"compression",
62+
[Compression.GZIP, Compression.XZ, Compression.BZ2, Compression.ZSTD],
63+
)
64+
def test_detect_from_signature(self, compression):
65+
"""Each compression format should be detected from its signature."""
66+
signature = _get_signature(compression)
67+
# Pad with random bytes to simulate real file content
68+
data = signature + os.urandom(4)
69+
assert detect_compression_from_signature(data) == compression
70+
71+
def test_detect_uncompressed(self):
72+
# Random data that doesn't match any compression format
73+
assert detect_compression_from_signature(b"hello world") is None
74+
75+
def test_detect_empty(self):
76+
assert detect_compression_from_signature(b"") is None
77+
78+
def test_detect_too_short(self):
79+
# Truncated signatures should not match
80+
assert detect_compression_from_signature(b"\x1f") is None # gzip partial
81+
assert detect_compression_from_signature(b"\xfd\x37\x7a") is None # xz partial
82+
83+
def test_detect_from_real_gzip_data(self):
84+
compressed = gzip.compress(b"test data")
85+
assert detect_compression_from_signature(compressed) == Compression.GZIP
86+
87+
def test_detect_from_real_xz_data(self):
88+
compressed = lzma.compress(b"test data", format=lzma.FORMAT_XZ)
89+
assert detect_compression_from_signature(compressed) == Compression.XZ
90+
91+
def test_detect_from_real_bz2_data(self):
92+
compressed = bz2.compress(b"test data")
93+
assert detect_compression_from_signature(compressed) == Compression.BZ2
94+
95+
def test_detect_from_real_zstd_data(self):
96+
compressed = zstd.compress(b"test data")
97+
assert detect_compression_from_signature(compressed) == Compression.ZSTD
98+
99+
100+
class TestAutoDecompressIterator:
101+
"""Tests for auto-decompressing async iterator."""
102+
103+
async def _async_iter_from_bytes(self, data: bytes, chunk_size: int):
104+
"""Helper to create an async iterator from bytes."""
105+
for i in range(0, len(data), chunk_size):
106+
yield data[i : i + chunk_size]
107+
108+
async def _decompress_and_check(self, compressed: bytes, expected: bytes, chunk_size: int = 16):
109+
"""Helper to decompress data and verify it matches expected output."""
110+
chunks = []
111+
async for chunk in AutoDecompressIterator(source=self._async_iter_from_bytes(compressed, chunk_size)):
112+
chunks.append(chunk)
113+
assert b"".join(chunks) == expected
114+
115+
async def test_passthrough_uncompressed(self):
116+
"""Uncompressed data should pass through unchanged."""
117+
original = b"hello world, this is uncompressed data"
118+
await self._decompress_and_check(original, original)
119+
120+
async def test_decompress_gzip(self):
121+
"""Gzip compressed data should be decompressed."""
122+
original = b"hello world, this is gzip compressed data"
123+
compressed = gzip.compress(original)
124+
await self._decompress_and_check(compressed, original)
125+
126+
async def test_decompress_xz(self):
127+
"""XZ compressed data should be decompressed."""
128+
original = b"hello world, this is xz compressed data"
129+
compressed = lzma.compress(original, format=lzma.FORMAT_XZ)
130+
await self._decompress_and_check(compressed, original)
131+
132+
async def test_decompress_bz2(self):
133+
"""BZ2 compressed data should be decompressed."""
134+
original = b"hello world, this is bz2 compressed data"
135+
compressed = bz2.compress(original)
136+
await self._decompress_and_check(compressed, original)
137+
138+
async def test_decompress_zstd(self):
139+
"""Zstd compressed data should be decompressed."""
140+
original = b"hello world, this is zstd compressed data"
141+
compressed = zstd.compress(original)
142+
await self._decompress_and_check(compressed, original)
143+
144+
async def test_small_chunks(self):
145+
"""Should work with very small chunks."""
146+
original = b"hello world"
147+
compressed = gzip.compress(original)
148+
await self._decompress_and_check(compressed, original, chunk_size=1)
149+
150+
async def test_empty_input(self):
151+
"""Empty input should produce no output."""
152+
153+
async def empty_iter():
154+
if False:
155+
yield
156+
157+
chunks = []
158+
async for chunk in AutoDecompressIterator(source=empty_iter()):
159+
chunks.append(chunk)
160+
assert chunks == []
161+
162+
async def test_large_data(self):
163+
"""Should handle large data correctly."""
164+
original = b"x" * 1024 * 1024 # 1MB of data
165+
compressed = gzip.compress(original)
166+
await self._decompress_and_check(compressed, original, chunk_size=65536)
167+
168+
async def test_corrupted_gzip(self):
169+
"""Corrupted gzip data should raise RuntimeError with clear message."""
170+
# Create fake gzip data: valid signature but corrupted payload
171+
corrupted = b"\x1f\x8b\x08" + b"corrupted data here"
172+
173+
with pytest.raises(RuntimeError, match=r"Failed to decompress gzip:.*"):
174+
chunks = []
175+
async for chunk in AutoDecompressIterator(source=self._async_iter_from_bytes(corrupted, 16)):
176+
chunks.append(chunk)

0 commit comments

Comments
 (0)