@@ -3507,6 +3507,222 @@ impl PipelinedMftReader {
35073507
35083508 Ok ( all_results)
35093509 }
3510+
    /// Reads all MFT records with pipelined I/O and parallel parsing.
    ///
    /// A dedicated reader thread streams chunks from disk through a bounded
    /// channel (providing backpressure); the received buffers are then parsed
    /// in parallel on the Rayon thread pool and merged single-threaded. This
    /// is intended as the optimal mode for HDDs with multi-core CPUs.
    ///
    /// NOTE(review): parsing is two-phase in this implementation — the
    /// collection loop below drains the channel completely before the
    /// parallel parse starts, so every read buffer is resident in memory at
    /// once and the I/O phase overlaps only with progress reporting, not
    /// with parsing.
    ///
    /// # Arguments
    /// * `handle` - Open volume handle used for the raw chunk reads.
    /// * `merge_extensions` - When true, records are parsed with
    ///   `parse_record_full` and extension records are later merged into
    ///   their base records by `MftRecordMerger`.
    /// * `progress_callback` - Optional `(bytes_read_so_far, total_bytes)`
    ///   callback, invoked once per received chunk during the read phase.
    ///
    /// # Errors
    /// Individual chunk read failures are logged and skipped rather than
    /// propagated; a reader-thread panic is logged and the records received
    /// up to that point are still returned.
    ///
    /// Architecture:
    /// ```text
    /// ┌─────────────┐     ┌──────────────────┐     ┌─────────────────────┐
    /// │   Reader    │────▶│  Bounded Channel │────▶│  Rayon Thread Pool  │
    /// │   Thread    │     │  (backpressure)  │     │  (parallel parsing) │
    /// └─────────────┘     └──────────────────┘     └─────────────────────┘
    ///        │                                               │
    ///        ▼                                               ▼
    ///   Read chunks                                   Parse records in
    ///   from disk                                     parallel batches
    /// ```
    #[allow(unsafe_code)]
    pub fn read_all_pipelined_parallel<F>(
        &self,
        handle: HANDLE,
        merge_extensions: bool,
        mut progress_callback: Option<F>,
    ) -> Result<Vec<ParsedRecord>>
    where
        F: FnMut(u64, u64),
    {
        use std::thread;

        use crossbeam_channel::{Receiver, Sender, bounded};

        let chunks = generate_read_chunks(&self.extent_map, self.bitmap.as_ref(), self.chunk_size);
        let record_size = self.extent_map.bytes_per_record;
        let num_chunks = chunks.len();

        // Nothing to read: skip spawning a thread/channel for an empty run.
        if num_chunks == 0 {
            return Ok(Vec::new());
        }

        // Total bytes across all chunks — the denominator for progress reports.
        let total_bytes: u64 = chunks
            .iter()
            .map(|c| c.record_count * u64::from(record_size))
            .sum();

        // Capacity hint for the merger: in-use count from the bitmap when
        // available, otherwise every record covered by the extent map.
        let estimated_records = if let Some(ref bm) = self.bitmap {
            bm.count_in_use()
        } else {
            self.extent_map.total_records() as usize
        };

        info!(
            chunks = num_chunks,
            estimated_records,
            chunk_size_mb = self.chunk_size / (1024 * 1024),
            pipeline_depth = self.pipeline_depth,
            rayon_threads = rayon::current_num_threads(),
            "🚀 Starting pipelined-parallel read with I/O+CPU overlap and multi-core parsing"
        );

        // Bounded channel gives backpressure: the reader blocks once
        // `parallel_depth` buffers are in flight. Depth is doubled relative
        // to the serial pipeline to keep Rayon workers fed.
        let parallel_depth = self.pipeline_depth * 2;
        let (tx, rx): (Sender<ReadBuffer>, Receiver<ReadBuffer>) = bounded(parallel_depth);

        // Largest chunk in bytes; each reader-side buffer is sized for this
        // worst case (plus one sector of slack for sector alignment).
        let max_chunk_size = chunks
            .iter()
            .map(|c| c.record_count * u64::from(record_size))
            .max()
            .unwrap_or(self.chunk_size as u64) as usize;

        // Move the chunk list into the reader thread.
        let chunks_for_reader = chunks;
        // HANDLE wraps a raw pointer and is not `Send`; smuggle it across the
        // thread boundary as a plain integer and rebuild it on the other
        // side. Sound only because the handle stays open until the reader
        // thread is joined below.
        let handle_raw = handle.0 as usize; // Convert to usize for Send

        // Reader thread: sequentially reads every chunk and pushes the
        // filled buffers into the channel.
        let reader_handle = thread::spawn(move || {
            // Reconstruct HANDLE in reader thread
            let handle = HANDLE(handle_raw as *mut std::ffi::c_void);

            // NOTE(review): buffers are only returned to this pool on a read
            // *failure* — successful buffers travel to the consumer inside
            // `ReadBuffer` and never come back — so on the happy path a fresh
            // allocation happens for every chunk.
            let mut buffer_pool: Vec<AlignedBuffer> = Vec::new();

            for chunk in chunks_for_reader {
                // Get a recycled buffer if one exists, otherwise allocate.
                let mut buffer = buffer_pool
                    .pop()
                    .unwrap_or_else(|| AlignedBuffer::new(max_chunk_size + SECTOR_SIZE));

                // Read chunk into buffer
                match read_chunk_into_buffer_static(handle, &chunk, record_size, &mut buffer) {
                    Ok(bytes_read) => {
                        let read_buffer = ReadBuffer {
                            buffer,
                            bytes_read,
                            chunk,
                            record_size,
                        };

                        // Blocks while the channel is full (backpressure); an
                        // Err means the receiver was dropped, so stop reading.
                        if tx.send(read_buffer).is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        // Best-effort: a failed chunk is skipped, not fatal.
                        warn!(error = %e, "Failed to read chunk, skipping");
                        // Return buffer to pool
                        buffer_pool.push(buffer);
                    }
                }
            }
            // tx is dropped here, closing the channel and ending the recv
            // loop on the consumer side.
        });

        // Phase 1: drain the channel, accumulating every buffer before any
        // parsing happens. This lets Rayon distribute the whole workload at
        // once, at the cost of holding all chunk data in memory.
        let mut all_buffers: Vec<ReadBuffer> = Vec::with_capacity(num_chunks);
        let mut bytes_read_total: u64 = 0;

        while let Ok(read_buffer) = rx.recv() {
            bytes_read_total += read_buffer.bytes_read as u64;
            all_buffers.push(read_buffer);

            // Report progress during collection phase
            if let Some(ref mut cb) = progress_callback {
                cb(bytes_read_total, total_bytes);
            }
        }

        // The channel is closed by now, so the reader has finished; a panic
        // is logged but does not abort — whatever was received still parses.
        if let Err(e) = reader_handle.join() {
            warn!("Reader thread panicked: {:?}", e);
        }

        info!(
            buffers = all_buffers.len(),
            bytes_mb = bytes_read_total / (1024 * 1024),
            "📦 All buffers collected, starting parallel parsing"
        );

        // Phase 2: parse every buffer in parallel on the Rayon pool.
        let parse_results: Vec<ParseResult> = all_buffers
            .par_iter()
            .flat_map(|read_buffer| parse_buffer_to_results(read_buffer, merge_extensions))
            .collect();

        info!(
            parse_results = parse_results.len(),
            "✅ Parallel parsing complete"
        );

        // Phase 3: merge base/extension results using MftRecordMerger
        // (single-threaded, as designed).
        let mut merger = MftRecordMerger::with_capacity(estimated_records);
        for result in parse_results {
            merger.add_result(result);
        }

        let all_results = merger.merge();

        info!(
            records = all_results.len(),
            bytes_mb = bytes_read_total / (1024 * 1024),
            "✅ Pipelined-parallel read complete"
        );

        Ok(all_results)
    }
