diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc
index a515f3385..45ad4259e 100644
--- a/src/iceberg/arrow/arrow_io.cc
+++ b/src/iceberg/arrow/arrow_io.cc
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include <vector>

 #include
 #include
@@ -568,6 +569,20 @@ Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
   return {};
 }

+Status ArrowFileSystemFileIO::DeleteFiles(
+    const std::vector<std::string>& file_locations) {
+  // Translate each location into a path understood by the Arrow file system.
+  std::vector<std::string> paths;
+  paths.reserve(file_locations.size());
+  for (const auto& file_location : file_locations) {
+    ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+    paths.push_back(std::move(path));
+  }
+  // Delete the whole batch with a single call into the Arrow file system.
+  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFiles(paths));
+  return {};
+}
+
 std::unique_ptr ArrowFileSystemFileIO::MakeMockFileIO() {
   return std::make_unique(
       std::make_shared<::arrow::fs::internal::MockFileSystem>(
diff --git a/src/iceberg/arrow/arrow_io_internal.h b/src/iceberg/arrow/arrow_io_internal.h
index 4f170a8a4..a6b85b6c9 100644
--- a/src/iceberg/arrow/arrow_io_internal.h
+++ b/src/iceberg/arrow/arrow_io_internal.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <vector>

 #include
 #include
@@ -77,6 +78,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
   /// \brief Delete a file at the given location.
   Status DeleteFile(const std::string& file_location) override;

+  /// \brief Delete files at the given locations.
+  Status DeleteFiles(const std::vector<std::string>& file_locations) override;
+
   /// \brief Get the Arrow file system.
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }

diff --git a/src/iceberg/file_io.cc b/src/iceberg/file_io.cc
index d76ffeb60..e4223182e 100644
--- a/src/iceberg/file_io.cc
+++ b/src/iceberg/file_io.cc
@@ -100,4 +100,13 @@ Status FileIO::WriteFile(const std::string& file_location, std::string_view cont
   return FinishWithCloseStatus(std::move(status), stream->Close());
 }

+Status FileIO::DeleteFiles(const std::vector<std::string>& file_locations) {
+  // Delete sequentially through DeleteFile. The first failure is returned right away,
+  // so the files listed after it are not attempted.
+  for (const auto& file_location : file_locations) {
+    ICEBERG_RETURN_UNEXPECTED(DeleteFile(file_location));
+  }
+  return {};
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
index e772b5336..1f91fb0c1 100644
--- a/src/iceberg/file_io.h
+++ b/src/iceberg/file_io.h
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <vector>

 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
@@ -154,6 +155,16 @@ class ICEBERG_EXPORT FileIO {
   virtual Status DeleteFile(const std::string& file_location) {
     return NotImplemented("DeleteFile not implemented");
   }
+
+  /// \brief Delete files at the given locations.
+  ///
+  /// Implementations that can delete multiple files efficiently should override this
+  /// method. The default implementation deletes files sequentially using DeleteFile
+  /// and returns the first error encountered; files after it are not attempted.
+  ///
+  /// \param file_locations The locations of the files to delete.
+  /// \return void if all deletes succeed, or an error code if any delete fails.
+  virtual Status DeleteFiles(const std::vector<std::string>& file_locations);
 };

 }  // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 791ad9be0..37e66ed59 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -124,6 +124,7 @@ add_iceberg_test(util_test
                  data_file_set_test.cc
                  decimal_test.cc
                  endian_test.cc
+                 file_io_test.cc
                  formatter_test.cc
                  lazy_test.cc
                  location_util_test.cc
diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc
index 0c885d07a..7edaf0756 100644
--- a/src/iceberg/test/arrow_io_test.cc
+++ b/src/iceberg/test/arrow_io_test.cc
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include <vector>

 #include
 #include
@@ -341,6 +342,21 @@ TEST_F(LocalFileIOTest, DeleteFile) {
   EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
 }

+TEST_F(LocalFileIOTest, DeleteFiles) {
+  auto first_path = CreateNewTempFilePath();
+  auto second_path = CreateNewTempFilePath();
+  ASSERT_THAT(file_io_->WriteFile(first_path, "hello"), IsOk());
+  ASSERT_THAT(file_io_->WriteFile(second_path, "world"), IsOk());
+
+  std::vector<std::string> paths = {first_path, second_path};
+  EXPECT_THAT(file_io_->DeleteFiles(paths), IsOk());
+
+  // Neither file may be readable once the bulk delete has reported success.
+  EXPECT_THAT(file_io_->ReadFile(first_path, std::nullopt), IsError(ErrorKind::kIOError));
+  EXPECT_THAT(file_io_->ReadFile(second_path, std::nullopt),
+              IsError(ErrorKind::kIOError));
+}
+
 void VerifyReadFullyReadsFromAbsolutePosition(const std::shared_ptr& file_io,
                                               const std::string& path) {
   ASSERT_THAT(file_io->WriteFile(path, "abcdef"), IsOk());
diff --git a/src/iceberg/test/file_io_test.cc b/src/iceberg/test/file_io_test.cc
new file mode 100644
index 000000000..0908572f9
--- /dev/null
+++ b/src/iceberg/test/file_io_test.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/file_io.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gmock/gmock.h>
+
+#include "iceberg/test/matchers.h"
+
+namespace iceberg {
+namespace {
+
+class RecordingFileIO : public FileIO {
+ public:
+  explicit RecordingFileIO(std::string failure_path = "")
+      : failure_path_(std::move(failure_path)) {}
+
+  Status DeleteFile(const std::string& file_location) override {
+    deleted_paths.push_back(file_location);
+    if (file_location == failure_path_) {
+      return IOError("failed to delete {}", file_location);
+    }
+    return {};
+  }
+
+  std::vector<std::string> deleted_paths;  // Every location seen, in call order.
+
+ private:
+  std::string failure_path_;  // Location for which DeleteFile reports an IOError.
+};
+
+}  // namespace
+
+TEST(FileIOTest, DeleteFilesFallsBackToDeleteFileForEachPath) {
+  RecordingFileIO file_io;
+  std::vector<std::string> paths = {"file-a.avro", "file-b.avro"};
+
+  EXPECT_THAT(file_io.DeleteFiles(paths), IsOk());
+  EXPECT_THAT(file_io.deleted_paths,
+              ::testing::ElementsAre("file-a.avro", "file-b.avro"));
+}
+
+TEST(FileIOTest, DeleteFilesReturnsFirstDeleteFileError) {
+  RecordingFileIO file_io("file-b.avro");
+  std::vector<std::string> paths = {"file-a.avro", "file-b.avro", "file-c.avro"};
+
+  auto status = file_io.DeleteFiles(paths);
+
+  EXPECT_THAT(status, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(status, HasErrorMessage("failed to delete file-b.avro"));
+  EXPECT_THAT(file_io.deleted_paths,
+              ::testing::ElementsAre("file-a.avro", "file-b.avro"));
+}
+
+}  // namespace iceberg
diff
--git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index e168d08bf..1acb46e9b 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -88,6 +88,7 @@ iceberg_tests = {
             'data_file_set_test.cc',
             'decimal_test.cc',
             'endian_test.cc',
+            'file_io_test.cc',
             'formatter_test.cc',
             'lazy_test.cc',
             'location_util_test.cc',
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index 8e109d085..c9ac9e4cd 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -98,26 +98,46 @@ class FileCleanupStrategy {
     return expired;
   }

   /// \brief Delete a single file
   void DeleteFile(const std::string& path) {
     try {
       if (delete_func_) {
         delete_func_(path);
       } else {
         std::ignore = file_io_->DeleteFile(path);
       }
     } catch (...) {
       // TODO(shangxinli): add retry
     }
   }

-  // TODO(shangxinli): Add bulk deletion
+  /// \brief Delete files at the given locations on a best-effort basis.
+  ///
+  /// Without a custom delete function the whole batch is first handed to
+  /// FileIO::DeleteFiles. If that call reports an error or throws, or if a custom
+  /// delete function is set, every path is deleted individually through DeleteFile so
+  /// that one failing path does not keep the others from being attempted.
+  ///
+  /// \param paths The locations of the files to delete.
   void DeleteFiles(const std::unordered_set<std::string>& paths) {
+    if (paths.empty()) {
+      return;
+    }
+    if (!delete_func_) {
+      try {
+        std::vector<std::string> path_list(paths.begin(), paths.end());
+        if (file_io_->DeleteFiles(path_list).has_value()) {
+          return;
+        }
+      } catch (...) {
+        // Fall back to deleting the files one by one below.
+      }
+    }
     for (const auto& path : paths) {
       DeleteFile(path);
     }
   }

   bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
     return !metadata.statistics.empty() || !metadata.partition_statistics.empty();
   }