Skip to content

Commit 51ac58a

Browse files
committed
Refactor emission handling and simplify logic
Remove redundant num_groups field and derive emission size from values. Collapse deferred-state retention into a tighter iterator/unzip flow. Eliminate the extra append_batch_values forwarding helper. Split evaluate() into smaller private steps with replay_deferred_batches and finish_emit. Simplify should_defer and refine the deferred replay loop for clarity and efficiency.
1 parent 0425bd0 commit 51ac58a

1 file changed

Lines changed: 63 additions & 75 deletions

File tree

datafusion/functions-aggregate/src/string_agg.rs

Lines changed: 63 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,6 @@ struct StringAggGroupsAccumulator {
334334
batches: Vec<ArrayRef>,
335335
/// Per-batch `(group_idx, row_idx)` pairs for non-null rows.
336336
batch_entries: Vec<Vec<(u32, u32)>>,
337-
/// Total number of groups tracked.
338-
num_groups: usize,
339337
}
340338

341339
enum StringInputArray<'a> {
@@ -422,7 +420,6 @@ impl StringAggGroupsAccumulator {
422420
total_data_bytes: 0,
423421
batches: Vec::new(),
424422
batch_entries: Vec::new(),
425-
num_groups: 0,
426423
}
427424
}
428425

@@ -433,38 +430,32 @@ impl StringAggGroupsAccumulator {
433430
self.total_data_bytes = 0;
434431
self.batches = Vec::new();
435432
self.batch_entries = Vec::new();
436-
self.num_groups = 0;
437433
}
438434

439435
fn retain_after_emit(&mut self, emit_groups: usize) {
440436
let emit_groups = emit_groups as u32;
441-
let mut retained_batches = Vec::with_capacity(self.batches.len());
442-
let mut retained_entries = Vec::with_capacity(self.batch_entries.len());
443-
444-
for (batch, entries) in self.batches.drain(..).zip(self.batch_entries.drain(..)) {
445-
let entries: Vec<_> = entries
446-
.into_iter()
447-
.filter_map(|(group_idx, row_idx)| {
448-
if group_idx >= emit_groups {
449-
Some((group_idx - emit_groups, row_idx))
450-
} else {
451-
None
452-
}
453-
})
454-
.collect();
455-
if entries.is_empty() {
456-
continue;
457-
}
458-
459-
// Keep the original arrays for this prototype and only renumber
460-
// retained groups. SUB_ISSUE_04 will compact mixed batches so
461-
// partially emitted batches no longer pin their full inputs.
462-
retained_batches.push(batch);
463-
retained_entries.push(entries);
464-
}
465-
466-
self.batches = retained_batches;
467-
self.batch_entries = retained_entries;
437+
(self.batches, self.batch_entries) = self
438+
.batches
439+
.drain(..)
440+
.zip(self.batch_entries.drain(..))
441+
.filter_map(|(batch, entries)| {
442+
let entries = entries
443+
.into_iter()
444+
.filter_map(|(group_idx, row_idx)| {
445+
if group_idx >= emit_groups {
446+
Some((group_idx - emit_groups, row_idx))
447+
} else {
448+
None
449+
}
450+
})
451+
.collect::<Vec<_>>();
452+
453+
// Keep the original arrays for this prototype and only renumber
454+
// retained groups. SUB_ISSUE_04 will compact mixed batches so
455+
// partially emitted batches no longer pin their full inputs.
456+
(!entries.is_empty()).then_some((batch, entries))
457+
})
458+
.unzip();
468459
}
469460

