diff --git a/example/demo_example.cc b/example/demo_example.cc index 6869aa37e..3c8745be2 100644 --- a/example/demo_example.cc +++ b/example/demo_example.cc @@ -19,7 +19,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/catalog/memory/in_memory_catalog.h" #include "iceberg/manifest/manifest_entry.h" diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 6d190cb60..e8293447f 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -36,6 +36,7 @@ set(ICEBERG_SOURCES expression/rewrite_not.cc expression/strict_metrics_evaluator.cc expression/term.cc + file_io.cc file_io_registry.cc file_reader.cc file_writer.cc @@ -217,9 +218,9 @@ add_subdirectory(util) if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES - arrow/arrow_fs_file_io.cc + arrow/arrow_io.cc arrow/s3/arrow_s3_file_io.cc - arrow/file_io_register.cc + arrow/arrow_io_register.cc arrow/metadata_column_util.cc avro/avro_data_util.cc avro/avro_direct_decoder.cc diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc deleted file mode 100644 index 769fcfb13..000000000 --- a/src/iceberg/arrow/arrow_fs_file_io.cc +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#include -#include - -#include "iceberg/arrow/arrow_file_io.h" -#include "iceberg/arrow/arrow_fs_file_io_internal.h" -#include "iceberg/arrow/arrow_status_internal.h" -#include "iceberg/util/macros.h" - -namespace iceberg::arrow { - -Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) { - if (file_location.find("://") != std::string::npos) { - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, arrow_fs_->PathFromUri(file_location)); - return path; - } - return file_location; -} - -/// \brief Read the content of the file at the given location. -Result ArrowFileSystemFileIO::ReadFile(const std::string& file_location, - std::optional length) { - ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); - ::arrow::fs::FileInfo file_info(path); - if (length.has_value()) { - file_info.set_size(length.value()); - } - std::string content; - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info)); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize()); - - content.resize(file_size); - size_t remain = file_size; - size_t offset = 0; - while (remain > 0) { - size_t read_length = std::min(remain, static_cast(1024 * 1024)); - ICEBERG_ARROW_ASSIGN_OR_RETURN( - auto read_bytes, - file->Read(read_length, reinterpret_cast(&content[offset]))); - if (read_bytes == 0) { - return IOError("Unexpected EOF reading {}: got {} of {} bytes", file_location, - offset, file_size); - } - remain -= read_bytes; - offset += read_bytes; - } - - return content; -} - -/// \brief Write the given content to the file at the given location. -Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location, - std::string_view content) { - ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path)); - ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size())); - ICEBERG_ARROW_RETURN_NOT_OK(file->Flush()); - ICEBERG_ARROW_RETURN_NOT_OK(file->Close()); - return {}; -} - -/// \brief Delete a file at the given location. -Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) { - ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); - ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path)); - return {}; -} - -std::unique_ptr ArrowFileSystemFileIO::MakeMockFileIO() { - return std::make_unique( - std::make_shared<::arrow::fs::internal::MockFileSystem>( - std::chrono::system_clock::now())); -} - -std::unique_ptr ArrowFileSystemFileIO::MakeLocalFileIO() { - return std::make_unique( - std::make_shared<::arrow::fs::LocalFileSystem>()); -} - -std::unique_ptr MakeMockFileIO() { - return ArrowFileSystemFileIO::MakeMockFileIO(); -} - -std::unique_ptr MakeLocalFileIO() { - return ArrowFileSystemFileIO::MakeLocalFileIO(); -} - -} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc new file mode 100644 index 000000000..a515f3385 --- /dev/null +++ b/src/iceberg/arrow/arrow_io.cc @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/arrow/arrow_io_util.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Result ToInt64Length(size_t length) { + if (length > static_cast(std::numeric_limits::max())) { + return InvalidArgument("File length {} exceeds int64_t max", length); + } + return static_cast(length); +} + +::arrow::Status ToArrowStatus(const Error& error) { + switch (error.kind) { + case ErrorKind::kInvalid: + case ErrorKind::kInvalidArgument: + return ::arrow::Status::Invalid(error.message); + case ErrorKind::kNotImplemented: + case ErrorKind::kNotSupported: + return ::arrow::Status::NotImplemented(error.message); + default: + return ::arrow::Status::IOError(error.message); + } +} + +::arrow::Result BytesToReadAt(int64_t position, int64_t nbytes, int64_t size) { + if (position < 0 || nbytes < 0) { + return ::arrow::Status::Invalid("ReadAt position and length must be non-negative"); + } + if (position > size) { + return ::arrow::Status::IOError("Read out of bounds (offset = ", position, + ", size = ", nbytes, ") in file of size ", size); + } + return std::min(nbytes, size - position); +} + +/// Adapts the generic Iceberg input stream API to Arrow's RandomAccessFile API. +/// +/// Avro and Parquet readers in the bundle layer consume Arrow IO streams. This +/// fallback keeps those readers usable with non-Arrow FileIO implementations without +/// exposing Arrow filesystem details through the generic FileIO interface. +class InputStreamAdapter : public ::arrow::io::RandomAccessFile { + public: + InputStreamAdapter(std::unique_ptr input, int64_t size) + : input_(std::move(input)), size_(size) { + RandomAccessFile::set_mode(::arrow::io::FileMode::READ); + } + + ::arrow::Status Close() override { + std::lock_guard lock(mutex_); + if (closed_) { + return ::arrow::Status::OK(); + } + auto status = input_->Close(); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + closed_ = true; + return ::arrow::Status::OK(); + } + + ::arrow::Result Tell() const override { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + auto position = input_->Position(); + if (!position.has_value()) { + return ToArrowStatus(position.error()); + } + if (position.value() < 0) { + return ::arrow::Status::IOError("FileIO input stream returned negative position"); + } + return position.value(); + } + + bool closed() const override { + std::lock_guard lock(mutex_); + return closed_; + } + + ::arrow::Status Seek(int64_t position) override { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + auto status = input_->Seek(position); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + return ::arrow::Status::OK(); + } + + ::arrow::Result Read(int64_t nbytes, void* out) override { + if (nbytes < 0) { + return ::arrow::Status::Invalid("Cannot read a negative number of bytes"); + } + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + if (nbytes == 0) { + return 0; + } + auto data = reinterpret_cast(out); + auto result = input_->Read(std::span(data, static_cast(nbytes))); + if (!result.has_value()) { + return ToArrowStatus(result.error()); + } + if (result.value() < 0 || result.value() > nbytes) { + return ::arrow::Status::IOError("FileIO input stream returned invalid byte count"); + } + return result.value(); + } + + ::arrow::Result> Read(int64_t nbytes) override { + if (nbytes < 0) { + return ::arrow::Status::Invalid("Cannot read a negative number of bytes"); + } + ARROW_ASSIGN_OR_RAISE(auto buffer, ::arrow::AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(auto bytes_read, Read(nbytes, buffer->mutable_data())); + ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, /*shrink_to_fit=*/false)); + return std::shared_ptr<::arrow::Buffer>(std::move(buffer)); + } + + ::arrow::Result GetSize() override { return size_; } + + ::arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes, size_)); + if (bytes_to_read == 0) { + return 0; + } + auto data = reinterpret_cast(out); + auto status = + input_->ReadFully(position, std::span(data, static_cast(bytes_to_read))); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + return bytes_to_read; + } + + ::arrow::Result> ReadAt(int64_t position, + int64_t nbytes) override { + { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + } + ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes, size_)); + ARROW_ASSIGN_OR_RAISE(auto buffer, ::arrow::AllocateResizableBuffer(bytes_to_read)); + if (bytes_to_read == 0) { + return std::shared_ptr<::arrow::Buffer>(std::move(buffer)); + } + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + auto status = input_->ReadFully( + position, std::span(reinterpret_cast(buffer->mutable_data()), + static_cast(bytes_to_read))); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + return std::shared_ptr<::arrow::Buffer>(std::move(buffer)); + } + + private: + ::arrow::Status CheckOpenLocked() const { + if (closed_) { + return ::arrow::Status::IOError("Operation on closed FileIO input stream"); + } + return ::arrow::Status::OK(); + } + + std::unique_ptr input_; + int64_t size_; + bool closed_ = false; + mutable std::mutex mutex_; +}; + +/// Adapts the generic Iceberg output stream API to Arrow's OutputStream API. +/// +/// Avro and Parquet writers in the bundle layer consume Arrow IO streams. This +/// fallback keeps those writers usable with non-Arrow FileIO implementations without +/// requiring them to downcast to ArrowFileSystemFileIO. +class OutputStreamAdapter : public ::arrow::io::OutputStream { + public: + explicit OutputStreamAdapter(std::unique_ptr output) + : output_(std::move(output)) { + OutputStream::set_mode(::arrow::io::FileMode::WRITE); + } + + ::arrow::Status Close() override { + std::lock_guard lock(mutex_); + if (closed_) { + return ::arrow::Status::OK(); + } + auto status = output_->Close(); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + closed_ = true; + return ::arrow::Status::OK(); + } + + ::arrow::Result Tell() const override { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + auto position = output_->Position(); + if (!position.has_value()) { + return ToArrowStatus(position.error()); + } + if (position.value() < 0) { + return ::arrow::Status::IOError("FileIO output stream returned negative position"); + } + return position.value(); + } + + bool closed() const override { + std::lock_guard lock(mutex_); + return closed_; + } + + ::arrow::Status Write(const void* data, int64_t nbytes) override { + if (nbytes < 0) { + return ::arrow::Status::Invalid("Cannot write a negative number of bytes"); + } + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + if (nbytes == 0) { + return ::arrow::Status::OK(); + } + auto status = output_->Write( + std::span(reinterpret_cast(data), static_cast(nbytes))); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + return ::arrow::Status::OK(); + } + + ::arrow::Status Flush() override { + std::lock_guard lock(mutex_); + ARROW_RETURN_NOT_OK(CheckOpenLocked()); + auto status = output_->Flush(); + if (!status.has_value()) { + return ToArrowStatus(status.error()); + } + return ::arrow::Status::OK(); + } + + private: + ::arrow::Status CheckOpenLocked() const { + if (closed_) { + return ::arrow::Status::IOError("Operation on closed FileIO output stream"); + } + return ::arrow::Status::OK(); + } + + std::unique_ptr output_; + bool closed_ = false; + mutable std::mutex mutex_; +}; + +class ArrowSeekableInputStream : public SeekableInputStream { + public: + explicit ArrowSeekableInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input) + : input_(std::move(input)) {} + + Result Position() const override { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, input_->Tell()); + return position; + } + + Status Seek(int64_t position) override { + ICEBERG_ARROW_RETURN_NOT_OK(input_->Seek(position)); + return {}; + } + + Result Read(std::span out) override { + ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size())); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto bytes_read, input_->Read(size, out.data())); + if (bytes_read < 0 || bytes_read > size) { + return IOError("Arrow input stream returned invalid byte count"); + } + return bytes_read; + } + + Status ReadFully(int64_t position, std::span out) override { + if (position < 0) { + return InvalidArgument("Cannot read from negative position {}", position); + } + ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size())); + if (size == 0) { + return {}; + } + if (position > std::numeric_limits::max() - size) { + return InvalidArgument( + "Read range starting at {} with length {} exceeds int64_t max", position, size); + } + + Status read_status = {}; + int64_t bytes_read = 0; + while (bytes_read < size) { + auto* data = out.data() + bytes_read; + auto remaining = size - bytes_read; + auto read_result = input_->ReadAt(position + bytes_read, remaining, data); + if (!read_result.ok()) { + read_status = + std::unexpected{{.kind = ToErrorKind(read_result.status()), + .message = read_result.status().ToString()}}; + break; + } + auto read = read_result.ValueOrDie(); + if (read < 0 || read > remaining) { + read_status = IOError("Arrow input stream returned invalid byte count"); + break; + } + if (read == 0) { + read_status = + IOError("Unexpected EOF reading at offset {}", position + bytes_read); + break; + } + bytes_read += read; + } + return read_status; + } + + Status Close() override { + if (input_->closed()) { + return {}; + } + ICEBERG_ARROW_RETURN_NOT_OK(input_->Close()); + return {}; + } + + private: + std::shared_ptr<::arrow::io::RandomAccessFile> input_; +}; + +class ArrowPositionOutputStream : public PositionOutputStream { + public: + explicit ArrowPositionOutputStream(std::shared_ptr<::arrow::io::OutputStream> output) + : output_(std::move(output)) {} + + Result Position() const override { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, output_->Tell()); + return position; + } + + Status Write(std::span data) override { + ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(data.size())); + ICEBERG_ARROW_RETURN_NOT_OK(output_->Write(data.data(), size)); + return {}; + } + + Status Flush() override { + ICEBERG_ARROW_RETURN_NOT_OK(output_->Flush()); + return {}; + } + + Status Close() override { + if (output_->closed()) { + return {}; + } + ICEBERG_ARROW_RETURN_NOT_OK(output_->Close()); + return {}; + } + + private: + std::shared_ptr<::arrow::io::OutputStream> output_; +}; + +class ArrowInputFile : public InputFile { + public: + ArrowInputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string location, + std::string path, std::optional file_size) + : fs_(std::move(fs)), + location_(std::move(location)), + path_(std::move(path)), + file_size_(file_size) {} + + std::string_view location() const override { return location_; } + + Result Size() const override { + if (file_size_.has_value()) { + return *file_size_; + } + ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto size, input->GetSize()); + return size; + } + + Result> Open() override { + ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File); + if (file_size_.has_value()) { + file_info.set_size(*file_size_); + } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info)); + return std::make_unique(std::move(input)); + } + + private: + std::shared_ptr<::arrow::fs::FileSystem> fs_; + std::string location_; + std::string path_; + std::optional file_size_; +}; + +class ArrowOutputFile : public OutputFile { + public: + ArrowOutputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string location, + std::string path) + : fs_(std::move(fs)), location_(std::move(location)), path_(std::move(path)) {} + + std::string_view location() const override { return location_; } + + Result> Create() override { + return Create(/*overwrite=*/false); + } + + Result> CreateOrOverwrite() override { + return Create(/*overwrite=*/true); + } + + private: + Result> Create(bool overwrite) { + if (!overwrite) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info, fs_->GetFileInfo(path_)); + if (info.type() != ::arrow::fs::FileType::NotFound) { + return AlreadyExists("File already exists: {}", location_); + } + } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, fs_->OpenOutputStream(path_)); + return std::make_unique(std::move(output)); + } + + std::shared_ptr<::arrow::fs::FileSystem> fs_; + std::string location_; + std::string path_; +}; + +} // namespace + +Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) { + if (file_location.find("://") != std::string::npos) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, arrow_fs_->PathFromUri(file_location)); + return path; + } + return file_location; +} + +Result> OpenArrowInputStream( + const std::shared_ptr& io, const std::string& path, + std::optional length) { + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + + if (auto arrow_io = std::dynamic_pointer_cast(io)) { + ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path)); + ::arrow::fs::FileInfo file_info(resolved_path, ::arrow::fs::FileType::File); + if (length.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(*length)); + file_info.set_size(size); + } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, + arrow_io->arrow_fs_->OpenInputFile(file_info)); + return input; + } + + int64_t size; + std::unique_ptr input_file; + if (length.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length)); + } else { + ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path)); + } + ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size()); + if (size < 0) { + return Invalid("Invalid negative file size {} for {}", size, path); + } + ICEBERG_ASSIGN_OR_RAISE(auto input, input_file->Open()); + return std::make_shared(std::move(input), size); +} + +Result> OpenArrowOutputStream( + const std::shared_ptr& io, const std::string& path, bool overwrite) { + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + + if (auto arrow_io = std::dynamic_pointer_cast(io)) { + ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path)); + if (!overwrite) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info, + arrow_io->arrow_fs_->GetFileInfo(resolved_path)); + if (info.type() != ::arrow::fs::FileType::NotFound) { + return AlreadyExists("File already exists: {}", path); + } + } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, + arrow_io->arrow_fs_->OpenOutputStream(resolved_path)); + return output; + } + + ICEBERG_ASSIGN_OR_RAISE(auto output_file, io->NewOutputFile(path)); + std::unique_ptr output; + if (overwrite) { + ICEBERG_ASSIGN_OR_RAISE(output, output_file->CreateOrOverwrite()); + } else { + ICEBERG_ASSIGN_OR_RAISE(output, output_file->Create()); + } + return std::make_shared(std::move(output)); +} + +Result> ArrowFileSystemFileIO::NewInputFile( + std::string file_location) { + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + return std::make_unique(arrow_fs_, std::move(file_location), + std::move(path), std::nullopt); +} + +Result> ArrowFileSystemFileIO::NewInputFile( + std::string file_location, size_t length) { + ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(length)); + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + return std::make_unique(arrow_fs_, std::move(file_location), + std::move(path), size); +} + +Result> ArrowFileSystemFileIO::NewOutputFile( + std::string file_location) { + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + return std::make_unique(arrow_fs_, std::move(file_location), + std::move(path)); +} + +/// \brief Delete a file at the given location. +Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) { + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path)); + return {}; +} + +std::unique_ptr ArrowFileSystemFileIO::MakeMockFileIO() { + return std::make_unique( + std::make_shared<::arrow::fs::internal::MockFileSystem>( + std::chrono::system_clock::now())); +} + +std::unique_ptr ArrowFileSystemFileIO::MakeLocalFileIO() { + return std::make_unique( + std::make_shared<::arrow::fs::LocalFileSystem>()); +} + +std::unique_ptr MakeMockFileIO() { + return ArrowFileSystemFileIO::MakeMockFileIO(); +} + +std::unique_ptr MakeLocalFileIO() { + return ArrowFileSystemFileIO::MakeLocalFileIO(); +} + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/arrow_fs_file_io_internal.h b/src/iceberg/arrow/arrow_io_internal.h similarity index 50% rename from src/iceberg/arrow/arrow_fs_file_io_internal.h rename to src/iceberg/arrow/arrow_io_internal.h index 92a991501..4f170a8a4 100644 --- a/src/iceberg/arrow/arrow_fs_file_io_internal.h +++ b/src/iceberg/arrow/arrow_io_internal.h @@ -19,15 +19,37 @@ #pragma once +#include #include +#include +#include -#include +#include +#include #include "iceberg/file_io.h" #include "iceberg/iceberg_bundle_export.h" namespace iceberg::arrow { +/// \brief Open a FileIO input as an Arrow input stream. +/// +/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible and falls +/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to +/// implement NewInputFile. +ICEBERG_BUNDLE_EXPORT Result> +OpenArrowInputStream(const std::shared_ptr& io, const std::string& path, + std::optional length = std::nullopt); + +/// \brief Open a FileIO output as an Arrow output stream. +/// +/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible and falls +/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to +/// implement NewOutputFile. +ICEBERG_BUNDLE_EXPORT Result> +OpenArrowOutputStream(const std::shared_ptr& io, const std::string& path, + bool overwrite = true); + /// \brief A concrete implementation of FileIO for Arrow file system. class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { public: @@ -42,12 +64,15 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { ~ArrowFileSystemFileIO() override = default; - /// \brief Read the content of the file at the given location. - Result ReadFile(const std::string& file_location, - std::optional length) override; + /// \brief Create an input file handle for the given location. + Result> NewInputFile(std::string file_location) override; + + /// \brief Create an input file handle for the given location with a known length. + Result> NewInputFile(std::string file_location, + size_t length) override; - /// \brief Write the given content to the file at the given location. - Status WriteFile(const std::string& file_location, std::string_view content) override; + /// \brief Create an output file handle for the given location. + Result> NewOutputFile(std::string file_location) override; /// \brief Delete a file at the given location. Status DeleteFile(const std::string& file_location) override; @@ -56,6 +81,13 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; } private: + friend Result> OpenArrowInputStream( + const std::shared_ptr& io, const std::string& path, + std::optional length); + + friend Result> OpenArrowOutputStream( + const std::shared_ptr& io, const std::string& path, bool overwrite); + /// \brief Resolve a file location to a filesystem path. Result ResolvePath(const std::string& file_location); diff --git a/src/iceberg/arrow/file_io_register.cc b/src/iceberg/arrow/arrow_io_register.cc similarity index 95% rename from src/iceberg/arrow/file_io_register.cc rename to src/iceberg/arrow/arrow_io_register.cc index 1140e49b8..43273c0ae 100644 --- a/src/iceberg/arrow/file_io_register.cc +++ b/src/iceberg/arrow/arrow_io_register.cc @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -#include "iceberg/arrow/file_io_register.h" +#include "iceberg/arrow/arrow_io_register.h" #include #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/file_io_registry.h" namespace iceberg::arrow { diff --git a/src/iceberg/arrow/file_io_register.h b/src/iceberg/arrow/arrow_io_register.h similarity index 96% rename from src/iceberg/arrow/file_io_register.h rename to src/iceberg/arrow/arrow_io_register.h index 1b4622bd7..f28b7a565 100644 --- a/src/iceberg/arrow/file_io_register.h +++ b/src/iceberg/arrow/arrow_io_register.h @@ -19,7 +19,7 @@ #pragma once -/// \file iceberg/arrow/file_io_register.h +/// \file iceberg/arrow/arrow_io_register.h /// \brief Provide functions to register Arrow FileIO implementations. #include "iceberg/iceberg_bundle_export.h" diff --git a/src/iceberg/arrow/arrow_file_io.h b/src/iceberg/arrow/arrow_io_util.h similarity index 100% rename from src/iceberg/arrow/arrow_file_io.h rename to src/iceberg/arrow/arrow_io_util.h diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc b/src/iceberg/arrow/s3/arrow_s3_file_io.cc index 808415d0a..cffd95840 100644 --- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc +++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc @@ -27,8 +27,8 @@ # include #endif -#include "iceberg/arrow/arrow_file_io.h" -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/arrow/s3/s3_properties.h" #include "iceberg/util/macros.h" diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index f4985d9ac..1d431c46b 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -31,7 +31,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/arrow/metadata_column_util_internal.h" #include "iceberg/avro/avro_data_util_internal.h" @@ -42,7 +42,6 @@ #include "iceberg/metadata_columns.h" #include "iceberg/name_mapping.h" #include "iceberg/schema_internal.h" -#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::avro { @@ -51,13 +50,8 @@ namespace { Result> CreateInputStream(const ReaderOptions& options, int64_t buffer_size) { - ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); - if (options.length) { - file_info.set_size(options.length.value()); - } - - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info)); + ICEBERG_ASSIGN_OR_RAISE( + auto file, arrow::OpenArrowInputStream(options.io, options.path, options.length)); return std::make_unique(file, buffer_size); } diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 32ce3f634..63fc31462 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -29,7 +29,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_direct_encoder_internal.h" @@ -40,7 +40,6 @@ #include "iceberg/metrics_config.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" -#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::avro { @@ -49,8 +48,8 @@ namespace { Result> CreateOutputStream(const WriterOptions& options, int64_t buffer_size) { - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + ICEBERG_ASSIGN_OR_RAISE(auto output, + arrow::OpenArrowOutputStream(options.io, options.path)); return std::make_unique(output, buffer_size); } diff --git a/src/iceberg/file_io.cc b/src/iceberg/file_io.cc new file mode 100644 index 000000000..d76ffeb60 --- /dev/null +++ b/src/iceberg/file_io.cc @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/file_io.h" + +#include +#include + +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Status FinishWithCloseStatus(Status operation_status, Status close_status) { + if (!operation_status.has_value()) { + auto error = operation_status.error(); + if (!close_status.has_value()) { + error.message += "; additionally failed to close stream: "; + error.message += close_status.error().message; + } + return std::unexpected(std::move(error)); + } + return close_status; +} + +} // namespace + +Result> FileIO::NewInputFile(std::string file_location) { + return NotImplemented("NewInputFile not implemented for {}", file_location); +} + +Result> FileIO::NewInputFile(std::string file_location, + size_t /*length*/) { + return NewInputFile(std::move(file_location)); +} + +Result> FileIO::NewOutputFile(std::string file_location) { + return NotImplemented("NewOutputFile not implemented for {}", file_location); +} + +Result FileIO::ReadFile(const std::string& file_location, + std::optional length) { + int64_t read_size; + std::unique_ptr input_file; + if (length.has_value()) { + if (*length > static_cast(std::numeric_limits::max())) { + return InvalidArgument("Requested read length {} exceeds int64_t max", *length); + } + ICEBERG_ASSIGN_OR_RAISE(input_file, NewInputFile(file_location, *length)); + read_size = static_cast(*length); + } else { + ICEBERG_ASSIGN_OR_RAISE(input_file, NewInputFile(file_location)); + ICEBERG_ASSIGN_OR_RAISE(read_size, input_file->Size()); + } + if (read_size < 0) { + return Invalid("Invalid negative file size {} for {}", read_size, file_location); + } + + auto size = static_cast(read_size); + std::string content(size, '\0'); + ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); + Status read_status = {}; + if (size > 0) { + auto bytes = std::as_writable_bytes(std::span(content.data(), content.size())); + read_status = stream->ReadFully(/*position=*/0, bytes); + } + ICEBERG_RETURN_UNEXPECTED( + FinishWithCloseStatus(std::move(read_status), stream->Close())); + return content; +} + +Status FileIO::WriteFile(const std::string& file_location, std::string_view content) { + ICEBERG_ASSIGN_OR_RAISE(auto output_file, NewOutputFile(file_location)); + ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->CreateOrOverwrite()); + Status status = {}; + if (!content.empty()) { + auto bytes = std::as_bytes(std::span(content.data(), content.size())); + status = stream->Write(bytes); + } + if (status.has_value()) { + status = stream->Flush(); + } + return FinishWithCloseStatus(std::move(status), stream->Close()); +} + +} // namespace iceberg diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 259da7556..e772b5336 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -19,7 +19,11 @@ #pragma once +#include +#include +#include #include +#include #include #include @@ -28,11 +32,82 @@ namespace iceberg { +/// \brief Seekable byte stream for reading file contents. +class ICEBERG_EXPORT SeekableInputStream { + public: + virtual ~SeekableInputStream() = default; + + /// \brief Return the current read position. + virtual Result Position() const = 0; + + /// \brief Seek to an absolute byte position. + virtual Status Seek(int64_t position) = 0; + + /// \brief Read up to out.size() bytes from the current position. + virtual Result Read(std::span out) = 0; + + /// \brief Read exactly out.size() bytes from an absolute position. + /// + /// Fails if fewer than out.size() bytes are available. The current stream position + /// after this call is unspecified; callers should Seek before subsequent + /// position-dependent reads. + virtual Status ReadFully(int64_t position, std::span out) = 0; + + /// \brief Close the stream. Implementations should allow repeated Close calls. + virtual Status Close() = 0; +}; + +/// \brief Positioned byte stream for writing file contents. +class ICEBERG_EXPORT PositionOutputStream { + public: + virtual ~PositionOutputStream() = default; + + /// \brief Return the current write position. + virtual Result Position() const = 0; + + /// \brief Write all bytes in data at the current position. + virtual Status Write(std::span data) = 0; + + /// \brief Flush buffered data to the underlying store. + virtual Status Flush() = 0; + + /// \brief Close the stream. Implementations should allow repeated Close calls. + virtual Status Close() = 0; +}; + +/// \brief Handle for opening a readable file. +class ICEBERG_EXPORT InputFile { + public: + virtual ~InputFile() = default; + + /// \brief File location represented by this handle. + virtual std::string_view location() const = 0; + + /// \brief Return the total file size in bytes. + virtual Result Size() const = 0; + + /// \brief Open a new independent input stream. + virtual Result> Open() = 0; +}; + +/// \brief Handle for creating a writable file. +class ICEBERG_EXPORT OutputFile { + public: + virtual ~OutputFile() = default; + + /// \brief File location represented by this handle. + virtual std::string_view location() const = 0; + + /// \brief Create a new output stream and fail if the file already exists. + virtual Result> Create() = 0; + + /// \brief Create a new output stream, replacing any existing file. + virtual Result> CreateOrOverwrite() = 0; +}; + /// \brief Pluggable module for reading, writing, and deleting files. /// -/// This module only handle metadata files, not data files. The metadata files -/// are typically small and are used to store schema, partition information, -/// and other metadata about the table. +/// This module handles metadata and data file bytes for table IO. /// /// Note that these functions are not atomic. For example, if a write fails, /// the file may be partially written. Implementations should be careful to @@ -42,6 +117,19 @@ class ICEBERG_EXPORT FileIO { FileIO() = default; virtual ~FileIO() = default; + /// \brief Create an input file handle for the given location. + virtual Result> NewInputFile(std::string file_location); + + /// \brief Create an input file handle for the given location with a known length. + /// + /// The length is a caller-provided content length hint. Implementations may use it to + /// avoid an extra metadata lookup. + virtual Result> NewInputFile(std::string file_location, + size_t length); + + /// \brief Create an output file handle for the given location. + virtual Result> NewOutputFile(std::string file_location); + /// \brief Read the content of the file at the given location. /// /// \param file_location The location of the file to read. @@ -50,21 +138,14 @@ class ICEBERG_EXPORT FileIO { /// \return The content of the file if the read succeeded, an error code if the read /// failed. virtual Result ReadFile(const std::string& file_location, - std::optional length) { - // We provide a default implementation to avoid Windows linker error LNK2019. - return NotImplemented("ReadFile not implemented"); - } + std::optional length); /// \brief Write the given content to the file at the given location. /// /// \param file_location The location of the file to write. /// \param content The content to write to the file. - /// \param overwrite If true, overwrite the file if it exists. If false, fail if the - /// file exists. /// \return void if the write succeeded, an error code if the write failed. - virtual Status WriteFile(const std::string& file_location, std::string_view content) { - return NotImplemented("WriteFile not implemented"); - } + virtual Status WriteFile(const std::string& file_location, std::string_view content); /// \brief Delete a file at the given location. /// diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index 923ac6bdb..c31d9b292 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -95,9 +95,7 @@ struct ICEBERG_EXPORT ReaderOptions { std::optional length; /// \brief The split to read. std::optional split; - /// \brief FileIO instance to open the file. Reader implementations should down cast it - /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses - /// `ArrowFileSystemFileIO` as the default implementation. + /// \brief FileIO instance to open the file. std::shared_ptr io; /// \brief The projection schema to read from the file. This field is required. std::shared_ptr projection; diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index f3352d8fd..a49b5228e 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -73,9 +73,7 @@ struct ICEBERG_EXPORT WriterOptions { std::string path; /// \brief The schema of the data to write. std::shared_ptr schema; - /// \brief FileIO instance to open the file. Writer implementations should down cast it - /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses - /// `ArrowFileSystemFileIO` as the default implementation. + /// \brief FileIO instance to create the file. std::shared_ptr io; /// \brief Metadata to write to the file. std::unordered_map metadata; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 17ed723fc..df9f1a975 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -58,6 +58,7 @@ iceberg_sources = files( 'expression/rewrite_not.cc', 'expression/strict_metrics_evaluator.cc', 'expression/term.cc', + 'file_io.cc', 'file_io_registry.cc', 'file_reader.cc', 'file_writer.cc', diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 0e2808f59..775644a94 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -32,7 +32,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/arrow/metadata_column_util_internal.h" #include "iceberg/parquet/parquet_data_util_internal.h" @@ -41,7 +41,6 @@ #include "iceberg/result.h" #include "iceberg/schema_internal.h" #include "iceberg/schema_util.h" -#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::parquet { @@ -50,14 +49,7 @@ namespace { Result> OpenInputStream( const ReaderOptions& options) { - ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); - if (options.length) { - file_info.set_size(options.length.value()); - } - - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, io->fs()->OpenInputFile(file_info)); - return input; + return arrow::OpenArrowInputStream(options.io, options.path, options.length); } Result BuildProjection(::parquet::arrow::FileReader* reader, diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index a68e9e61e..7e2d3d151 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -29,10 +29,9 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/schema_internal.h" -#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::parquet { @@ -41,9 +40,7 @@ namespace { Result> OpenOutputStream( const WriterOptions& options) { - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); - return output; + return arrow::OpenArrowOutputStream(options.io, options.path); } Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) { diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 6b98951ad..1d80b29a5 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -156,7 +156,7 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(arrow_test USE_BUNDLE SOURCES - arrow_fs_file_io_test.cc + arrow_io_test.cc arrow_test.cc gzip_decompress_test.cc metadata_io_test.cc diff --git a/src/iceberg/test/arrow_fs_file_io_test.cc b/src/iceberg/test/arrow_fs_file_io_test.cc deleted file mode 100644 index eacda2f75..000000000 --- a/src/iceberg/test/arrow_fs_file_io_test.cc +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include - -#include "iceberg/arrow/arrow_fs_file_io_internal.h" -#include "iceberg/test/matchers.h" -#include "iceberg/test/temp_file_test_base.h" - -namespace iceberg { - -class LocalFileIOTest : public TempFileTestBase { - protected: - void SetUp() override { - TempFileTestBase::SetUp(); - file_io_ = std::make_shared( - std::make_shared<::arrow::fs::LocalFileSystem>()); - temp_filepath_ = CreateNewTempFilePath(); - } - - std::shared_ptr file_io_; - std::string temp_filepath_; -}; - -TEST_F(LocalFileIOTest, ReadWriteFile) { - auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt); - EXPECT_THAT(read_res, IsError(ErrorKind::kIOError)); - EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file")); - - auto write_res = file_io_->WriteFile(temp_filepath_, "hello world"); - EXPECT_THAT(write_res, IsOk()); - - read_res = file_io_->ReadFile(temp_filepath_, std::nullopt); - EXPECT_THAT(read_res, IsOk()); - EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world"))); -} - -TEST_F(LocalFileIOTest, DeleteFile) { - auto write_res = file_io_->WriteFile(temp_filepath_, "hello world"); - EXPECT_THAT(write_res, IsOk()); - - auto del_res = file_io_->DeleteFile(temp_filepath_); - EXPECT_THAT(del_res, IsOk()); - - del_res = file_io_->DeleteFile(temp_filepath_); - EXPECT_THAT(del_res, IsError(ErrorKind::kIOError)); - EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file")); -} - -} // namespace iceberg diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc new file mode 100644 index 000000000..0c885d07a --- /dev/null +++ b/src/iceberg/test/arrow_io_test.cc @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/std_io.h" +#include "iceberg/test/temp_file_test_base.h" + +namespace iceberg { + +namespace { + +struct CloseState { + bool closed = false; +}; + +class ReadFailureInputStream : public SeekableInputStream { + public: + explicit ReadFailureInputStream(std::shared_ptr state) + : state_(std::move(state)) {} + + Result Position() const override { return 0; } + + Status Seek(int64_t /*position*/) override { return {}; } + + Result Read(std::span /*out*/) override { return 0; } + + Status ReadFully(int64_t /*position*/, std::span /*out*/) override { + return IOError("read failed"); + } + + Status Close() override { + state_->closed = true; + return IOError("close failed"); + } + + private: + std::shared_ptr state_; +}; + +class ReadFailureInputFile : public InputFile { + public: + explicit ReadFailureInputFile(std::shared_ptr state) + : state_(std::move(state)) {} + + std::string_view location() const override { return "read-failure"; } + + Result Size() const override { return 4; } + + Result> Open() override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +class ReadFailureFileIO : public FileIO { + public: + explicit ReadFailureFileIO(std::shared_ptr state) + : state_(std::move(state)) {} + + Result> NewInputFile( + std::string /*file_location*/) override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +class WriteFailureOutputStream : public PositionOutputStream { + public: + explicit WriteFailureOutputStream(std::shared_ptr state) + : state_(std::move(state)) {} + + Result Position() const override { return 0; } + + Status Write(std::span /*data*/) override { + return IOError("write failed"); + } + + Status Flush() override { return {}; } + + Status Close() override { + state_->closed = true; + return IOError("close failed"); + } + + private: + std::shared_ptr state_; +}; + +class WriteFailureOutputFile : public OutputFile { + public: + explicit WriteFailureOutputFile(std::shared_ptr state) + : state_(std::move(state)) {} + + std::string_view location() const override { return "write-failure"; } + + Result> Create() override { + return std::make_unique(state_); + } + + Result> CreateOrOverwrite() override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +class WriteFailureFileIO : public FileIO { + public: + explicit WriteFailureFileIO(std::shared_ptr state) + : state_(std::move(state)) {} + + Result> NewOutputFile( + std::string /*file_location*/) override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +struct PermissiveReadState { + std::string data; + bool closed = false; + int64_t position = 0; +}; + +class PermissiveInputStream : public SeekableInputStream { + public: + explicit PermissiveInputStream(std::shared_ptr state) + : state_(std::move(state)) {} + + Result Position() const override { return state_->position; } + + Status Seek(int64_t position) override { + if (position < 0) { + return InvalidArgument("Cannot seek to negative position {}", position); + } + state_->position = position; + return {}; + } + + Result Read(std::span out) override { + auto position = static_cast(state_->position); + if (position >= state_->data.size()) { + return 0; + } + auto bytes_to_read = std::min(out.size(), state_->data.size() - position); + std::copy_n(reinterpret_cast(state_->data.data() + position), + bytes_to_read, out.data()); + state_->position += static_cast(bytes_to_read); + return static_cast(bytes_to_read); + } + + Status ReadFully(int64_t position, std::span out) override { + if (position < 0) { + return InvalidArgument("Cannot read from negative position {}", position); + } + auto offset = static_cast(position); + if (offset > state_->data.size() || out.size() > state_->data.size() - offset) { + return IOError("Unexpected EOF"); + } + std::copy_n(reinterpret_cast(state_->data.data() + offset), + out.size(), out.data()); + return {}; + } + + Status Close() override { + state_->closed = true; + return {}; + } + + private: + std::shared_ptr state_; +}; + +class PermissiveInputFile : public InputFile { + public: + explicit PermissiveInputFile(std::shared_ptr state) + : state_(std::move(state)) {} + + std::string_view location() const override { return "permissive-input"; } + + Result Size() const override { + return static_cast(state_->data.size()); + } + + Result> Open() override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +class PermissiveInputFileIO : public FileIO { + public: + explicit PermissiveInputFileIO(std::shared_ptr state) + : state_(std::move(state)) {} + + Result> NewInputFile( + std::string /*file_location*/) override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +struct PermissiveWriteState { + std::string data; + bool closed = false; +}; + +class PermissiveOutputStream : public PositionOutputStream { + public: + explicit PermissiveOutputStream(std::shared_ptr state) + : state_(std::move(state)) {} + + Result Position() const override { + return static_cast(state_->data.size()); + } + + Status Write(std::span data) override { + state_->data.append(reinterpret_cast(data.data()), data.size()); + return {}; + } + + Status Flush() override { return {}; } + + Status Close() override { + state_->closed = true; + return {}; + } + + private: + std::shared_ptr state_; +}; + +class PermissiveOutputFile : public OutputFile { + public: + explicit PermissiveOutputFile(std::shared_ptr state) + : state_(std::move(state)) {} + + std::string_view location() const override { return "permissive-output"; } + + Result> Create() override { + return std::make_unique(state_); + } + + Result> CreateOrOverwrite() override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +class PermissiveOutputFileIO : public FileIO { + public: + explicit PermissiveOutputFileIO(std::shared_ptr state) + : state_(std::move(state)) {} + + Result> NewOutputFile( + std::string /*file_location*/) override { + return std::make_unique(state_); + } + + private: + std::shared_ptr state_; +}; + +} // namespace + +class LocalFileIOTest : public TempFileTestBase { + protected: + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = std::make_shared( + std::make_shared<::arrow::fs::LocalFileSystem>()); + temp_filepath_ = CreateNewTempFilePath(); + } + + std::shared_ptr file_io_; + std::string temp_filepath_; +}; + +TEST_F(LocalFileIOTest, ReadWriteFile) { + auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt); + EXPECT_THAT(read_res, IsError(ErrorKind::kIOError)); + EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file")); + + auto write_res = file_io_->WriteFile(temp_filepath_, "hello world"); + EXPECT_THAT(write_res, IsOk()); + + read_res = file_io_->ReadFile(temp_filepath_, std::nullopt); + EXPECT_THAT(read_res, IsOk()); + EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world"))); +} + +TEST_F(LocalFileIOTest, DeleteFile) { + auto write_res = file_io_->WriteFile(temp_filepath_, "hello world"); + EXPECT_THAT(write_res, IsOk()); + + auto del_res = file_io_->DeleteFile(temp_filepath_); + EXPECT_THAT(del_res, IsOk()); + + del_res = file_io_->DeleteFile(temp_filepath_); + EXPECT_THAT(del_res, IsError(ErrorKind::kIOError)); + EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file")); +} + +void VerifyReadFullyReadsFromAbsolutePosition(const std::shared_ptr& file_io, + const std::string& path) { + ASSERT_THAT(file_io->WriteFile(path, "abcdef"), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io->NewInputFile(path)); + ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open()); + ASSERT_THAT(stream->Seek(5), IsOk()); + + std::array buffer; + ASSERT_THAT(stream->ReadFully(1, buffer), IsOk()); + + std::string data(reinterpret_cast(buffer.data()), buffer.size()); + EXPECT_EQ(data, "bc"); + + ASSERT_THAT(stream->Seek(5), IsOk()); + std::array next; + ICEBERG_UNWRAP_OR_FAIL(auto bytes_read, stream->Read(next)); + ASSERT_EQ(bytes_read, 1); + EXPECT_EQ(next[0], std::byte{'f'}); +} + +TEST_F(LocalFileIOTest, ReadFullyReadsFromAbsolutePosition) { + ASSERT_NO_FATAL_FAILURE( + VerifyReadFullyReadsFromAbsolutePosition(file_io_, temp_filepath_)); +} + +TEST_F(LocalFileIOTest, StdReadFullyReadsFromAbsolutePosition) { + auto file_io = std::make_shared(); + ASSERT_NO_FATAL_FAILURE( + VerifyReadFullyReadsFromAbsolutePosition(file_io, temp_filepath_)); +} + +TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) { + auto file_io = std::make_shared(); + ASSERT_THAT(file_io->WriteFile(temp_filepath_, "abc"), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io->NewInputFile(temp_filepath_)); + ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open()); + + std::array buffer; + ICEBERG_UNWRAP_OR_FAIL(auto bytes_read, stream->Read(buffer)); + EXPECT_EQ(bytes_read, 3); + EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3))); + + ICEBERG_UNWRAP_OR_FAIL(bytes_read, stream->Read(buffer)); + EXPECT_EQ(bytes_read, 0); + EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3))); +} + +TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) { + auto state = std::make_shared(); + state->data = "abc"; + auto file_io = std::make_shared(state); + + ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input")); + ASSERT_TRUE(input->Close().ok()); + ASSERT_TRUE(input->Close().ok()); + ASSERT_TRUE(state->closed); + + std::array out; + auto result = input->Read(static_cast(out.size()), out.data()); + auto read_at_result = input->ReadAt(0, static_cast(out.size()), out.data()); + + EXPECT_FALSE(result.ok()); + EXPECT_THAT(result.status().ToString(), ::testing::HasSubstr("closed")); + EXPECT_FALSE(read_at_result.ok()); + EXPECT_THAT(read_at_result.status().ToString(), ::testing::HasSubstr("closed")); + EXPECT_EQ(state->position, 0); +} + +TEST(FileIOAdapterTest, InputAdapterRejectsReadAtBeyondKnownSize) { + auto state = std::make_shared(); + state->data = "abc"; + auto file_io = std::make_shared(state); + + ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input")); + + std::array out; + auto read_at_end = input->ReadAt(3, static_cast(out.size()), out.data()); + auto read_past_end = input->ReadAt(4, static_cast(out.size()), out.data()); + + ASSERT_TRUE(read_at_end.ok()); + EXPECT_EQ(read_at_end.ValueOrDie(), 0); + EXPECT_FALSE(read_past_end.ok()); + EXPECT_THAT(read_past_end.status().ToString(), ::testing::HasSubstr("out of bounds")); +} + +TEST(FileIOAdapterTest, InputAdapterUsesInputFileSizeWithLengthHint) { + auto state = std::make_shared(); + state->data = "abc"; + auto file_io = std::make_shared(state); + + ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input", 99)); + auto size = input->GetSize(); + + ASSERT_TRUE(size.ok()) << size.status().ToString(); + EXPECT_EQ(size.ValueOrDie(), 3); +} + +TEST(FileIOAdapterTest, OutputAdapterRejectsWritesAfterClose) { + auto state = std::make_shared(); + auto file_io = std::make_shared(state); + + ICEBERG_UNWRAP_OR_FAIL(auto output, arrow::OpenArrowOutputStream(file_io, "output")); + ASSERT_TRUE(output->Close().ok()); + ASSERT_TRUE(output->Close().ok()); + ASSERT_TRUE(state->closed); + + auto status = output->Write("x", 1); + auto flush_status = output->Flush(); + + EXPECT_FALSE(status.ok()); + EXPECT_THAT(status.ToString(), ::testing::HasSubstr("closed")); + EXPECT_FALSE(flush_status.ok()); + EXPECT_THAT(flush_status.ToString(), ::testing::HasSubstr("closed")); + EXPECT_TRUE(state->data.empty()); +} + +TEST(FileIOTest, ReadFileReturnsReadErrorWithCloseContext) { + auto state = std::make_shared(); + ReadFailureFileIO file_io(state); + + auto result = file_io.ReadFile("read-failure", std::nullopt); + + EXPECT_TRUE(state->closed); + EXPECT_THAT(result, IsError(ErrorKind::kIOError)); + EXPECT_THAT(result, HasErrorMessage("read failed")); + EXPECT_THAT(result, HasErrorMessage("close failed")); +} + +TEST(FileIOTest, WriteFileReturnsWriteErrorWithCloseContext) { + auto state = std::make_shared(); + WriteFailureFileIO file_io(state); + + auto result = file_io.WriteFile("write-failure", "data"); + + EXPECT_TRUE(state->closed); + EXPECT_THAT(result, IsError(ErrorKind::kIOError)); + EXPECT_THAT(result, HasErrorMessage("write failed")); + EXPECT_THAT(result, HasErrorMessage("close failed")); +} + +} // namespace iceberg diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc index d890ad10e..b1caff1e8 100644 --- a/src/iceberg/test/arrow_s3_file_io_test.cc +++ b/src/iceberg/test/arrow_s3_file_io_test.cc @@ -26,7 +26,7 @@ #include #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/arrow/s3/s3_properties.h" #include "iceberg/test/matchers.h" diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 82da97ea3..b74fe829b 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -23,12 +23,13 @@ #include #include #include +#include #include #include #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/avro/avro_writer.h" @@ -37,16 +38,19 @@ #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/std_io.h" +#include "iceberg/test/temp_file_test_base.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" namespace iceberg::avro { -class AvroReaderTest : public ::testing::Test { +class AvroReaderTest : public TempFileTestBase { protected: static void SetUpTestSuite() { RegisterAll(); } void SetUp() override { + TempFileTestBase::SetUp(); file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); temp_avro_file_ = "avro_reader_test.avro"; } @@ -187,6 +191,16 @@ TEST_F(AvroReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) { + file_io_ = std::make_shared(); + temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared())}); + + ASSERT_NO_FATAL_FAILURE(WriteAndVerify(schema, R"([[1, "Foo"], [2, "Bar"]])")); +} + TEST_F(AvroReaderTest, ReadReorderedFieldsWithNulls) { CreateSimpleAvroFile(); auto schema = std::make_shared(std::vector{ diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index a3a8fc088..14b7bf628 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -25,7 +25,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/avro/avro_register.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc index b99a2816b..0c8c8821b 100644 --- a/src/iceberg/test/delete_file_index_test.cc +++ b/src/iceberg/test/delete_file_index_test.cc @@ -29,7 +29,7 @@ #include #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" diff --git a/src/iceberg/test/delete_loader_test.cc b/src/iceberg/test/delete_loader_test.cc index 6dcd564bf..c365b8bac 100644 --- a/src/iceberg/test/delete_loader_test.cc +++ b/src/iceberg/test/delete_loader_test.cc @@ -25,7 +25,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/deletes/position_delete_index.h" diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc index ba0c41b37..55bc6a110 100644 --- a/src/iceberg/test/file_scan_task_test.cc +++ b/src/iceberg/test/file_scan_task_test.cc @@ -27,7 +27,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/parquet/parquet_register.h" @@ -36,7 +36,6 @@ #include "iceberg/test/matchers.h" #include "iceberg/test/temp_file_test_base.h" #include "iceberg/type.h" -#include "iceberg/util/checked_cast.h" namespace iceberg { @@ -68,12 +67,14 @@ class FileScanTaskTest : public TempFileTestBase { .ValueOrDie()}) .ValueOrDie(); - auto io = internal::checked_cast(*file_io_); - auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + ICEBERG_UNWRAP_OR_FAIL(auto outfile, + arrow::OpenArrowOutputStream(file_io_, temp_parquet_file_)); ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile, chunk_size) .ok()); + ASSERT_TRUE(outfile->Close().ok()); + RefreshParquetFileSize(); } // Helper to create a valid but empty Parquet file. @@ -84,11 +85,28 @@ class FileScanTaskTest : public TempFileTestBase { ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))}); auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie(); - auto io = internal::checked_cast(*file_io_); - auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + ICEBERG_UNWRAP_OR_FAIL(auto outfile, + arrow::OpenArrowOutputStream(file_io_, temp_parquet_file_)); ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(), outfile, 1024) .ok()); + ASSERT_TRUE(outfile->Close().ok()); + RefreshParquetFileSize(); + } + + void RefreshParquetFileSize() { + ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io_->NewInputFile(temp_parquet_file_)); + ICEBERG_UNWRAP_OR_FAIL(auto size, input_file->Size()); + ASSERT_GT(size, 0); + parquet_file_size_ = size; + } + + std::shared_ptr MakeDataFile() const { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + data_file->file_size_in_bytes = parquet_file_size_; + return data_file; } // Helper method to verify the content of the next batch from an ArrowArrayStream. @@ -124,12 +142,11 @@ class FileScanTaskTest : public TempFileTestBase { std::shared_ptr file_io_; std::string temp_parquet_file_; + int64_t parquet_file_size_ = 0; }; TEST_F(FileScanTaskTest, ReadFullSchema) { - auto data_file = std::make_shared(); - data_file->file_path = temp_parquet_file_; - data_file->file_format = FileFormatType::kParquet; + auto data_file = MakeDataFile(); auto projected_schema = std::make_shared( std::vector{SchemaField::MakeRequired(1, "id", int32()), @@ -146,9 +163,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { } TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { - auto data_file = std::make_shared(); - data_file->file_path = temp_parquet_file_; - data_file->file_format = FileFormatType::kParquet; + auto data_file = MakeDataFile(); auto projected_schema = std::make_shared( std::vector{SchemaField::MakeOptional(2, "name", string()), @@ -166,9 +181,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { TEST_F(FileScanTaskTest, ReadEmptyFile) { CreateEmptyParquetFile(); - auto data_file = std::make_shared(); - data_file->file_path = temp_parquet_file_; - data_file->file_format = FileFormatType::kParquet; + auto data_file = MakeDataFile(); auto projected_schema = std::make_shared( std::vector{SchemaField::MakeRequired(1, "id", int32())}); diff --git a/src/iceberg/test/gzip_decompress_test.cc b/src/iceberg/test/gzip_decompress_test.cc index 3415c46be..b855e4a06 100644 --- a/src/iceberg/test/gzip_decompress_test.cc +++ b/src/iceberg/test/gzip_decompress_test.cc @@ -23,7 +23,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/file_io.h" #include "iceberg/test/matchers.h" #include "iceberg/test/temp_file_test_base.h" @@ -69,7 +69,11 @@ TEST_F(GZipTest, GZipDecompressedString) { ASSERT_TRUE(compressed_stream->Flush().ok()); ASSERT_TRUE(compressed_stream->Close().ok()); - auto result = io_->ReadFile(temp_filepath_, test_string.size()); + ICEBERG_UNWRAP_OR_FAIL(auto input_file, io_->NewInputFile(temp_filepath_)); + ICEBERG_UNWRAP_OR_FAIL(auto compressed_size, input_file->Size()); + ASSERT_GE(compressed_size, 0); + + auto result = io_->ReadFile(temp_filepath_, static_cast(compressed_size)); EXPECT_THAT(result, IsOk()); auto gzip_decompressor = std::make_unique(); diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index 78f67dda4..1a65098e6 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -27,7 +27,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 34ff9993b..017f98036 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -29,7 +29,7 @@ #include #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/expression/expressions.h" #include "iceberg/manifest/manifest_entry.h" diff --git a/src/iceberg/test/manifest_list_versions_test.cc b/src/iceberg/test/manifest_list_versions_test.cc index 9c16a02ec..b173d56e7 100644 --- a/src/iceberg/test/manifest_list_versions_test.cc +++ b/src/iceberg/test/manifest_list_versions_test.cc @@ -25,7 +25,7 @@ #include #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/file_reader.h" #include "iceberg/file_writer.h" diff --git a/src/iceberg/test/manifest_reader_stats_test.cc b/src/iceberg/test/manifest_reader_stats_test.cc index a94dca120..6c0e005bc 100644 --- a/src/iceberg/test/manifest_reader_stats_test.cc +++ b/src/iceberg/test/manifest_reader_stats_test.cc @@ -25,7 +25,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/expression/expressions.h" #include "iceberg/manifest/manifest_entry.h" diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_test.cc index 3e93f6ff5..3f85729a7 100644 --- a/src/iceberg/test/manifest_reader_test.cc +++ b/src/iceberg/test/manifest_reader_test.cc @@ -28,7 +28,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/expression/expressions.h" #include "iceberg/manifest/manifest_entry.h" diff --git a/src/iceberg/test/manifest_writer_versions_test.cc b/src/iceberg/test/manifest_writer_versions_test.cc index fc61980a7..990224528 100644 --- a/src/iceberg/test/manifest_writer_versions_test.cc +++ b/src/iceberg/test/manifest_writer_versions_test.cc @@ -25,7 +25,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/constants.h" #include "iceberg/file_format.h" diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index 72a23ecb2..c780dc18c 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -27,7 +27,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/file_io.h" #include "iceberg/json_serde_internal.h" #include "iceberg/partition_spec.h" diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 0d983db58..65a4602d8 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -30,7 +31,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/file_reader.h" #include "iceberg/file_writer.h" @@ -41,6 +42,8 @@ #include "iceberg/schema_field.h" #include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/std_io.h" +#include "iceberg/test/temp_file_test_base.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -123,11 +126,12 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr s } // namespace -class ParquetReaderTest : public ::testing::Test { +class ParquetReaderTest : public TempFileTestBase { protected: static void SetUpTestSuite() { parquet::RegisterAll(); } void SetUp() override { + TempFileTestBase::SetUp(); file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); temp_parquet_file_ = "parquet_reader_test.parquet"; } @@ -232,6 +236,42 @@ TEST_F(ParquetReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(ParquetReaderTest, RoundTripWithGenericFileIO) { + auto file_io = std::make_shared(); + auto path = CreateNewTempFilePathWithSuffix(".parquet"); + + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"], [2, "Bar"]])") + .ValueOrDie(); + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + auto writer_result = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = path, + .schema = schema, + .io = file_io, + .properties = std::move(writer_properties)}); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + ASSERT_THAT(WriteArray(array, *writer), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto length, writer->length()); + + std::shared_ptr<::arrow::Array> out; + auto read_status = ReadArray( + out, {.path = path, .length = length, .io = file_io, .projection = schema}, + nullptr); + ASSERT_THAT(read_status, IsOk()); + ASSERT_TRUE(out->Equals(*array)); +} + TEST_F(ParquetReaderTest, ReadReorderedFieldsWithNulls) { CreateSimpleParquetFile(); diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc b/src/iceberg/test/rolling_manifest_writer_test.cc index b996eb166..2eae5a15b 100644 --- a/src/iceberg/test/rolling_manifest_writer_test.cc +++ b/src/iceberg/test/rolling_manifest_writer_test.cc @@ -28,7 +28,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" diff --git a/src/iceberg/test/scan_test_base.h b/src/iceberg/test/scan_test_base.h index 1a2f5648d..5bd1222e0 100644 --- a/src/iceberg/test/scan_test_base.h +++ b/src/iceberg/test/scan_test_base.h @@ -30,7 +30,7 @@ #include -#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_io_util.h" #include "iceberg/avro/avro_register.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" diff --git a/src/iceberg/test/std_io.h b/src/iceberg/test/std_io.h index 3b58267df..3866bddf0 100644 --- a/src/iceberg/test/std_io.h +++ b/src/iceberg/test/std_io.h @@ -19,78 +19,297 @@ #pragma once +#include +#include #include #include +#include +#include +#include #include -#include +#include #include #include +#include +#include #include "iceberg/file_io.h" #include "iceberg/result.h" +#include "iceberg/util/macros.h" namespace iceberg::test { -/// \brief Simple local filesystem FileIO implementation for testing -/// -/// This class provides a basic FileIO implementation that reads and writes -/// files to the local filesystem using standard C++ file streams. -class StdFileIO : public FileIO { +namespace detail { + +inline Result ToStreamSize(size_t size) { + if (size > static_cast(std::numeric_limits::max())) { + return InvalidArgument("Buffer size {} exceeds streamsize max", size); + } + return static_cast(size); +} + +inline Result ToInt64FileSize(uintmax_t size, std::string_view location) { + if (size > static_cast(std::numeric_limits::max())) { + return Invalid("File size for {} exceeds int64_t max", location); + } + return static_cast(size); +} + +} // namespace detail + +class StdSeekableInputStream : public SeekableInputStream { public: - Result ReadFile(const std::string& file_location, - std::optional length) override { - std::ifstream file(file_location, std::ios::binary); - if (!file.is_open()) { - return IOError("Failed to open file for reading: {}", file_location); - } - - if (length.has_value()) { - std::string content(length.value(), '\0'); - file.read(content.data(), length.value()); - if (!file) { - return IOError("Failed to read {} bytes from file: {}", length.value(), - file_location); + explicit StdSeekableInputStream(std::string location) + : location_(std::move(location)), file_(location_, std::ios::binary) {} + + bool is_open() const { return file_.is_open(); } + + Result Position() const override { + auto position = file_.tellg(); + if (position < 0) { + return IOError("Failed to get read position for: {}", location_); + } + return static_cast(position); + } + + Status Seek(int64_t position) override { + if (position < 0) { + return InvalidArgument("Cannot seek to negative position {}", position); + } + file_.clear(); + file_.seekg(position); + if (!file_) { + return IOError("Failed to seek to {} in file: {}", position, location_); + } + return {}; + } + + Result Read(std::span out) override { + if (out.empty()) { + return 0; + } + ICEBERG_ASSIGN_OR_RAISE(auto read_size, detail::ToStreamSize(out.size())); + file_.read(reinterpret_cast(out.data()), read_size); + auto bytes_read = file_.gcount(); + if (!file_) { + if (file_.bad() || !file_.eof()) { + return IOError("Failed to read from file: {}", location_); } - return content; - } else { - std::stringstream buffer; - buffer << file.rdbuf(); - if (!file && !file.eof()) { - return IOError("Failed to read file: {}", file_location); + file_.clear(); + } + if (bytes_read < 0) { + return IOError("Failed to read from file: {}", location_); + } + return static_cast(bytes_read); + } + + Status ReadFully(int64_t position, std::span out) override { + if (position < 0) { + return InvalidArgument("Cannot read from negative position {}", position); + } + if (out.empty()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto original_position, Position()); + auto seek_status = Seek(position); + if (!seek_status.has_value()) { + static_cast(Seek(original_position)); + return seek_status; + } + + Status read_status = {}; + size_t total_read = 0; + while (total_read < out.size()) { + auto read_result = Read(out.subspan(total_read)); + if (!read_result.has_value()) { + read_status = std::unexpected(read_result.error()); + break; + } + if (read_result.value() == 0) { + read_status = + IOError("Failed to read {} bytes from file: {}", out.size(), location_); + break; } - return buffer.str(); + total_read += static_cast(read_result.value()); + } + + auto restore_status = Seek(original_position); + ICEBERG_RETURN_UNEXPECTED(read_status); + return restore_status; + } + + Status Close() override { + if (!file_.is_open()) { + return {}; + } + file_.close(); + if (!file_) { + return IOError("Failed to close file: {}", location_); + } + return {}; + } + + private: + std::string location_; + mutable std::ifstream file_; +}; + +class StdPositionOutputStream : public PositionOutputStream { + public: + explicit StdPositionOutputStream(std::string location) + : location_(std::move(location)), + file_(location_, std::ios::binary | std::ios::out | std::ios::trunc) {} + + bool is_open() const { return file_.is_open(); } + + Result Position() const override { + auto position = file_.tellp(); + if (position < 0) { + return IOError("Failed to get write position for: {}", location_); + } + return static_cast(position); + } + + Status Write(std::span data) override { + if (data.empty()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto write_size, detail::ToStreamSize(data.size())); + file_.write(reinterpret_cast(data.data()), write_size); + if (!file_) { + return IOError("Failed to write to file: {}", location_); } + return {}; } - Status WriteFile(const std::string& file_location, std::string_view content) override { - // Create parent directories if they don't exist - std::filesystem::path path(file_location); + Status Flush() override { + file_.flush(); + if (!file_) { + return IOError("Failed to flush file: {}", location_); + } + return {}; + } + + Status Close() override { + if (!file_.is_open()) { + return {}; + } + file_.close(); + if (!file_) { + return IOError("Failed to close file: {}", location_); + } + return {}; + } + + private: + std::string location_; + mutable std::ofstream file_; +}; + +class StdInputFile : public InputFile { + public: + explicit StdInputFile(std::string location, + std::optional file_size = std::nullopt) + : location_(std::move(location)), file_size_(file_size) {} + + std::string_view location() const override { return location_; } + + Result Size() const override { + if (file_size_.has_value()) { + return *file_size_; + } + std::error_code ec; + auto size = std::filesystem::file_size(location_, ec); + if (ec) { + return IOError("Failed to get file size for {}: {}", location_, ec.message()); + } + return detail::ToInt64FileSize(size, location_); + } + + Result> Open() override { + auto stream = std::make_unique(location_); + if (!stream->is_open()) { + return IOError("Failed to open file for reading: {}", location_); + } + return stream; + } + + private: + std::string location_; + std::optional file_size_; +}; + +class StdOutputFile : public OutputFile { + public: + explicit StdOutputFile(std::string location) : location_(std::move(location)) {} + + std::string_view location() const override { return location_; } + + Result> Create() override { + return Create(/*overwrite=*/false); + } + + Result> CreateOrOverwrite() override { + return Create(/*overwrite=*/true); + } + + private: + Result> Create(bool overwrite) { + std::filesystem::path path(location_); + std::error_code ec; + auto exists = std::filesystem::exists(path, ec); + if (ec) { + return IOError("Failed to check file existence for {}: {}", location_, + ec.message()); + } + if (!overwrite && exists) { + return AlreadyExists("File already exists: {}", location_); + } if (path.has_parent_path()) { - std::error_code ec; std::filesystem::create_directories(path.parent_path(), ec); if (ec) { - return IOError("Failed to create parent directories for: {}", file_location); + return IOError("Failed to create parent directories for {}: {}", location_, + ec.message()); } } - - std::ofstream file(file_location, std::ios::binary); - if (!file.is_open()) { - return IOError("Failed to open file for writing: {}", file_location); + auto stream = std::make_unique(location_); + if (!stream->is_open()) { + return IOError("Failed to open file for writing: {}", location_); } + return stream; + } - file.write(content.data(), content.size()); - if (!file) { - return IOError("Failed to write to file: {}", file_location); + std::string location_; +}; + +/// \brief Simple local filesystem FileIO implementation for testing +/// +/// This class provides a basic FileIO implementation that reads and writes +/// files to the local filesystem using standard C++ file streams. +class StdFileIO : public FileIO { + public: + Result> NewInputFile(std::string file_location) override { + return std::make_unique(std::move(file_location)); + } + + Result> NewInputFile(std::string file_location, + size_t length) override { + if (length > static_cast(std::numeric_limits::max())) { + return InvalidArgument("File length {} exceeds int64_t max", length); } + return std::make_unique(std::move(file_location), + static_cast(length)); + } - return {}; + Result> NewOutputFile(std::string file_location) override { + return std::make_unique(std::move(file_location)); } Status DeleteFile(const std::string& file_location) override { std::error_code ec; if (!std::filesystem::remove(file_location, ec)) { if (ec) { - return IOError("Failed to delete file: {}", file_location); + return IOError("Failed to delete file {}: {}", file_location, ec.message()); } return IOError("File does not exist: {}", file_location); } diff --git a/src/iceberg/test/update_location_test.cc b/src/iceberg/test/update_location_test.cc index 53b347b56..a208209b3 100644 --- a/src/iceberg/test/update_location_test.cc +++ b/src/iceberg/test/update_location_test.cc @@ -26,7 +26,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/result.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc index 632c4a55e..e310c0d1f 100644 --- a/src/iceberg/test/update_partition_spec_test.cc +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -28,7 +28,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/catalog/memory/in_memory_catalog.h" #include "iceberg/expression/expressions.h" #include "iceberg/partition_spec.h" diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index 310feb37e..b0ad3d20f 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -26,7 +26,7 @@ #include #include -#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/catalog/memory/in_memory_catalog.h" #include "iceberg/result.h" #include "iceberg/snapshot.h"