Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }
cap-std-ext = "5.0"
ocidir = "0.7.1"

# JSON-RPC with FD passing for userns helper
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions crates/composefs-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ tar-core = "0.1.0"
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
tracing = { version = "0.1", default-features = false }
cap-std-ext = { workspace = true }
ocidir = { workspace = true }

[dev-dependencies]
cap-std = { version = "4.0.0", default-features = false }
Expand Down
6 changes: 5 additions & 1 deletion crates/composefs-oci/src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ pub fn remove_boot_image<ObjectID: FsVerityHashValue>(
)?;

let manifest_json = img.read_manifest_json(repo)?;
let layer_verities = img.layer_refs().clone();
let layer_verities: Vec<_> = img
.layer_refs()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

crate::oci_image::rewrite_manifest(
repo,
Expand Down
99 changes: 99 additions & 0 deletions crates/composefs-oci/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Shared layer import logic for OCI container images.
//!
//! This module provides common functionality for importing OCI image layers
//! into a composefs repository, shared between the skopeo proxy path and
//! direct OCI layout import.

use std::sync::Arc;

use anyhow::{Result, bail};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use containers_image_proxy::oci_spec::image::MediaType;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};

use composefs::fsverity::FsVerityHashValue;
use composefs::repository::{ObjectStoreMethod, Repository};
use composefs::shared_internals::IO_BUF_CAPACITY;

use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
use crate::tar::split_async;

/// Check if a media type represents a tar-based layer.
pub fn is_tar_media_type(media_type: &MediaType) -> bool {
matches!(
media_type,
MediaType::ImageLayer
| MediaType::ImageLayerGzip
| MediaType::ImageLayerZstd
| MediaType::ImageLayerNonDistributable
| MediaType::ImageLayerNonDistributableGzip
| MediaType::ImageLayerNonDistributableZstd
)
}

/// Wrap an async reader with the appropriate decompressor for the media type.
///
/// Returns a boxed reader that decompresses the stream if needed.
/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async`
/// does its own buffering via `BytesMut`.
pub fn decompress_async<'a, R>(
reader: R,
media_type: &MediaType,
) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>>
where
R: AsyncRead + Unpin + Send + 'a,
{
let buf = BufReader::new(reader);
let reader: Box<dyn AsyncRead + Unpin + Send> = match media_type {
MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => {
Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf))
}
MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new(
BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)),
),
MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new(
BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)),
),
_ => bail!("Unsupported layer media type for decompression: {media_type}"),
};
Ok(reader)
}

/// Import a tar layer from an async reader into the repository.
///
/// The reader should already be decompressed (use `decompress_async` first).
/// Returns the fs-verity object ID and import stats of the imported splitstream.
pub async fn import_tar_async<ObjectID, R>(
repo: Arc<Repository<ObjectID>>,
reader: R,
) -> Result<(ObjectID, crate::ImportStats)>
where
ObjectID: FsVerityHashValue,
R: AsyncRead + Unpin + Send,
{
split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await
}

/// Store raw bytes from an async reader as a repository object.
///
/// Streams the raw bytes into a repository object without creating a splitstream.
/// Use this for non-tar blobs (OCI artifacts) where the caller will create
/// the splitstream wrapper.
///
/// Returns (object_id, size, store_method) of the stored object.
pub async fn store_blob_async<ObjectID, R>(
repo: &Repository<ObjectID>,
mut reader: R,
) -> Result<(ObjectID, u64, ObjectStoreMethod)>
where
ObjectID: FsVerityHashValue,
R: AsyncRead + Unpin,
{
let tmpfile = repo.create_object_tmpfile()?;
let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile));
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
let tmpfile = writer.into_std().await;
let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?;
Ok((object_id, size, method))
}
Loading
Loading