diff --git a/src/iceberg/catalog/rest/endpoint.h b/src/iceberg/catalog/rest/endpoint.h index 7382955ce..596c21176 100644 --- a/src/iceberg/catalog/rest/endpoint.h +++ b/src/iceberg/catalog/rest/endpoint.h @@ -122,6 +122,9 @@ class ICEBERG_REST_EXPORT Endpoint { return {HttpMethod::kGet, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials"}; } + static Endpoint SubmitTableScanPlan() { + return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"}; + } // Transaction endpoints static Endpoint CommitTransaction() { diff --git a/src/iceberg/catalog/rest/resource_paths.cc b/src/iceberg/catalog/rest/resource_paths.cc index 7bdde7f04..636c2ca4e 100644 --- a/src/iceberg/catalog/rest/resource_paths.cc +++ b/src/iceberg/catalog/rest/resource_paths.cc @@ -110,6 +110,14 @@ Result ResourcePaths::Credentials(const TableIdentifier& ident) con encoded_namespace, encoded_table_name); } +Result ResourcePaths::Plan(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, + EncodeNamespace(ident.ns, namespace_separator_)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + Result ResourcePaths::CommitTransaction() const { return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); } diff --git a/src/iceberg/catalog/rest/resource_paths.h b/src/iceberg/catalog/rest/resource_paths.h index db326b133..f4631339d 100644 --- a/src/iceberg/catalog/rest/resource_paths.h +++ b/src/iceberg/catalog/rest/resource_paths.h @@ -80,6 +80,10 @@ class ICEBERG_REST_EXPORT ResourcePaths { /// endpoint path. Result Credentials(const TableIdentifier& ident) const; + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint + /// path. + Result Plan(const TableIdentifier& ident) const; + /// \brief Get the /v1/{prefix}/transactions/commit endpoint path. Result CommitTransaction() const; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index a0267adcb..ba20109a1 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -56,13 +56,14 @@ namespace { /// iceberg rest spec. std::unordered_set GetDefaultEndpoints() { return { - Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(), - Endpoint::CreateNamespace(), Endpoint::UpdateNamespace(), - Endpoint::DropNamespace(), Endpoint::ListTables(), - Endpoint::LoadTable(), Endpoint::CreateTable(), - Endpoint::UpdateTable(), Endpoint::DeleteTable(), - Endpoint::RenameTable(), Endpoint::RegisterTable(), - Endpoint::ReportMetrics(), Endpoint::CommitTransaction(), + Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(), + Endpoint::CreateNamespace(), Endpoint::UpdateNamespace(), + Endpoint::DropNamespace(), Endpoint::ListTables(), + Endpoint::LoadTable(), Endpoint::CreateTable(), + Endpoint::UpdateTable(), Endpoint::DeleteTable(), + Endpoint::RenameTable(), Endpoint::RegisterTable(), + Endpoint::ReportMetrics(), Endpoint::SubmitTableScanPlan(), + Endpoint::CommitTransaction(), }; } @@ -503,4 +504,15 @@ Result> RestCatalog::RegisterTable( shared_from_this()); } +Result RestCatalog::SubmitTableScanPlan(const TableIdentifier& identifier, + const std::string& payload) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::SubmitTableScanPlan()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Plan(identifier)); + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Post(path, payload, /*headers=*/{}, *TableErrorHandler::Instance(), + *catalog_session_)); + return response.body(); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 4fd4db5b8..f9dbd0dd3 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -99,6 +99,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, const TableIdentifier& identifier, const std::string& metadata_file_location) override; + /// \brief Submit a table scan plan payload to the catalog scan-planning endpoint. + /// + /// \param identifier a table identifier + /// \param payload serialized JSON payload for scan planning + /// \return the raw response body returned by the catalog service + Result SubmitTableScanPlan(const TableIdentifier& identifier, + const std::string& payload); + private: RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, std::unique_ptr client, std::unique_ptr paths, diff --git a/src/iceberg/test/endpoint_test.cc b/src/iceberg/test/endpoint_test.cc index fcdc92a78..f60b02c71 100644 --- a/src/iceberg/test/endpoint_test.cc +++ b/src/iceberg/test/endpoint_test.cc @@ -19,6 +19,8 @@ #include "iceberg/catalog/rest/endpoint.h" +#include + #include #include @@ -136,6 +138,11 @@ TEST(EndpointTest, TableEndpoints) { EXPECT_EQ(table_credentials.method(), HttpMethod::kGet); EXPECT_EQ(table_credentials.path(), "/v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials"); + + auto submit_scan_plan = Endpoint::SubmitTableScanPlan(); + EXPECT_EQ(submit_scan_plan.method(), HttpMethod::kPost); + EXPECT_EQ(submit_scan_plan.path(), + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"); } // Test predefined transaction endpoints @@ -239,9 +246,10 @@ TEST(EndpointTest, FromStringInvalid) { TEST(EndpointTest, StringRoundTrip) { // Create various endpoints and verify they survive string round-trip std::vector endpoints = { - Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(), - Endpoint::CreateNamespace(), Endpoint::LoadTable(), - Endpoint::CreateTable(), Endpoint::DeleteTable(), + Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(), + Endpoint::CreateNamespace(), Endpoint::LoadTable(), + Endpoint::CreateTable(), Endpoint::DeleteTable(), + Endpoint::SubmitTableScanPlan(), }; for (const auto& original : endpoints) { diff --git a/src/iceberg/test/rest_catalog_integration_test.cc b/src/iceberg/test/rest_catalog_integration_test.cc index 3de7e722a..e7e982991 100644 --- a/src/iceberg/test/rest_catalog_integration_test.cc +++ b/src/iceberg/test/rest_catalog_integration_test.cc @@ -204,6 +204,41 @@ TEST_F(RestCatalogIntegrationTest, FetchServerConfigDirect) { } } +TEST_F(RestCatalogIntegrationTest, SubmitTableScanPlanRequest) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + const TableIdentifier table_id{ + .ns = Namespace{.levels = {"missing_ns"}}, + .name = "missing_table", + }; + + const std::string payload = R"({"snapshot-id":1})"; + auto result = catalog->SubmitTableScanPlan(table_id, payload); + + if (result.has_value()) { + // Some server versions acknowledge the request with an empty body, while + // others return JSON payloads. + const std::string body = result.value(); + const auto first_non_ws = body.find_first_not_of(" \t\n\r"); + if (first_non_ws != std::string::npos) { + EXPECT_TRUE(body[first_non_ws] == '{' || body[first_non_ws] == '[') + << "Unexpected scan-plan response body: " << body; + } + return; + } + + // Different servers may either not expose this endpoint, reject unknown tables, + // or validate request details before resource existence. + EXPECT_TRUE(result.error().kind == ErrorKind::kNotSupported || + result.error().kind == ErrorKind::kNoSuchTable || + result.error().kind == ErrorKind::kNoSuchNamespace || + result.error().kind == ErrorKind::kNotFound || + result.error().kind == ErrorKind::kBadRequest || + result.error().kind == ErrorKind::kNotAuthorized || + result.error().kind == ErrorKind::kForbidden || + result.error().kind == ErrorKind::kServiceUnavailable) + << result.error().message; +} + // -- Namespace operations -- TEST_F(RestCatalogIntegrationTest, ListNamespaces) {