feat(io): add streaming FileIO support#641

Open
wgtmac wants to merge 1 commit into apache:main from wgtmac:io_api

wgtmac commented May 6, 2026

Introduce InputFile/OutputFile stream APIs and Arrow IO adapters so bundled Avro/Parquet readers and writers can work with generic FileIO implementations.

Comment on lines +139 to +142
{
  std::lock_guard lock(mutex_);
  ARROW_RETURN_NOT_OK(CheckOpenLocked());
}
Contributor

The open state is already checked in the inner Read function, so there is no need to check it here.

std::lock_guard lock(mutex_);
ARROW_RETURN_NOT_OK(CheckOpenLocked());
}
auto bytes_to_read = position >= size_ ? 0 : std::min(nbytes, size_ - position);
Contributor

Should it return an error if position > size_?
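
To make the question concrete, here is a minimal sketch of the two possible policies. `ClampedReadLength` mirrors the expression under review; `StrictReadLength` is a hypothetical alternative that treats a position past the end as an error. Both helper names are illustrative and not part of the PR, and `std::optional` stands in for the project's `Result<T>`.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

// Hypothetical helper mirroring the expression in the PR: a read that starts
// at or past the end of the file silently yields zero bytes.
inline int64_t ClampedReadLength(int64_t position, int64_t nbytes, int64_t size) {
  return position >= size ? 0 : std::min(nbytes, size - position);
}

// Hypothetical stricter policy: position == size is a valid zero-byte read at
// EOF, but position > size (or a negative argument) is reported as an error.
// std::nullopt stands in for an error Result here.
inline std::optional<int64_t> StrictReadLength(int64_t position, int64_t nbytes,
                                               int64_t size) {
  if (position < 0 || nbytes < 0 || position > size) {
    return std::nullopt;
  }
  return std::min(nbytes, size - position);
}
```

With a file of 8 bytes, a 4-byte read at position 6 returns 2 bytes under both policies; a read at position 10 returns 0 bytes under the clamped policy and an error under the strict one.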

: input_(std::move(input)) {}

Result<int64_t> Position() const override {
ICEBERG_RETURN_UNEXPECTED(CheckOpen());
Contributor

We already do the open check in InputStream/OutputStream, so I think the open check in InputStreamAdapter/OutputStreamAdapter can be removed and the adapter can call the inner stream directly.
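
A rough sketch of what that delegation could look like, assuming the adapter holds no closed state of its own. `SketchInputStream` and `SketchInputStreamAdapter` are hypothetical stand-ins for the PR's classes, and `std::optional` replaces `Result<T>` to keep the example self-contained.

```cpp
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical stand-in for the inner stream. The only open check lives here.
class SketchInputStream {
 public:
  std::optional<int64_t> Position() const {
    if (closed_) return std::nullopt;  // error: stream is closed
    return position_;
  }
  void Advance(int64_t nbytes) { position_ += nbytes; }
  void Close() { closed_ = true; }

 private:
  int64_t position_ = 0;
  bool closed_ = false;
};

// Hypothetical adapter: forwards to the inner stream without repeating the
// open check, so a closed stream is still reported through the inner call.
class SketchInputStreamAdapter {
 public:
  explicit SketchInputStreamAdapter(std::shared_ptr<SketchInputStream> input)
      : input_(std::move(input)) {}

  std::optional<int64_t> Position() const { return input_->Position(); }

 private:
  std::shared_ptr<SketchInputStream> input_;
};
```

If the adapter can be closed independently of the stream it wraps, it would still need its own check; the sketch only covers the case where both share one lifetime.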

Comment on lines +531 to +537
if (length.has_value()) {
  ICEBERG_ASSIGN_OR_RAISE(size, ToInt64Length(*length));
  ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length));
} else {
  ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path));
  ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size());
}
Contributor

Suggested change
- if (length.has_value()) {
-   ICEBERG_ASSIGN_OR_RAISE(size, ToInt64Length(*length));
-   ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length));
- } else {
-   ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path));
-   ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size());
- }
+ if (length.has_value()) {
+   ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length));
+ } else {
+   ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path));
+ }
+ ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size());
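
The suggestion seems to rely on `Size()` returning the caller-provided length without extra I/O when one was passed to `NewInputFile`. The sketch below illustrates that assumption with a hypothetical `SketchInputFile`; it is not the PR's `InputFile`, and the `1024` value is a placeholder for a real metadata lookup.

```cpp
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for InputFile that remembers an optional length hint.
class SketchInputFile {
 public:
  SketchInputFile(std::string path, std::optional<int64_t> known_length)
      : path_(std::move(path)), known_length_(known_length) {}

  // Returns the hinted length when present; otherwise simulates a stat call.
  int64_t Size() {
    if (known_length_.has_value()) return *known_length_;
    ++stat_calls_;
    return 1024;  // placeholder for a real metadata lookup
  }

  const std::string& path() const { return path_; }
  int stat_calls() const { return stat_calls_; }

 private:
  std::string path_;
  std::optional<int64_t> known_length_;
  int stat_calls_ = 0;
};

// Same shape as the suggested change: branch only on how the file is created,
// then ask the file for its size in a single place.
inline std::pair<std::shared_ptr<SketchInputFile>, int64_t> OpenWithSize(
    const std::string& path, std::optional<int64_t> length) {
  std::shared_ptr<SketchInputFile> input_file;
  if (length.has_value()) {
    input_file = std::make_shared<SketchInputFile>(path, *length);
  } else {
    input_file = std::make_shared<SketchInputFile>(path, std::nullopt);
  }
  int64_t size = input_file->Size();
  return {input_file, size};
}
```

Under this assumption the hinted path never triggers a lookup, so the simplification costs nothing. If the real `Size()` does not cache the hint, the original branch avoids one metadata call.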
