
Refactor StreamingCloudFetchProvider struct to use channels #357

Task ID: task-3.1-refactor-streaming-provider-struct

Draft
eric-wang-1990 wants to merge 9 commits into main from stack/pr-phase3-integration

Conversation


eric-wang-1990 (Collaborator) commented Mar 18, 2026

🥞 Stacked PR

Use this link to review incremental changes.


What's Changed

Please fill in a description of the changes here.

This contains breaking changes.

Closes #NNN.

eric-wang-1990 (Collaborator, Author)

[High] Cancellation test doesn't actually test cancellation mid-stream

The cancellation test reads 3 batches and then breaks out of the loop, which is a clean exit, not a cancellation. provider.cancel() is only called after the consumer task has already finished, so the test never verifies that an in-flight download is actually interrupted.

Restructure so cancellation races with an active download:

let provider_for_cancel = Arc::clone(&provider);
// Fire cancel while the consumer below is still mid-stream.
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(50)).await;
    provider_for_cancel.cancel();
});

let mut count = 0;
while let Ok(Some(_)) = provider.next_batch().await {
    count += 1;
}
// Should stop before consuming all chunks
assert!(count < total_chunks, "Expected early termination via cancel");

This comment was generated with GitHub MCP.

@eric-wang-1990
Copy link
Collaborator Author

[Medium] Consumer result_rx await has no timeout — worker panic causes indefinite hang

If a worker panics (rather than just returning Err), tokio drops the task and the oneshot::Sender is dropped without sending. The RecvError on the consumer's handle.result_rx await is handled, but the only other exit from the wait is the cancel_token arm; if cancellation never fires, the consumer hangs.

Add a timeout as a safety net alongside the cancellation select arm:

tokio::select! {
    result = handle.result_rx => { ... }
    _ = tokio::time::sleep(Duration::from_secs(DEFAULT_CHUNK_READY_TIMEOUT_SECS as u64)) => {
        return Err(DatabricksErrorHelper::io()
            .message(format!("Timeout waiting for worker result on chunk {}", chunk_index)));
    }
    _ = self.cancel_token.cancelled() => { ... }
}

This comment was generated with GitHub MCP.
