Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 45 additions & 23 deletions r/R/convert-array-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -57,48 +57,70 @@ convert_array_stream <- function(array_stream, to = NULL, size = NULL, n = Inf)

n <- as.double(n)[1]


if (!is.null(size)) {
# The underlying nanoarrow_c_convert_array_stream() currently requires that
# the total length of all batches is known in advance. If the caller
# provided this we can save a bit of work.
.Call(
return(.Call(
nanoarrow_c_convert_array_stream,
array_stream,
to,
as.double(size)[1],
n
)
} else {
# Otherwise, we need to collect all batches and calculate the total length
# before calling nanoarrow_c_convert_array_stream().
batches <- collect_array_stream(
array_stream,
n,
schema = schema,
validate = FALSE
)
))
}

# If there is exactly one batch, use convert_array(). Converting a single
# array currently takes a more efficient code path for types that can be
# converted as ALTREP (e.g., strings).
if (length(batches) == 1L) {
return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
}
# Otherwise, we need to collect all batches.
batches <- collect_array_stream(
array_stream,
n,
schema = schema,
validate = FALSE
)

# Otherwise, compute the final size, create another array stream,
# and call convert_array_stream() with a known size. Using .Call()
# directly because we have already type checked the inputs.
# If 0 batches, return zero-length result consistent with 'to'
if (length(batches) == 0L) {
basic_stream <- .Call(nanoarrow_c_basic_array_stream, list(), schema, FALSE)
return(.Call(nanoarrow_c_convert_array_stream, basic_stream, to, 0, Inf))
}

# If there is exactly one batch, use convert_array(). Converting a single
# array currently takes a more efficient code path for types that can be
# converted as ALTREP (e.g., strings).
if (length(batches) == 1L) {
return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
}

# For `to` with known bind function, convert from already collected array and avoid recreating
# another array stream which has very bad exponential time behavior.
if (is.data.frame(to) || is.matrix(to)) {
converted_batches <- lapply(batches, function(b) .Call(nanoarrow_c_convert_array, b, to))
if (is.data.frame(to)) {
# Prefer vctrs::vec_rbind over base::rbind because it's faster.
if (requireNamespace("vctrs", quietly = TRUE)) {
converted <- do.call(vctrs::vec_rbind, converted_batches)
} else {
converted <- do.call(rbind, converted_batches)
}
} else if (is.matrix(to)) {
converted <- do.call(rbind, converted_batches)
} else {
stop("unhandled case")
}
return(converted)
} else {
# When there's no way to bind converted batches together, have to create a single stream and convert that.
# This method has horrible exponential time when ALTREP is involved, so avoid whenever possible.
size <- .Call(nanoarrow_c_array_list_total_length, batches)
basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema, FALSE)

.Call(
return(.Call(
nanoarrow_c_convert_array_stream,
basic_stream,
to,
as.double(size),
Inf
)
))
}
}

Expand Down