Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 132 additions & 24 deletions crates/core/common/src/catalog/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ pub struct PhysicalTable {
/// Logical table representation.
table: ResolvedTable,

/// Absolute URL to the data location, path section of the URL and the corresponding object store.
url: Url,
/// Base directory URL containing all parquet files for this table.
url: PhyTableUrl,
/// Path to the data location in the object store.
path: Path,

Expand All @@ -201,15 +201,18 @@ impl PhysicalTable {
/// Create a new physical table with the given dataset name, table, URL, and object store.
pub fn new(
table: ResolvedTable,
url: Url,
raw_url: Url,
location_id: LocationId,
metadata_db: MetadataDb,
dataset_reference: HashReference,
) -> Result<Self, BoxError> {
let path = Path::from_url_path(url.path()).unwrap();
let object_store_url = url.clone().try_into()?;
let path = Path::from_url_path(raw_url.path()).unwrap();
let object_store_url = raw_url.clone().try_into()?;
let (object_store, _) = crate::store::object_store(&object_store_url)?;

// SAFETY: URL is validated by caller
let url = PhyTableUrl::new_unchecked(raw_url);

Ok(Self {
table,
url,
Expand All @@ -232,18 +235,19 @@ impl PhysicalTable {
set_active: bool,
dataset_reference: &HashReference,
) -> Result<Self, BoxError> {
let manifest_hash = dataset_reference.hash();
let table_name = table.name();
let path = make_location_path(dataset_reference, table.name());
let raw_url = data_store.url().join(&path)?;

// SAFETY: URL comes from data_store.url().join() which produces valid URLs
let url = PhyTableUrl::new_unchecked(raw_url);

let path = make_location_path(dataset_reference, table_name);
let url = data_store.url().join(&path)?;
let location_id = metadata_db::physical_table::register(
&metadata_db,
dataset_reference.namespace(),
dataset_reference.name(),
manifest_hash,
table_name,
url.as_str(),
dataset_reference.hash(),
table.name(),
&url,
false,
)
.await?;
Expand All @@ -252,15 +256,15 @@ impl PhysicalTable {
let mut tx = metadata_db.begin_txn().await?;
metadata_db::physical_table::mark_inactive_by_table_id(
&mut tx,
manifest_hash,
table_name,
dataset_reference.hash(),
table.name(),
)
.await?;
metadata_db::physical_table::mark_active_by_id(
&mut tx,
location_id,
manifest_hash,
table_name,
dataset_reference.hash(),
table.name(),
)
.await?;
tx.commit().await?;
Expand Down Expand Up @@ -338,11 +342,14 @@ impl PhysicalTable {
return Ok(None);
};

let url = physical_table.url;
let table_url = physical_table.url;
let location_id = physical_table.id;

// Convert TableUrl to PhyTableUrl
let url: PhyTableUrl = table_url.into();
let path = Path::from_url_path(url.path()).unwrap();

let object_store_url = url.clone().try_into()?;
let object_store_url = url.inner().clone().try_into()?;
let (object_store, _) = crate::store::object_store(&object_store_url)?;

Ok(Some(Self {
Expand Down Expand Up @@ -400,18 +407,21 @@ impl PhysicalTable {
manifest_hash: &Hash,
table_name: &TableName,
path: &Path,
url: &Url,
raw_url: &Url,
data_store: Arc<Store>,
metadata_db: MetadataDb,
dataset_reference: &HashReference,
) -> Result<Self, RestoreError> {
// SAFETY: The URL is validated by the caller and comes from the restore operation
let url = PhyTableUrl::new_unchecked(raw_url.clone());

let location_id = metadata_db::physical_table::register(
&metadata_db,
dataset_reference.namespace(),
dataset_reference.name(),
manifest_hash,
table_name,
url.as_str(),
&url,
false,
)
.await
Expand Down Expand Up @@ -483,7 +493,7 @@ impl PhysicalTable {
metadata_db
.register_file(
location_id,
url,
url.inner(),
file_name,
object_size,
object_e_tag,
Expand All @@ -497,7 +507,7 @@ impl PhysicalTable {

let physical_table = Self {
table: table.clone(),
url: url.clone(),
url,
path: path.clone(),
location_id,
metadata_db,
Expand Down Expand Up @@ -549,7 +559,7 @@ impl PhysicalTable {
}

pub fn url(&self) -> &Url {
&self.url
self.url.inner()
}

pub fn path(&self) -> &Path {
Expand Down Expand Up @@ -722,7 +732,7 @@ impl PhysicalTable {

impl PhysicalTable {
fn object_store_url(&self) -> DataFusionResult<ObjectStoreUrl> {
Ok(ListingTableUrl::try_new(self.url.clone(), None)?.object_store())
Ok(ListingTableUrl::try_new(self.url.inner().clone(), None)?.object_store())
}

fn output_ordering(&self) -> DataFusionResult<Vec<LexOrdering>> {
Expand Down Expand Up @@ -975,3 +985,101 @@ pub enum RestoreLatestRevisionError {
#[error("Failed to restore latest revision")]
RestoreLatest(#[source] RestoreLatestError),
}

/// Physical table URL _new-type_ wrapper
///
/// Represents a base directory URL in the object store containing all parquet files for a table.
/// Individual file URLs are constructed by appending the filename to this base URL.
///
/// ## URL Format
///
/// `<store_url>/<dataset_name>/<table>/<uuid_v7>/`
///
/// Where:
/// - `store_url`: The object store root URL (e.g., `s3://bucket`, `file:///data`)
/// - `dataset_name`: Dataset name (without namespace)
/// - `table`: Table name
/// - `uuid_v7`: Unique identifier for this table revision/location
///
/// ## Example
///
/// ```text
/// s3://my-bucket/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good documentation! The format description and example clearly explain the URL structure, which helps future maintainers understand what this type represents.

/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhyTableUrl(Url);

impl PhyTableUrl {
/// Create a new [`PhyTableUrl`] from a [`url::Url`] without validation
///
/// # Safety
/// The caller must ensure the provided URL is a valid object store URL.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider documenting what constitutes a "valid object store URL" here. While the comment mentions safety, it would be helpful to specify the validation requirements (e.g., must be parseable by url::Url, must have a supported scheme like s3/gs/azure/file).

pub fn new_unchecked(url: Url) -> Self {
Self(url)
}

/// Get the URL as a string slice
pub fn as_str(&self) -> &str {
self.0.as_str()
}

/// Get the path portion of the URL
pub fn path(&self) -> &str {
self.0.path()
}

/// Get a reference to the inner [`url::Url`]
pub fn inner(&self) -> &Url {
&self.0
}
}

impl std::str::FromStr for PhyTableUrl {
    type Err = PhyTableUrlParseError;

    /// Parse a [`PhyTableUrl`] from a string.
    ///
    /// Only verifies the input is a syntactically valid URL; it does NOT check
    /// that the scheme is one supported by the object store layer — that is
    /// validated elsewhere when the object store is resolved.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Preserve the offending input in the error for diagnostics.
        let url = s.parse().map_err(|err| PhyTableUrlParseError {
            url: s.to_string(),
            source: err,
        })?;
        Ok(PhyTableUrl(url))
    }
}

impl std::fmt::Display for PhyTableUrl {
    /// Renders the URL exactly as its underlying string form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.0.as_str())
    }
}

impl From<PhyTableUrl> for metadata_db::physical_table::TableUrlOwned {
    fn from(value: PhyTableUrl) -> Self {
        // SAFETY: `PhyTableUrl` always wraps a well-formed `url::Url` — either
        // validated via `FromStr`, or vouched for by the caller of
        // `new_unchecked` — so its string form upholds `TableUrl`'s invariants.
        metadata_db::physical_table::TableUrl::from_owned_unchecked(value.as_str().to_owned())
    }
}

impl<'a> From<&'a PhyTableUrl> for metadata_db::physical_table::TableUrl<'a> {
    fn from(value: &'a PhyTableUrl) -> Self {
        // SAFETY: `PhyTableUrl` always wraps a well-formed `url::Url` — either
        // validated via `FromStr`, or vouched for by the caller of
        // `new_unchecked` — so its string form upholds `TableUrl`'s invariants.
        metadata_db::physical_table::TableUrl::from_ref_unchecked(value.as_str())
    }
}

impl From<metadata_db::physical_table::TableUrlOwned> for PhyTableUrl {
    /// Convert a database-stored table URL back into a [`PhyTableUrl`].
    ///
    /// # Panics
    /// Panics if the stored string does not parse as a URL. URLs persisted by
    /// this module are valid by construction, so a failure here indicates
    /// corrupt or hand-edited database state; the panic message includes the
    /// offending URL via [`PhyTableUrlParseError`].
    fn from(value: metadata_db::physical_table::TableUrlOwned) -> Self {
        value
            .as_str()
            .parse()
            .unwrap_or_else(|err| panic!("database URL should be valid: {err}"))
    }
}

/// Error type for PhyTableUrl parsing
#[derive(Debug, thiserror::Error)]
#[error("invalid object store URL '{url}'")]
pub struct PhyTableUrlParseError {
    // The input string that failed to parse, preserved for diagnostics.
    url: String,
    // Underlying parser error from the `url` crate, exposed via `source()`.
    #[source]
    source: url::ParseError,
}
22 changes: 13 additions & 9 deletions crates/core/metadata-db/src/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
//! This module provides a type-safe API for managing physical table locations in the metadata database.
//! Physical tables represent actual storage locations (e.g., Parquet files) for dataset tables.

use url::Url;

pub mod events;
mod location_id;
mod name;
pub(crate) mod sql;
mod url;

pub use self::{
location_id::{LocationId, LocationIdFromStrError, LocationIdI64ConvError, LocationIdU64Error},
name::{Name as TableName, NameOwned as TableNameOwned},
url::{Url as TableUrl, UrlOwned as TableUrlOwned},
};
use crate::{
DatasetName, DatasetNameOwned, DatasetNamespace, DatasetNamespaceOwned, ManifestHashOwned,
Expand All @@ -33,7 +33,7 @@ pub async fn register<'c, E>(
dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
table_name: impl Into<TableName<'_>> + std::fmt::Debug,
url: &str,
url: impl Into<TableUrl<'_>> + std::fmt::Debug,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good API design using the Into trait for the url parameter. This provides flexibility for callers while maintaining type safety.

active: bool,
) -> Result<LocationId, Error>
where
Expand All @@ -45,7 +45,7 @@ where
dataset_name.into(),
manifest_hash.into(),
table_name.into(),
url,
url.into(),
active,
)
.await
Expand Down Expand Up @@ -85,11 +85,16 @@ where
/// If multiple locations exist with the same URL (which shouldn't happen in normal operation),
/// this returns the first match found.
#[tracing::instrument(skip(exe), err)]
pub async fn url_to_id<'c, E>(exe: E, url: &str) -> Result<Option<LocationId>, Error>
pub async fn url_to_id<'c, E>(
exe: E,
url: impl Into<TableUrl<'_>> + std::fmt::Debug,
) -> Result<Option<LocationId>, Error>
where
E: Executor<'c>,
{
sql::url_to_id(exe, url).await.map_err(Error::Database)
sql::url_to_id(exe, url.into())
.await
.map_err(Error::Database)
}

/// Get the currently active physical table location for a given table
Expand Down Expand Up @@ -267,8 +272,7 @@ pub struct PhysicalTable {
/// Name of the table within the dataset
pub table_name: TableNameOwned,
/// Full URL to the storage location
#[sqlx(try_from = "&'a str")]
pub url: Url,
pub url: TableUrlOwned,
/// Whether this location is currently active for queries
pub active: bool,
/// Writer job ID (if one exists)
Expand All @@ -291,7 +295,7 @@ impl LocationWithDetails {
}

/// Get the storage URL for this location
pub fn url(&self) -> &Url {
pub fn url(&self) -> &TableUrlOwned {
&self.table.url
}

Expand Down
9 changes: 4 additions & 5 deletions crates/core/metadata-db/src/physical_table/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
//! `Executor` trait and converts errors to `metadata_db::Error`.

use sqlx::{Executor, Postgres, types::JsonValue};
use url::Url;

use super::{
LocationId, LocationWithDetails, PhysicalTable,
name::{Name, NameOwned},
url::{Url, UrlOwned},
};
use crate::{
DatasetName, DatasetNameOwned, DatasetNamespace, DatasetNamespaceOwned, JobStatus,
Expand All @@ -27,7 +27,7 @@ pub async fn insert<'c, E>(
dataset_name: DatasetName<'_>,
manifest_hash: ManifestHash<'_>,
table_name: Name<'_>,
url: &str,
url: Url<'_>,
active: bool,
) -> Result<LocationId, sqlx::Error>
where
Expand Down Expand Up @@ -64,7 +64,7 @@ where
}

/// Get location ID by URL only, returns first match if multiple exist
pub async fn url_to_id<'c, E>(exe: E, url: &str) -> Result<Option<LocationId>, sqlx::Error>
pub async fn url_to_id<'c, E>(exe: E, url: Url<'_>) -> Result<Option<LocationId>, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
Expand Down Expand Up @@ -114,8 +114,7 @@ where
id: LocationId,
manifest_hash: ManifestHashOwned,
table_name: NameOwned,
#[sqlx(try_from = "&'a str")]
url: Url,
url: UrlOwned,
active: bool,
dataset_namespace: DatasetNamespaceOwned,
dataset_name: DatasetNameOwned,
Expand Down
Loading