From 6222b7276e2b5a37314cce80e3dfe93716665c81 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 11 Dec 2025 17:17:43 +0100 Subject: [PATCH] refactor(common): add `PhyTableUrl` new-type for table base URL Introduce invariant-preserving newtype wrappers for physical table URLs to enforce type safety at compile time rather than relying on runtime string validation. - Add `PhyTableUrl` in common with `FromStr`, `Display`, and conversion traits - Add `TableUrl`/`TableUrlOwned` in metadata-db following the `Name` borrowing pattern - Update `physical_table` module API to use typed URL parameters - Convert tests to use the new URL types Signed-off-by: Lorenzo Delgado --- crates/core/common/src/catalog/physical.rs | 156 ++++++- crates/core/metadata-db/src/physical_table.rs | 22 +- .../metadata-db/src/physical_table/sql.rs | 9 +- .../src/physical_table/tests/it_crud.rs | 441 ++++++++++-------- .../src/physical_table/tests/it_pagination.rs | 95 ++-- .../metadata-db/src/physical_table/url.rs | 168 +++++++ 6 files changed, 609 insertions(+), 282 deletions(-) create mode 100644 crates/core/metadata-db/src/physical_table/url.rs diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs index 4c7114a96..b44d1e626 100644 --- a/crates/core/common/src/catalog/physical.rs +++ b/crates/core/common/src/catalog/physical.rs @@ -180,8 +180,8 @@ pub struct PhysicalTable { /// Logical table representation. table: ResolvedTable, - /// Absolute URL to the data location, path section of the URL and the corresponding object store. - url: Url, + /// Base directory URL containing all parquet files for this table. + url: PhyTableUrl, /// Path to the data location in the object store. path: Path, @@ -201,15 +201,18 @@ impl PhysicalTable { /// Create a new physical table with the given dataset name, table, URL, and object store. 
    pub fn new(
         table: ResolvedTable,
-        url: Url,
+        raw_url: Url,
         location_id: LocationId,
         metadata_db: MetadataDb,
         dataset_reference: HashReference,
     ) -> Result {
-        let path = Path::from_url_path(url.path()).unwrap();
-        let object_store_url = url.clone().try_into()?;
+        let path = Path::from_url_path(raw_url.path()).unwrap();
+        let object_store_url = raw_url.clone().try_into()?;
         let (object_store, _) = crate::store::object_store(&object_store_url)?;
 
+        // SAFETY: URL is validated by caller
+        let url = PhyTableUrl::new_unchecked(raw_url);
+
         Ok(Self {
             table,
             url,
@@ -232,18 +235,19 @@ impl PhysicalTable {
         set_active: bool,
         dataset_reference: &HashReference,
     ) -> Result {
-        let manifest_hash = dataset_reference.hash();
-        let table_name = table.name();
+        let path = make_location_path(dataset_reference, table.name());
+        let raw_url = data_store.url().join(&path)?;
+
+        // SAFETY: URL comes from data_store.url().join() which produces valid URLs
+        let url = PhyTableUrl::new_unchecked(raw_url);
 
-        let path = make_location_path(dataset_reference, table_name);
-        let url = data_store.url().join(&path)?;
         let location_id = metadata_db::physical_table::register(
             &metadata_db,
             dataset_reference.namespace(),
             dataset_reference.name(),
-            manifest_hash,
-            table_name,
-            url.as_str(),
+            dataset_reference.hash(),
+            table.name(),
+            &url,
             false,
         )
         .await?;
@@ -252,15 +256,15 @@ impl PhysicalTable {
             let mut tx = metadata_db.begin_txn().await?;
             metadata_db::physical_table::mark_inactive_by_table_id(
                 &mut tx,
-                manifest_hash,
-                table_name,
+                dataset_reference.hash(),
+                table.name(),
             )
             .await?;
             metadata_db::physical_table::mark_active_by_id(
                 &mut tx,
                 location_id,
-                manifest_hash,
-                table_name,
+                dataset_reference.hash(),
+                table.name(),
             )
             .await?;
             tx.commit().await?;
@@ -338,11 +342,14 @@ impl PhysicalTable {
             return Ok(None);
         };
 
-        let url = physical_table.url;
+        let table_url = physical_table.url;
         let location_id = physical_table.id;
+
+        // Convert TableUrl to PhyTableUrl
+        let url: PhyTableUrl = table_url.into();
         let path = Path::from_url_path(url.path()).unwrap();
 
-        let object_store_url = url.clone().try_into()?;
+        let object_store_url = url.inner().clone().try_into()?;
         let (object_store, _) = crate::store::object_store(&object_store_url)?;
 
         Ok(Some(Self {
@@ -400,18 +407,21 @@ impl PhysicalTable {
         manifest_hash: &Hash,
         table_name: &TableName,
         path: &Path,
-        url: &Url,
+        raw_url: &Url,
         data_store: Arc,
         metadata_db: MetadataDb,
         dataset_reference: &HashReference,
     ) -> Result {
+        // SAFETY: The URL is validated by the caller and comes from the restore operation
+        let url = PhyTableUrl::new_unchecked(raw_url.clone());
+
         let location_id = metadata_db::physical_table::register(
             &metadata_db,
             dataset_reference.namespace(),
             dataset_reference.name(),
             manifest_hash,
             table_name,
-            url.as_str(),
+            &url,
             false,
         )
         .await
@@ -483,7 +493,7 @@ impl PhysicalTable {
         metadata_db
             .register_file(
                 location_id,
-                url,
+                url.inner(),
                 file_name,
                 object_size,
                 object_e_tag,
@@ -497,7 +507,7 @@ impl PhysicalTable {
 
         let physical_table = Self {
             table: table.clone(),
-            url: url.clone(),
+            url,
             path: path.clone(),
             location_id,
             metadata_db,
@@ -549,7 +559,7 @@ impl PhysicalTable {
     }
 
     pub fn url(&self) -> &Url {
-        &self.url
+        self.url.inner()
     }
 
     pub fn path(&self) -> &Path {
@@ -722,7 +732,7 @@ impl PhysicalTable {
     fn object_store_url(&self) -> DataFusionResult<ObjectStoreUrl> {
-        Ok(ListingTableUrl::try_new(self.url.clone(), None)?.object_store())
+        Ok(ListingTableUrl::try_new(self.url.inner().clone(), None)?.object_store())
     }
 
     fn output_ordering(&self)
        -> DataFusionResult> {
@@ -975,3 +985,101 @@ pub enum RestoreLatestRevisionError {
     #[error("Failed to restore latest revision")]
     RestoreLatest(#[source] RestoreLatestError),
 }
+
+/// Physical table URL _new-type_ wrapper
+///
+/// Represents a base directory URL in the object store containing all parquet files for a table.
+/// Individual file URLs are constructed by appending the filename to this base URL.
+///
+/// ## URL Format
+///
+/// `<store_url>/<dataset_name>/<table>/<uuid_v7>/`
+///
+/// Where:
+/// - `store_url`: The object store root URL (e.g., `s3://bucket`, `file:///data`)
+/// - `dataset_name`: Dataset name (without namespace)
+/// - `table`: Table name
+/// - `uuid_v7`: Unique identifier for this table revision/location
+///
+/// ## Example
+///
+/// ```text
+/// s3://my-bucket/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PhyTableUrl(Url);
+
+impl PhyTableUrl {
+    /// Create a new [`PhyTableUrl`] from a [`url::Url`] without validation
+    ///
+    /// # Safety
+    /// The caller must ensure the provided URL is a valid object store URL.
+    pub fn new_unchecked(url: Url) -> Self {
+        Self(url)
+    }
+
+    /// Get the URL as a string slice
+    pub fn as_str(&self) -> &str {
+        self.0.as_str()
+    }
+
+    /// Get the path portion of the URL
+    pub fn path(&self) -> &str {
+        self.0.path()
+    }
+
+    /// Get a reference to the inner [`url::Url`]
+    pub fn inner(&self) -> &Url {
+        &self.0
+    }
+}
+
+impl std::str::FromStr for PhyTableUrl {
+    type Err = PhyTableUrlParseError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let url = s.parse().map_err(|err| PhyTableUrlParseError {
+            url: s.to_string(),
+            source: err,
+        })?;
+        Ok(PhyTableUrl(url))
+    }
+}
+
+impl std::fmt::Display for PhyTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0.as_str())
+    }
+}
+
+impl From<PhyTableUrl> for metadata_db::physical_table::TableUrlOwned {
+    fn from(value: PhyTableUrl) -> Self {
+        // SAFETY: PhyTableUrl is validated at construction via FromStr, ensuring invariants are upheld.
+        metadata_db::physical_table::TableUrl::from_owned_unchecked(value.as_str().to_owned())
+    }
+}
+
+impl<'a> From<&'a PhyTableUrl> for metadata_db::physical_table::TableUrl<'a> {
+    fn from(value: &'a PhyTableUrl) -> Self {
+        // SAFETY: PhyTableUrl is validated at construction via FromStr, ensuring invariants are upheld.
+        metadata_db::physical_table::TableUrl::from_ref_unchecked(value.as_str())
+    }
+}
+
+impl From<metadata_db::physical_table::TableUrlOwned> for PhyTableUrl {
+    fn from(value: metadata_db::physical_table::TableUrlOwned) -> Self {
+        value
+            .as_str()
+            .parse()
+            .expect("database URL should be valid")
+    }
+}
+
+/// Error type for [`PhyTableUrl`] parsing
+#[derive(Debug, thiserror::Error)]
+#[error("invalid object store URL '{url}'")]
+pub struct PhyTableUrlParseError {
+    url: String,
+    #[source]
+    source: url::ParseError,
+}
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs
index 4614294d9..92f840495 100644
--- a/crates/core/metadata-db/src/physical_table.rs
+++ b/crates/core/metadata-db/src/physical_table.rs
@@ -3,16 +3,16 @@
 //! This module provides a type-safe API for managing physical table locations in the metadata database.
 //! Physical tables represent actual storage locations (e.g., Parquet files) for dataset tables.
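
Reviewer sketch (not part of the diff): a minimal example of how the new boundary type is meant to flow from `common` into `metadata-db`, assuming the crate paths introduced above; the bucket and dataset names are invented.

```rust
use common::catalog::physical::PhyTableUrl;
use metadata_db::physical_table::TableUrl;

fn sketch() -> Result<(), Box<dyn std::error::Error>> {
    // Parsing is the single validation point (FromStr -> PhyTableUrlParseError).
    let url: PhyTableUrl =
        "s3://my-bucket/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/".parse()?;

    // After that, conversion into the database-layer type is infallible:
    // the *_unchecked constructors are only reached through an already-valid URL.
    let db_url: TableUrl<'_> = (&url).into();
    assert_eq!(db_url.as_str(), url.as_str());
    Ok(())
}
```
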
 
-use url::Url;
-
 pub mod events;
 mod location_id;
 mod name;
 pub(crate) mod sql;
+mod url;
 
 pub use self::{
     location_id::{LocationId, LocationIdFromStrError, LocationIdI64ConvError, LocationIdU64Error},
     name::{Name as TableName, NameOwned as TableNameOwned},
+    url::{Url as TableUrl, UrlOwned as TableUrlOwned},
 };
 use crate::{
     DatasetName, DatasetNameOwned, DatasetNamespace, DatasetNamespaceOwned, ManifestHashOwned,
@@ -33,7 +33,7 @@ pub async fn register<'c, E>(
     dataset_name: impl Into<DatasetName<'_>> + std::fmt::Debug,
     manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
     table_name: impl Into<TableName<'_>> + std::fmt::Debug,
-    url: &str,
+    url: impl Into<TableUrl<'_>> + std::fmt::Debug,
     active: bool,
 ) -> Result<LocationId, Error>
 where
     E: Executor<'c>,
 {
     sql::insert(
         exe,
         dataset_namespace.into(),
         dataset_name.into(),
         manifest_hash.into(),
         table_name.into(),
-        url,
+        url.into(),
         active,
     )
     .await
@@ -85,11 +85,16 @@ where
 /// If multiple locations exist with the same URL (which shouldn't happen in normal operation),
 /// this returns the first match found.
 #[tracing::instrument(skip(exe), err)]
-pub async fn url_to_id<'c, E>(exe: E, url: &str) -> Result<Option<LocationId>, Error>
+pub async fn url_to_id<'c, E>(
+    exe: E,
+    url: impl Into<TableUrl<'_>> + std::fmt::Debug,
+) -> Result<Option<LocationId>, Error>
 where
     E: Executor<'c>,
 {
-    sql::url_to_id(exe, url).await.map_err(Error::Database)
+    sql::url_to_id(exe, url.into())
+        .await
+        .map_err(Error::Database)
 }
 
 /// Get the currently active physical table location for a given table
@@ -267,8 +272,7 @@ pub struct PhysicalTable {
     /// Name of the table within the dataset
     pub table_name: TableNameOwned,
     /// Full URL to the storage location
-    #[sqlx(try_from = "&'a str")]
-    pub url: Url,
+    pub url: TableUrlOwned,
     /// Whether this location is currently active for queries
     pub active: bool,
     /// Writer job ID (if one exists)
@@ -291,7 +295,7 @@ impl LocationWithDetails {
     }
 
     /// Get the storage URL for this location
-    pub fn url(&self) -> &Url {
+    pub fn url(&self) -> &TableUrlOwned {
         &self.table.url
     }
 
diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs
index 9d7e2026f..ef0e2a827 100644
--- a/crates/core/metadata-db/src/physical_table/sql.rs
+++ b/crates/core/metadata-db/src/physical_table/sql.rs
@@ -6,11 +6,11 @@
 //! `Executor` trait and converts errors to `metadata_db::Error`.
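
Reviewer sketch (not part of the diff): the call-site effect of the `register` signature change, under the assumption that `Error` is exported at the crate root and the module paths match the test imports below; the wrapper function and its values are hypothetical.

```rust
use metadata_db::{
    DatasetName, DatasetNamespace,
    db::Connection,
    manifests::ManifestHash,
    physical_table::{self, LocationId, TableName, TableUrl},
};

// Hypothetical helper: all values are assumed to be pre-validated upstream.
async fn register_revision(
    conn: &mut Connection,
    namespace: DatasetNamespace<'_>,
    name: DatasetName<'_>,
    hash: ManifestHash<'_>,
    table: TableName<'_>,
    url: &TableUrl<'_>,
) -> Result<LocationId, metadata_db::Error> {
    // The old API took `url.as_str()`; the new one accepts anything convertible
    // into TableUrl<'_>, so a borrowed &TableUrl converts without allocating.
    physical_table::register(conn, namespace, name, hash, table, url, false).await
}
```
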
 use sqlx::{Executor, Postgres, types::JsonValue};
-use url::Url;
 
 use super::{
     LocationId, LocationWithDetails, PhysicalTable,
     name::{Name, NameOwned},
+    url::{Url, UrlOwned},
 };
 use crate::{
     DatasetName, DatasetNameOwned, DatasetNamespace, DatasetNamespaceOwned, JobStatus,
@@ -27,7 +27,7 @@ pub async fn insert<'c, E>(
     dataset_name: DatasetName<'_>,
     manifest_hash: ManifestHash<'_>,
     table_name: Name<'_>,
-    url: &str,
+    url: Url<'_>,
     active: bool,
 ) -> Result<LocationId, sqlx::Error>
 where
@@ -64,7 +64,7 @@ where
 }
 
 /// Get location ID by URL only, returns first match if multiple exist
-pub async fn url_to_id<'c, E>(exe: E, url: &str) -> Result<Option<LocationId>, sqlx::Error>
+pub async fn url_to_id<'c, E>(exe: E, url: Url<'_>) -> Result<Option<LocationId>, sqlx::Error>
 where
     E: Executor<'c, Database = Postgres>,
 {
@@ -114,8 +114,7 @@ where
     id: LocationId,
     manifest_hash: ManifestHashOwned,
     table_name: NameOwned,
-    #[sqlx(try_from = "&'a str")]
-    url: Url,
+    url: UrlOwned,
     active: bool,
     dataset_namespace: DatasetNamespaceOwned,
     dataset_name: DatasetNameOwned,
diff --git a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
index 8fdb0bf9b..bb4993b33 100644
--- a/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
+++ b/crates/core/metadata-db/src/physical_table/tests/it_crud.rs
@@ -1,14 +1,13 @@
 //! Core location operations tests
 
 use pgtemp::PgTempDB;
-use url::Url;
 
 use crate::{
     DatasetName, DatasetNamespace, WorkerInfo, WorkerNodeId,
     db::Connection,
     jobs::{self, JobId},
     manifests::ManifestHash,
-    physical_table::{self, LocationId, TableName},
+    physical_table::{self, LocationId, TableName, TableUrl},
     workers,
 };
 
@@ -23,35 +22,26 @@ async fn insert_creates_location_and_returns_id() {
         .await
         .expect("Failed to run migrations");
 
-    let columns: Vec<String> = sqlx::query_scalar("SELECT column_name FROM information_schema.columns WHERE table_name = 'physical_table' ORDER BY column_name").fetch_all(&mut conn).await.expect("Failed to query schema");
-    println!("Physical table columns: {:?}", columns);
-
     let namespace = DatasetNamespace::from_ref_unchecked("test-namespace");
     let name = DatasetName::from_ref_unchecked("test-dataset");
     let hash = ManifestHash::from_ref_unchecked(
         "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
     );
-
     let table_name = TableName::from_ref_unchecked("test_table");
-    let url =
-        Url::parse("s3://test-bucket/test/path/file.parquet").expect("Failed to parse test URL");
+    let url = TableUrl::from_ref_unchecked("s3://test-bucket/test/path/file.parquet");
     let active = true;
 
     //* When
-    let location_id = physical_table::register(
-        &mut conn,
-        namespace,
-        name,
-        hash,
-        &table_name,
-        url.as_str(),
-        active,
-    )
-    .await
-    .expect("Failed to insert location");
+    let location_id =
+        physical_table::register(&mut conn, namespace, name, hash, table_name, url, active)
+            .await
+            .expect("Failed to insert location");
 
     //* Then
-    assert!(*location_id > 0);
+    assert!(
+        *location_id > 0,
+        "register should return valid positive location_id"
+    );
 
     // Verify the location was created correctly
     let (row_location_id, row_manifest_hash, row_table_name, row_url, row_active) =
         .await
         .expect("Failed to fetch inserted location");
 
-    assert_eq!(row_location_id, location_id);
     assert_eq!(
-        row_manifest_hash,
-        "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
+        row_location_id, location_id,
+        "database should store location with returned ID"
+    );
+    assert_eq!(
+        row_manifest_hash,
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + "database should persist manifest_hash" + ); + assert_eq!( + row_table_name, "test_table", + "database should persist table_name" ); - assert_eq!(row_table_name, "test_table"); - assert_eq!(row_url, url.as_str()); - assert!(row_active); + assert_eq!( + row_url, "s3://test-bucket/test/path/file.parquet", + "database should persist URL" + ); + assert!(row_active, "database should persist active status"); } #[tokio::test] @@ -85,39 +84,26 @@ async fn insert_on_conflict_returns_existing_id() { let hash = ManifestHash::from_ref_unchecked( "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", ); - let table_name = TableName::from_ref_unchecked("test_table"); - let url = Url::parse("s3://test-bucket/unique-file.parquet") - .expect("Failed to parse unique file URL"); + let url = TableUrl::from_ref_unchecked("s3://test-bucket/unique-file.parquet"); // Insert first location - let first_id = physical_table::register( - &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), - &table_name, - url.as_str(), - true, - ) - .await - .expect("Failed to insert first location"); + let first_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, &url, true) + .await + .expect("Failed to insert first location"); //* When - Try to insert with same URL but different data - let second_id = physical_table::register( - &mut conn, - namespace, - name, - hash, - &table_name, - url.as_str(), - false, - ) - .await - .expect("Failed to insert second location"); + let second_id = + physical_table::register(&mut conn, namespace, name, hash, table_name, url, false) + .await + .expect("Failed to insert second location"); //* Then - Should return the same ID due to conflict resolution - assert_eq!(first_id, second_id); + assert_eq!( + first_id, second_id, + "register with duplicate URL should return existing location_id (conflict resolution)" + ); } #[tokio::test] @@ -138,27 +124,24 @@ async fn url_to_location_id_finds_existing_location() { ); let table_name = TableName::from_ref_unchecked("test_table"); - let url = Url::parse("s3://test-bucket/find-me.parquet").expect("Failed to parse find-me URL"); + let url = TableUrl::from_ref_unchecked("s3://test-bucket/find-me.parquet"); - let expected_id = physical_table::register( - &mut conn, - namespace, - name, - hash, - &table_name, - url.as_str(), - false, - ) - .await - .expect("Failed to insert location"); + let expected_id = + physical_table::register(&mut conn, namespace, name, hash, table_name, &url, false) + .await + .expect("Failed to insert location"); //* When - let found_id = physical_table::url_to_id(&mut conn, url.as_str()) + let found_id = physical_table::url_to_id(&mut conn, url) .await .expect("Failed to search for location"); //* Then - assert_eq!(found_id, Some(expected_id)); + assert_eq!( + found_id, + Some(expected_id), + "url_to_id should find location_id by URL lookup" + ); } #[tokio::test] @@ -172,16 +155,18 @@ async fn url_to_location_id_returns_none_when_not_found() { .await .expect("Failed to run migrations"); - let url = Url::parse("s3://test-bucket/nonexistent.parquet") - .expect("Failed to parse nonexistent URL"); + let url = TableUrl::from_ref_unchecked("s3://test-bucket/nonexistent.parquet"); //* When - let found_id = physical_table::url_to_id(&mut conn, url.as_str()) + let found_id = physical_table::url_to_id(&mut conn, url) .await .expect("Failed to search for location"); //* Then - assert_eq!(found_id, None); + assert_eq!( + 
found_id, None, + "url_to_id should return None when URL not found" + ); } #[tokio::test] @@ -206,85 +191,102 @@ async fn get_active_by_table_id_filters_by_table_and_active_status() { let other_table_name = TableName::from_ref_unchecked("other_table"); // Create active location for target table - let url1 = Url::parse("s3://bucket/active1.parquet").expect("Failed to parse active1 URL"); + let url1 = TableUrl::from_ref_unchecked("s3://bucket/active1.parquet"); let active_id1 = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url1.as_str(), + &url1, true, ) .await .expect("Failed to insert active location 1"); // Create another active location for different table (still should be returned) - let url2 = Url::parse("s3://bucket/active2.parquet").expect("Failed to parse active2 URL"); + let url2 = TableUrl::from_ref_unchecked("s3://bucket/active2.parquet"); let active_id2 = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table2_name, - url2.as_str(), + &url2, true, ) .await .expect("Failed to insert active location 2"); // Create inactive location for target table (should be filtered out) - let url3 = Url::parse("s3://bucket/inactive.parquet").expect("Failed to parse inactive URL"); + let url3 = TableUrl::from_ref_unchecked("s3://bucket/inactive.parquet"); physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url3.as_str(), + url3, false, ) .await .expect("Failed to insert inactive location"); // Create active location for different table (should be filtered out) - let url4 = - Url::parse("s3://bucket/other-table.parquet").expect("Failed to parse other-table URL"); + let url4 = TableUrl::from_ref_unchecked("s3://bucket/other-table.parquet"); physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &other_table_name, - url4.as_str(), + url4, true, ) .await .expect("Failed to insert location for other table"); //* When - Get locations for first table - let active_location1 = - physical_table::get_active_physical_table(&mut conn, hash.clone(), &table_name) - .await - .expect("Failed to get active locations for table 1"); - let active_location2 = physical_table::get_active_physical_table(&mut conn, hash, &table2_name) + let active_location1 = physical_table::get_active_physical_table(&mut conn, &hash, &table_name) .await - .expect("Failed to get active locations for table 2"); + .expect("Failed to get active locations for table 1"); + let active_location2 = + physical_table::get_active_physical_table(&mut conn, &hash, &table2_name) + .await + .expect("Failed to get active locations for table 2"); //* Then - assert!(active_location1.is_some()); - assert!(active_location2.is_some()); + assert!( + active_location1.is_some(), + "get_active_physical_table should return active location for table 1" + ); + assert!( + active_location2.is_some(), + "get_active_physical_table should return active location for table 2" + ); let active_location1 = active_location1.unwrap(); let active_location2 = active_location2.unwrap(); // Check that we got the right locations - assert_eq!(active_location1.url, url1); - assert_eq!(active_location2.url, url2); + assert_eq!( + active_location1.url, url1, + "get_active_physical_table should return location with matching URL for table 1" + ); + assert_eq!( + 
active_location2.url, url2, + "get_active_physical_table should return location with matching URL for table 2" + ); // Check that we got the right IDs - assert_eq!(active_location1.id, active_id1); - assert_eq!(active_location2.id, active_id2); + assert_eq!( + active_location1.id, active_id1, + "get_active_physical_table should filter by table_name (table 1)" + ); + assert_eq!( + active_location2.id, active_id2, + "get_active_physical_table should filter by table_name (table 2)" + ); } #[tokio::test] @@ -309,64 +311,56 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() let other_table_name = TableName::from_ref_unchecked("other_table"); // Create active location for first target table - let url1 = Url::parse("s3://bucket/target1.parquet").expect("Failed to parse target1 URL"); - let target_id1 = physical_table::register( - &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), - &table_name, - url1.as_str(), - true, - ) - .await - .expect("Failed to insert target location 1"); + let url1 = TableUrl::from_ref_unchecked("s3://bucket/target1.parquet"); + let target_id1 = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, url1, true) + .await + .expect("Failed to insert target location 1"); // Create active location for second target table - let url2 = Url::parse("s3://bucket/target2.parquet").expect("Failed to parse target2 URL"); + let url2 = TableUrl::from_ref_unchecked("s3://bucket/target2.parquet"); let target_id2 = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table2_name, - url2.as_str(), + url2, true, ) .await .expect("Failed to insert target location 2"); // Create already inactive location for target table (should remain unchanged) - let url3 = Url::parse("s3://bucket/already-inactive.parquet") - .expect("Failed to parse already-inactive URL"); + let url3 = TableUrl::from_ref_unchecked("s3://bucket/already-inactive.parquet"); let inactive_id = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url3.as_str(), + url3, false, ) .await .expect("Failed to insert inactive location"); // Create active location for different table (should remain unchanged) - let url4 = Url::parse("s3://bucket/other.parquet").expect("Failed to parse other URL"); + let url4 = TableUrl::from_ref_unchecked("s3://bucket/other.parquet"); let other_id = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &other_table_name, - url4.as_str(), + url4, true, ) .await .expect("Failed to insert other table location"); //* When - Mark only the first table inactive - physical_table::mark_inactive_by_table_id(&mut conn, hash.clone(), &table_name) + physical_table::mark_inactive_by_table_id(&mut conn, &hash, &table_name) .await .expect("Failed to mark locations inactive"); @@ -385,10 +379,22 @@ async fn mark_inactive_by_table_id_deactivates_only_matching_active_locations() .await .expect("Failed to check other location status"); - assert!(!target1_active); // This was deactivated - assert!(target2_active); // Different table, stays active - assert!(!inactive_still_inactive); // Was already inactive - assert!(other_still_active); // Different dataset, stays active + assert!( + !target1_active, + "mark_inactive_by_table_id should deactivate active location matching table_name" + ); + assert!( + target2_active, + 
"mark_inactive_by_table_id should not affect different table_name" + ); + assert!( + !inactive_still_inactive, + "mark_inactive_by_table_id should not affect already inactive locations" + ); + assert!( + other_still_active, + "mark_inactive_by_table_id should not affect different table_name" + ); } #[tokio::test] @@ -410,36 +416,34 @@ async fn mark_active_by_id_activates_specific_location() { let table_name = TableName::from_ref_unchecked("test_table"); - let url1 = - Url::parse("s3://bucket/to-activate.parquet").expect("Failed to parse to-activate URL"); + let url1 = TableUrl::from_ref_unchecked("s3://bucket/to-activate.parquet"); let target_id = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url1.as_str(), + url1, false, ) .await .expect("Failed to insert location to activate"); - let url2 = - Url::parse("s3://bucket/stay-inactive.parquet").expect("Failed to parse stay-inactive URL"); + let url2 = TableUrl::from_ref_unchecked("s3://bucket/stay-inactive.parquet"); let other_id = physical_table::register( &mut conn, - namespace, - name, - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url2.as_str(), + url2, false, ) .await .expect("Failed to insert other location"); //* When - physical_table::mark_active_by_id(&mut conn, target_id, hash, &table_name) + physical_table::mark_active_by_id(&mut conn, target_id, &hash, &table_name) .await .expect("Failed to mark location active"); @@ -451,8 +455,14 @@ async fn mark_active_by_id_activates_specific_location() { .await .expect("Failed to check other location active status"); - assert!(target_active); - assert!(!other_still_inactive); + assert!( + target_active, + "mark_active_by_id should activate location matching ID" + ); + assert!( + !other_still_inactive, + "mark_active_by_id should only affect specified location_id" + ); } #[tokio::test] @@ -475,7 +485,7 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { // Create a worker and job let worker_id = WorkerNodeId::from_ref_unchecked("test-writer-worker"); let worker_info = WorkerInfo::default(); // {} - workers::register(&mut conn, worker_id.clone(), worker_info) + workers::register(&mut conn, &worker_id, worker_info) .await .expect("Failed to register worker"); @@ -489,59 +499,51 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { let table_name = TableName::from_ref_unchecked("output_table"); // Create locations to assign - let url1 = Url::parse("s3://bucket/assign1.parquet").expect("Failed to parse assign1 URL"); + let url1 = TableUrl::from_ref_unchecked("s3://bucket/assign1.parquet"); let location_id1 = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url1.as_str(), + url1, false, ) .await .expect("Failed to insert location 1"); - let url2 = Url::parse("s3://bucket/assign2.parquet").expect("Failed to parse assign2 URL"); + let url2 = TableUrl::from_ref_unchecked("s3://bucket/assign2.parquet"); let location_id2 = physical_table::register( &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url2.as_str(), + url2, false, ) .await .expect("Failed to insert location 2"); - let url3 = Url::parse("s3://bucket/assign3.parquet").expect("Failed to parse assign3 URL"); + let url3 = TableUrl::from_ref_unchecked("s3://bucket/assign3.parquet"); let location_id3 = physical_table::register( &mut conn, - namespace.clone(), - 
name.clone(), - hash.clone(), + &namespace, + &name, + &hash, &table_name, - url3.as_str(), + url3, false, ) .await .expect("Failed to insert location 3"); // Create a location that should not be assigned - let url4 = - Url::parse("s3://bucket/not-assigned.parquet").expect("Failed to parse not-assigned URL"); - let unassigned_id = physical_table::register( - &mut conn, - namespace, - name, - hash, - &table_name, - url4.as_str(), - false, - ) - .await - .expect("Failed to insert unassigned location"); + let url4 = TableUrl::from_ref_unchecked("s3://bucket/not-assigned.parquet"); + let unassigned_id = + physical_table::register(&mut conn, namespace, name, hash, &table_name, url4, false) + .await + .expect("Failed to insert unassigned location"); //* When physical_table::assign_job_writer( @@ -566,10 +568,25 @@ async fn assign_job_writer_assigns_job_to_multiple_locations() { .await .expect("Failed to get writer for unassigned location"); - assert_eq!(writer1, Some(job_id)); - assert_eq!(writer2, Some(job_id)); - assert_eq!(writer3, Some(job_id)); - assert_eq!(writer_unassigned, None); + assert_eq!( + writer1, + Some(job_id), + "assign_job_writer should set writer for location 1" + ); + assert_eq!( + writer2, + Some(job_id), + "assign_job_writer should set writer for location 2" + ); + assert_eq!( + writer3, + Some(job_id), + "assign_job_writer should set writer for location 3" + ); + assert_eq!( + writer_unassigned, None, + "assign_job_writer should only affect specified locations" + ); } #[tokio::test] @@ -590,35 +607,46 @@ async fn get_by_id_returns_existing_location() { ); let table_name = TableName::from_ref_unchecked("test_table"); - let url = Url::parse("s3://bucket/get-by-id.parquet").expect("Failed to parse URL"); + let url = TableUrl::from_ref_unchecked("s3://bucket/get-by-id.parquet"); - let inserted_id = physical_table::register( - &mut conn, - namespace, - name, - hash, - &table_name, - url.as_str(), - true, - ) - .await - .expect("Failed to insert location"); + let inserted_id = + physical_table::register(&mut conn, namespace, name, hash, &table_name, &url, true) + .await + .expect("Failed to insert location"); //* When let location = physical_table::get_by_id_with_details(&mut conn, inserted_id) .await .expect("Failed to get location by id"); - println!("location: {:?}", location); - //* Then - assert!(location.is_some()); + assert!( + location.is_some(), + "get_by_id_with_details should return existing location" + ); let location = location.unwrap(); - assert_eq!(location.id(), inserted_id); - assert_eq!(location.table.dataset_name, "test-dataset"); - assert_eq!(location.table.table_name, "test_table"); - assert_eq!(location.url(), &url); - assert!(location.active()); + assert_eq!( + location.id(), + inserted_id, + "get_by_id_with_details should return location matching ID" + ); + assert_eq!( + location.table.dataset_name, "test-dataset", + "get_by_id_with_details should return location with persisted dataset_name" + ); + assert_eq!( + location.table.table_name, "test_table", + "get_by_id_with_details should return location with persisted table_name" + ); + assert_eq!( + location.url(), + &url, + "get_by_id_with_details should return location with persisted URL" + ); + assert!( + location.active(), + "get_by_id_with_details should return location with persisted active status" + ); } #[tokio::test] @@ -640,7 +668,10 @@ async fn get_by_id_returns_none_for_nonexistent_location() { .expect("Failed to get location by id"); //* Then - assert!(location.is_none()); + assert!( + 
location.is_none(), + "get_by_id_with_details should return None for nonexistent location_id" + ); } // Helper functions for tests diff --git a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs index 0954ccebb..9d402d9b6 100644 --- a/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs +++ b/crates/core/metadata-db/src/physical_table/tests/it_pagination.rs @@ -1,14 +1,13 @@ //! Pagination tests for location listing use pgtemp::PgTempDB; -use url::Url; use crate::{ DatasetName, DatasetNamespace, db::Connection, manifests::ManifestHash, physical_table, - physical_table::{LocationId, TableName}, + physical_table::{LocationId, TableName, TableUrl}, }; #[tokio::test] @@ -28,7 +27,10 @@ async fn list_locations_first_page_when_empty() { .expect("Failed to list locations"); //* Then - assert!(locations.is_empty()); + assert!( + locations.is_empty(), + "list should return empty result when no locations exist" + ); } #[tokio::test] @@ -51,19 +53,10 @@ async fn list_locations_first_page_respects_limit() { // Create 5 locations with unique table names to avoid unique constraint violation for i in 0..5 { let table_name = TableName::from_owned_unchecked(format!("test_table_{}", i)); - let url = - Url::parse(&format!("s3://bucket/file{}.parquet", i)).expect("Failed to parse URL"); - physical_table::register( - &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), - &table_name, - url.as_str(), - true, - ) - .await - .expect("Failed to insert location"); + let url = TableUrl::from_owned_unchecked(format!("s3://bucket/file{}.parquet", i)); + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, url, true) + .await + .expect("Failed to insert location"); // Small delay to ensure different timestamps tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; @@ -75,12 +68,28 @@ async fn list_locations_first_page_respects_limit() { .expect("Failed to list locations"); //* Then - assert_eq!(locations.len(), 3); - assert!(locations[0].id > locations[1].id); - assert!(locations[1].id > locations[2].id); + assert_eq!( + locations.len(), + 3, + "list should respect limit and return 3 locations" + ); + assert!( + locations[0].id > locations[1].id, + "list should return locations in descending ID order (newest first)" + ); + assert!( + locations[1].id > locations[2].id, + "list should maintain descending ID order throughout page" + ); for location in &locations { - assert!(location.table_name.starts_with("test_table_")); - assert!(location.active); + assert!( + location.table_name.starts_with("test_table_"), + "list should return locations with correct table_name prefix" + ); + assert!( + location.active, + "list should return locations with persisted active status" + ); } } @@ -105,19 +114,11 @@ async fn list_locations_next_page_uses_cursor() { let mut all_location_ids = Vec::new(); for i in 0..10 { let table_name = TableName::from_owned_unchecked(format!("test_table_page_{}", i)); - let url = - Url::parse(&format!("s3://bucket/page{}.parquet", i)).expect("Failed to parse URL"); - let location_id = physical_table::register( - &mut conn, - namespace.clone(), - name.clone(), - hash.clone(), - &table_name, - url.as_str(), - true, - ) - .await - .expect("Failed to insert location"); + let url = TableUrl::from_owned_unchecked(format!("s3://bucket/page{}.parquet", i)); + let location_id = + physical_table::register(&mut conn, &namespace, &name, &hash, &table_name, url, true) + .await + 
.expect("Failed to insert location"); all_location_ids.push(location_id); // Small delay to ensure different timestamps @@ -139,15 +140,31 @@ async fn list_locations_next_page_uses_cursor() { .expect("Failed to list second page"); //* Then - assert_eq!(second_page.len(), 3); + assert_eq!( + second_page.len(), + 3, + "list should respect limit for subsequent pages" + ); // Verify no overlap with first page let first_page_ids: Vec<_> = first_page.iter().map(|l| l.id).collect(); for location in &second_page { - assert!(!first_page_ids.contains(&location.id)); + assert!( + !first_page_ids.contains(&location.id), + "list should not return locations from previous page (no overlap)" + ); } // Verify ordering - assert!(second_page[0].id > second_page[1].id); - assert!(second_page[1].id > second_page[2].id); + assert!( + second_page[0].id > second_page[1].id, + "list should maintain descending ID order on second page" + ); + assert!( + second_page[1].id > second_page[2].id, + "list should maintain descending ID order throughout second page" + ); // Verify cursor worked correctly - assert!(cursor > second_page[0].id); + assert!( + cursor > second_page[0].id, + "list should use cursor to exclude locations with ID >= cursor" + ); } diff --git a/crates/core/metadata-db/src/physical_table/url.rs b/crates/core/metadata-db/src/physical_table/url.rs new file mode 100644 index 000000000..0ae72a609 --- /dev/null +++ b/crates/core/metadata-db/src/physical_table/url.rs @@ -0,0 +1,168 @@ +//! Physical table URL new-type wrapper for database values +//! +//! This module provides a [`Url`] new-type wrapper around [`Cow`] that maintains +//! physical table URL invariants for database operations. The type provides efficient handling +//! with support for both borrowed and owned strings. +//! +//! ## Validation Strategy +//! +//! This type **maintains invariants but does not validate** input data. Validation occurs +//! at system boundaries through types like [`common::catalog::physical::PhyTableUrl`], which enforce +//! the required format before converting into this database-layer type. Database values are +//! trusted as already valid, following the principle of "validate at boundaries, trust +//! database data." +//! +//! Types that convert into [`Url`] are responsible for ensuring invariants are met: +//! - URLs must be valid object store URLs (parseable by `url::Url`) +//! - URLs must have a supported scheme (s3, gs, azure, file) +//! - URLs should represent directory locations (typically ending with `/`) + +use std::borrow::Cow; + +/// An owned physical table URL type for database return values and owned storage scenarios. +/// +/// This is a type alias for `Url<'static>`, specifically intended for use as a return type from +/// database queries or in any context where a physical table URL with owned storage is required. +/// Prefer this alias when working with URLs that need to be stored or returned from the database, +/// rather than just representing a URL with owned storage in general. +pub type UrlOwned = Url<'static>; + +/// A physical table URL wrapper for database values. +/// +/// This new-type wrapper around `Cow` maintains physical table URL invariants for database +/// operations. It supports both borrowed and owned strings through copy-on-write semantics, +/// enabling efficient handling without unnecessary allocations. +/// +/// The type trusts that values are already validated. Validation must occur at system +/// boundaries before conversion into this type. 
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct Url<'a>(Cow<'a, str>);
+
+impl<'a> Url<'a> {
+    /// Create a new Url wrapper from a reference to str (borrowed)
+    ///
+    /// # Safety
+    /// The caller must ensure the provided URL upholds the physical table URL invariants.
+    /// This method does not perform validation; passing a malformed URL leads to incorrect
+    /// behavior in downstream code that relies on the invariants.
+    pub fn from_ref_unchecked(url: &'a str) -> Self {
+        Self(Cow::Borrowed(url))
+    }
+
+    /// Create a new Url wrapper from an owned String
+    ///
+    /// # Safety
+    /// The caller must ensure the provided URL upholds the physical table URL invariants.
+    /// This method does not perform validation; passing a malformed URL leads to incorrect
+    /// behavior in downstream code that relies on the invariants.
+    pub fn from_owned_unchecked(url: String) -> Url<'static> {
+        Url(Cow::Owned(url))
+    }
+
+    /// Consume and return the inner String (owned)
+    pub fn into_inner(self) -> String {
+        match self {
+            Url(Cow::Owned(url)) => url,
+            Url(Cow::Borrowed(url)) => url.to_owned(),
+        }
+    }
+
+    /// Get a reference to the inner str
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+}
+
+impl<'a> From<&'a Url<'a>> for Url<'a> {
+    fn from(value: &'a Url<'a>) -> Self {
+        // Create a borrowed Cow variant pointing to the data inside the input Url.
+        // This works for both Cow::Borrowed and Cow::Owned without cloning the underlying data.
+        // SAFETY: The input Url already upholds invariants, so the referenced data is valid.
+        Url::from_ref_unchecked(value.as_ref())
+    }
+}
+
+impl<'a> std::ops::Deref for Url<'a> {
+    type Target = str;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<'a> AsRef<str> for Url<'a> {
+    fn as_ref(&self) -> &str {
+        &self.0
+    }
+}
+
+impl<'a> PartialEq<&str> for Url<'a> {
+    fn eq(&self, other: &&str) -> bool {
+        self.as_str() == *other
+    }
+}
+
+impl<'a> PartialEq<Url<'a>> for &str {
+    fn eq(&self, other: &Url<'a>) -> bool {
+        *self == other.as_str()
+    }
+}
+
+impl<'a> PartialEq<str> for Url<'a> {
+    fn eq(&self, other: &str) -> bool {
+        self.as_str() == other
+    }
+}
+
+impl<'a> PartialEq<Url<'a>> for str {
+    fn eq(&self, other: &Url<'a>) -> bool {
+        self == other.as_str()
+    }
+}
+
+impl<'a> PartialEq<String> for Url<'a> {
+    fn eq(&self, other: &String) -> bool {
+        self.as_str() == other
+    }
+}
+
+impl<'a> PartialEq<Url<'a>> for String {
+    fn eq(&self, other: &Url<'a>) -> bool {
+        self == other.as_str()
+    }
+}
+
+impl<'a> std::fmt::Display for Url<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl<'a> std::fmt::Debug for Url<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl sqlx::Type<sqlx::Postgres> for Url<'_> {
+    fn type_info() -> sqlx::postgres::PgTypeInfo {
+        <String as sqlx::Type<sqlx::Postgres>>::type_info()
+    }
+}
+
+impl<'a> sqlx::Encode<'_, sqlx::Postgres> for Url<'a> {
+    fn encode_by_ref(
+        &self,
+        buf: &mut <sqlx::Postgres as sqlx::Database>::ArgumentBuffer<'_>,
+    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
+        <&str as sqlx::Encode<'_, sqlx::Postgres>>::encode_by_ref(&self.as_str(), buf)
+    }
+}
+
+impl<'r> sqlx::Decode<'r, sqlx::Postgres> for UrlOwned {
+    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
+        let s = <String as sqlx::Decode<'r, sqlx::Postgres>>::decode(value)?;
+        // SAFETY: Database values are trusted to uphold invariants; validation occurs at boundaries before insertion.
+        Ok(Url::from_owned_unchecked(s))
+    }
+}
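
Reviewer sketch (not part of the diff): the borrow/own semantics the `Url` new-type inherits from `Cow<'a, str>`, assuming the invariants documented above are upheld by the caller; the URL string is a placeholder.

```rust
use metadata_db::physical_table::{TableUrl, TableUrlOwned};

fn sketch() {
    // Borrowed: a zero-copy view over an existing &str (Cow::Borrowed).
    let borrowed = TableUrl::from_ref_unchecked("s3://bucket/ds/table/rev/");

    // Owned: 'static lifetime (Cow::Owned), the shape Decode produces for rows.
    let owned: TableUrlOwned = TableUrl::from_owned_unchecked(borrowed.as_str().to_owned());

    // Deref<Target = str> and the PartialEq impls allow direct comparisons.
    assert_eq!(borrowed, owned.as_str());
    assert!(owned.starts_with("s3://"));

    // into_inner always yields an owned String, cloning only if borrowed.
    let raw: String = owned.into_inner();
    assert_eq!(raw, "s3://bucket/ds/table/rev/");
}
```
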