3681+ }
3682+
3683+ /// Parses all records in a buffer and returns the results.
3684+ ///
3685+ /// This is a helper function used by both serial and parallel parsing paths.
3686+ /// It handles skip_begin, effective record count, bounds checking, fixup, and
3687+ /// parsing.
3688+ fn parse_buffer_to_results ( read_buffer : & ReadBuffer , merge_extensions : bool ) -> Vec < ParseResult > {
3689+ let ReadBuffer {
3690+ buffer,
3691+ bytes_read,
3692+ chunk,
3693+ record_size,
3694+ } = read_buffer;
3695+
3696+ let skip_begin = chunk. skip_begin as usize ;
3697+ let effective_count = chunk. effective_record_count ( ) as usize ;
3698+ let record_size_usize = * record_size as usize ;
3699+
3700+ let mut results = Vec :: with_capacity ( effective_count) ;
3701+
3702+ for i in 0 ..effective_count {
3703+ let offset = ( skip_begin + i) * record_size_usize;
3704+ if offset + record_size_usize > * bytes_read {
3705+ break ;
3706+ }
3707+
3708+ let record_data = & buffer. as_slice ( ) [ offset..offset + record_size_usize] ;
3709+ let mut record_buf = record_data. to_vec ( ) ;
3710+ let frs = chunk. start_frs + skip_begin as u64 + i as u64 ;
3711+
3712+ // Apply fixup
3713+ if !apply_fixup ( & mut record_buf) {
3714+ continue ;
3715+ }
3716+
3717+ // Parse record
3718+ if merge_extensions {
3719+ results. push ( parse_record_full ( & record_buf, frs) ) ;
3720+ } else if let Some ( rec) = parse_record ( & record_buf, frs) {
3721+ results. push ( ParseResult :: Base ( rec) ) ;
3722+ }
3723+ }
3724+
3725+ results
35103726}
35113727
35123728/// Static helper to read a chunk into a buffer (for use in reader thread).
0 commit comments