Commit b53294e

fix: wrap read_all_index in spawn_blocking to avoid nested tokio runtime
1 parent 721add5 commit b53294e

4 files changed

+134
-398
lines changed

LOG/2026_01_28_10_00_CHANGELOG_HEALING.md

Lines changed: 68 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -173,3 +173,71 @@ The correct solution is to use `spawn_blocking` with **synchronous** function ve
173173
- **Total Time**: 485s
174174
- **Commit**: `cdafbae2e` pushed to `main`
175175

176+
---
177+
178+
## Third Fix: Polars Nested Runtime Issue (v0.2.137)
179+
180+
### Problem
181+
v0.2.136 still failed with the same "Cannot start a runtime from within a runtime" panic.
182+
The error occurred during `compute_tree_metrics_impl()` execution, but the root cause was
183+
polars' internal tokio usage.
184+
185+
### Root Cause Analysis
186+
1. Polars uses tokio internally through `polars-stream` for its streaming engine
187+
2. When polars operations are called from within a tokio async context, polars may try
188+
to create its own runtime or call `block_on`
189+
3. The `load_or_build_dataframe_cached` function was running directly on the tokio worker
190+
thread, and when it called `index.to_dataframe()` (which uses polars), the nested
191+
runtime issue occurred
192+
193+
### Evidence
194+
```bash
195+
$ cargo tree -p polars-stream 2>/dev/null | grep -i tokio
196+
│ │ │ │ │ ├── tokio v1.49.0
197+
│ │ │ │ │ ├── tokio-util v0.7.18
198+
│ │ │ │ ├── tokio v1.49.0 (*)
199+
...
200+
```
201+
202+
### Fix Applied
203+
**File:** `crates/uffs-mft/src/cache.rs`
204+
205+
Wrapped the entire `load_or_build_dataframe_cached` function body in `spawn_blocking`:
206+
207+
```rust
208+
#[cfg(windows)]
209+
pub async fn load_or_build_dataframe_cached(
210+
drive: char,
211+
ttl_seconds: u64,
212+
) -> crate::Result<uffs_polars::DataFrame> {
213+
// Use spawn_blocking to run all MFT reading and polars operations on a
214+
// dedicated blocking thread. This avoids nested tokio runtime issues since
215+
// polars uses tokio internally for some operations.
216+
tokio::task::spawn_blocking(move || load_or_build_dataframe_cached_sync(drive, ttl_seconds))
217+
.await
218+
.map_err(|e| crate::MftError::InvalidInput(format!("Task join error: {e}")))?
219+
}
220+
221+
/// Synchronous version of `load_or_build_dataframe_cached`.
222+
#[cfg(windows)]
223+
fn load_or_build_dataframe_cached_sync(
224+
drive: char,
225+
ttl_seconds: u64,
226+
) -> crate::Result<uffs_polars::DataFrame> {
227+
// ... all MFT reading and polars operations run here on blocking thread
228+
}
229+
```
230+
231+
### Why This Works
232+
- `spawn_blocking` moves the entire operation to a dedicated blocking thread pool
233+
- The blocking thread is NOT part of the tokio async worker pool
234+
- When polars tries to create a runtime or call `block_on`, it succeeds because
235+
the blocking thread is not driving async tasks, so tokio permits blocking on it
236+
- All MFT reading and polars operations (including `to_dataframe()`) run in isolation
237+
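The points above can be illustrated with a std-only toy model. This is a sketch under stated assumptions, not tokio's or polars' implementation: `DRIVING_RUNTIME`, `block_on_model`, and both `nested_*` functions are hypothetical names, and a plain `std::thread` stands in for the blocking pool that `spawn_blocking` uses. The model only captures the idea that the "already driving a runtime" check is per thread, so the same call that fails on the current thread can succeed on another one.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Hypothetical per-thread flag: true while this thread is "driving a runtime".
    static DRIVING_RUNTIME: Cell<bool> = Cell::new(false);
}

/// Toy stand-in for a blocking entry point such as `block_on`.
/// It refuses to run when the current thread is already marked as driving a runtime.
fn block_on_model<T>(work: impl FnOnce() -> T) -> Result<T, &'static str> {
    if DRIVING_RUNTIME.with(|flag| flag.get()) {
        return Err("Cannot start a runtime from within a runtime");
    }
    DRIVING_RUNTIME.with(|flag| flag.set(true));
    let out = work();
    DRIVING_RUNTIME.with(|flag| flag.set(false));
    Ok(out)
}

/// Nested call on the same thread: the inner call sees the flag and fails.
fn nested_on_same_thread() -> Result<u32, &'static str> {
    block_on_model(|| block_on_model(|| 7u32))?
}

/// Inner call moved to a separate thread: that thread's flag is unset,
/// so the inner call succeeds and its result is handed back.
fn nested_on_other_thread() -> Result<u32, &'static str> {
    let outer: Result<Result<u32, &'static str>, &'static str> = block_on_model(|| {
        match thread::spawn(|| block_on_model(|| 7u32)).join() {
            Ok(inner) => inner,
            Err(_) => Err("worker thread panicked"),
        }
    });
    outer?
}
```

In this model `nested_on_same_thread()` returns the error and `nested_on_other_thread()` returns `Ok(7)`, which mirrors why moving the polars call off the async worker thread avoids the panic.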
238+
### CI Result
239+
- **Status**: ✅ PASSED
240+
- **Version**: 0.2.137
241+
- **Total Time**: 495s
242+
- **Commit**: `721add5ef` pushed to `main`
243+

crates/uffs-mft/src/reader.rs

Lines changed: 57 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -915,13 +915,40 @@ impl MftReader {
915915
/// Use this when you need fast indexing and searching. Convert to DataFrame
916916
/// later with `MftIndex::to_dataframe()` if you need Polars analytics.
917917
///
918+
/// # Note
919+
///
920+
/// This function uses `spawn_blocking` internally to run MFT reading on a
921+
/// dedicated blocking thread. This avoids potential nested tokio runtime
922+
/// issues that can occur when dependencies (like polars) try to create
923+
/// their own runtime.
924+
///
918925
/// # Errors
919926
///
920927
/// Returns an error if MFT reading fails.
921928
#[cfg(windows)]
922-
#[allow(clippy::unused_async)]
923929
pub async fn read_all_index(&self) -> Result<crate::index::MftIndex> {
924-
self.read_mft_index_internal(None::<fn(MftProgress)>)
930+
// Capture configuration to recreate reader in blocking thread
931+
let volume = self.volume;
932+
let mode = self.mode;
933+
let merge_extensions = self.merge_extensions;
934+
let use_bitmap = self.use_bitmap;
935+
let expand_hardlinks = self.expand_hardlinks;
936+
937+
tokio::task::spawn_blocking(move || {
938+
// Create a new reader in the blocking thread
939+
let handle = crate::platform::VolumeHandle::open(volume)?;
940+
let reader = MftReader {
941+
volume,
942+
handle,
943+
mode,
944+
merge_extensions,
945+
use_bitmap,
946+
expand_hardlinks,
947+
};
948+
reader.read_mft_index_internal(None::<fn(MftProgress)>)
949+
})
950+
.await
951+
.map_err(|e| MftError::InvalidInput(format!("Task join error: {e}")))?
925952
}
926953

927954
/// Read MFT into lean index (non-Windows stub).
@@ -994,16 +1021,42 @@ impl MftReader {
9941021
///
9951022
/// * `callback` - Function called periodically with progress updates
9961023
///
1024+
/// # Note
1025+
///
1026+
/// This function uses `spawn_blocking` internally to run MFT reading on a
1027+
/// dedicated blocking thread. This avoids potential nested tokio runtime
1028+
/// issues.
1029+
///
9971030
/// # Errors
9981031
///
9991032
/// Returns an error if MFT reading fails.
10001033
#[cfg(windows)]
1001-
#[allow(clippy::unused_async)]
10021034
pub async fn read_index_with_progress<F>(&self, callback: F) -> Result<crate::index::MftIndex>
10031035
where
10041036
F: Fn(MftProgress) + Send + 'static,
10051037
{
1006-
self.read_mft_index_internal(Some(callback))
1038+
// Capture configuration to recreate reader in blocking thread
1039+
let volume = self.volume;
1040+
let mode = self.mode;
1041+
let merge_extensions = self.merge_extensions;
1042+
let use_bitmap = self.use_bitmap;
1043+
let expand_hardlinks = self.expand_hardlinks;
1044+
1045+
tokio::task::spawn_blocking(move || {
1046+
// Create a new reader in the blocking thread
1047+
let handle = crate::platform::VolumeHandle::open(volume)?;
1048+
let reader = MftReader {
1049+
volume,
1050+
handle,
1051+
mode,
1052+
merge_extensions,
1053+
use_bitmap,
1054+
expand_hardlinks,
1055+
};
1056+
reader.read_mft_index_internal(Some(callback))
1057+
})
1058+
.await
1059+
.map_err(|e| MftError::InvalidInput(format!("Task join error: {e}")))?
10071060
}
10081061

10091062
/// Read MFT into lean index with progress (non-Windows stub).
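
Both rewritten functions follow the same pattern: a closure passed to `spawn_blocking` must be `'static`, so it cannot borrow `&self`. The `Copy` settings are copied out first and a fresh reader is built on the other thread. The sketch below shows that pattern with `std::thread::spawn`, which has the same `Send + 'static` requirement. `Handle`, `Reader`, and their fields are hypothetical stand-ins, not the crate's real types, and `String` replaces the crate's error type.

```rust
use std::thread;

/// Hypothetical stand-in for `VolumeHandle`: owns a resource, so it is not `Copy`.
struct Handle {
    volume: char,
}

impl Handle {
    /// Pretend to open a volume; this sketch accepts ASCII letters only.
    fn open(volume: char) -> Result<Handle, String> {
        if volume.is_ascii_alphabetic() {
            Ok(Handle { volume })
        } else {
            Err(format!("invalid volume: {volume}"))
        }
    }
}

/// Hypothetical stand-in for `MftReader` with two of its settings.
struct Reader {
    volume: char,
    handle: Handle,
    use_bitmap: bool,
}

impl Reader {
    /// Synchronous work that should stay off the caller's thread.
    fn read_index_sync(&self) -> Result<String, String> {
        Ok(format!("index for {} (bitmap={})", self.handle.volume, self.use_bitmap))
    }

    /// Copies the settings out of `&self`, rebuilds a reader on a new thread,
    /// and flattens the nested result that `join` returns.
    fn read_index(&self) -> Result<String, String> {
        let volume = self.volume;
        let use_bitmap = self.use_bitmap;

        thread::spawn(move || {
            // A new handle is opened here because the original cannot be moved out of `&self`.
            let handle = Handle::open(volume)?;
            let reader = Reader { volume, handle, use_bitmap };
            reader.read_index_sync()
        })
        .join()
        // Outer error: the worker thread panicked. The inner Result passes through `?` unchanged.
        .map_err(|_| String::from("worker thread panicked"))?
    }
}
```

One consequence visible in the diff itself is that each call opens a second `VolumeHandle` for the same volume, so any failure from `VolumeHandle::open` now surfaces from the read functions as well.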