470461
fn append_rows_typed<'a, A>(array: &A, group_indices: &[usize]) -> Vec<(u32, u32)>
@@ -529,12 +520,12 @@ impl StringAggGroupsAccumulator {
529520
) where
530521
A: ArrayAccessor<Item = &'a str>,
531522
{
532-
for &(group_idx, row_idx) in entries {
523+
for (group_idx, row_idx) in entries
524+
.iter()
525+
.copied()
526+
.filter(|(group_idx, _)| (*group_idx as usize) < emit_groups)
527+
{
533528
let group_idx = group_idx as usize;
534-
if group_idx >= emit_groups {
535-
continue;
536-
}
537-
538529
let row_idx = row_idx as usize;
539530
debug_assert!(!array.is_null(row_idx));
540531
let _ = Self::append_group_value(
@@ -546,31 +537,42 @@ impl StringAggGroupsAccumulator {
546537
}
547538
}
548539

549-
fn append_batch_values(
540+
fn should_defer(
541+
&self,
542+
input: &StringInputArray<'_>,
543+
total_num_groups: usize,
544+
) -> bool {
545+
if total_num_groups < Self::DEFER_GROUP_THRESHOLD {
546+
return false;
547+
}
548+
549+
input
550+
.sample_non_null_len()
551+
.is_some_and(|len| len >= Self::DEFER_PAYLOAD_LEN_THRESHOLD)
552+
}
553+
554+
fn replay_deferred_batches(
555+
&self,
550556
values: &mut [Option<String>],
551-
entries: &[(u32, u32)],
552-
array: &ArrayRef,
553-
delimiter: &str,
554557
emit_groups: usize,
555558
) -> Result<()> {
556-
StringInputArray::try_new(array)?.append_batch_values(
557-
values,
558-
entries,
559-
delimiter,
560-
emit_groups,
561-
);
559+
for (batch, entries) in self.batches.iter().zip(&self.batch_entries) {
560+
StringInputArray::try_new(batch)?.append_batch_values(
561+
values,
562+
entries,
563+
&self.delimiter,
564+
emit_groups,
565+
);
566+
}
567+
562568
Ok(())
563569
}
564570

565-
fn should_defer(
566-
&self,
567-
input: &StringInputArray<'_>,
568-
total_num_groups: usize,
569-
) -> bool {
570-
total_num_groups >= Self::DEFER_GROUP_THRESHOLD
571-
&& input
572-
.sample_non_null_len()
573-
.is_some_and(|len| len >= Self::DEFER_PAYLOAD_LEN_THRESHOLD)
571+
fn finish_emit(&mut self, emit_to: EmitTo, emit_groups: usize) {
572+
match emit_to {
573+
EmitTo::All => self.clear_state(),
574+
EmitTo::First(_) => self.retain_after_emit(emit_groups),
575+
}
574576
}
575577
}
576578

@@ -582,7 +584,6 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
582584
opt_filter: Option<&BooleanArray>,
583585
total_num_groups: usize,
584586
) -> Result<()> {
585-
self.num_groups = self.num_groups.max(total_num_groups);
586587
self.values.resize(total_num_groups, None);
587588
let array = apply_filter_as_nulls(&values[0], opt_filter)?;
588589
let input = StringInputArray::try_new(&array)?;
@@ -605,31 +606,18 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
605606
}
606607

607608
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
608-
let mut to_emit = emit_to.take_needed(&mut self.values);
609+
let mut to_emit = match emit_to {
610+
EmitTo::All => std::mem::take(&mut self.values),
611+
EmitTo::First(_) => emit_to.take_needed(&mut self.values),
612+
};
609613
let emit_groups = to_emit.len();
610614
let emitted_bytes: usize = to_emit
611615
.iter()
612616
.filter_map(|opt| opt.as_ref().map(|s| s.len()))
613617
.sum();
614618
self.total_data_bytes -= emitted_bytes;
615-
616-
for (batch, entries) in self.batches.iter().zip(&self.batch_entries) {
617-
Self::append_batch_values(
618-
&mut to_emit,
619-
entries,
620-
batch,
621-
&self.delimiter,
622-
emit_groups,
623-
)?;
624-
}
625-
626-
match emit_to {
627-
EmitTo::All => self.clear_state(),
628-
EmitTo::First(_) => {
629-
self.retain_after_emit(emit_groups);
630-
self.num_groups = self.values.len();
631-
}
632-
}
619+
self.replay_deferred_batches(&mut to_emit, emit_groups)?;
620+
self.finish_emit(emit_to, emit_groups);
633621

634622
Ok(Arc::new(LargeStringArray::from(to_emit)))
635623
}

0 commit comments

Comments
 (0)