From ed684d0b4e215c8ba64c3c4ee62b57f3fe715607 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Fri, 20 Oct 2023 15:02:55 +0200 Subject: [PATCH] compute: reduce boilerplate in linear join rendering This commit reduces the amount of boilerplate code in linear join rendering in two ways: * It replaces the four `dispatch_` functions with a single macro. * It makes `differential_join` and `differential_join_inner` methods of `Context`, to reduce the number of arguments that need to be passed around. --- src/compute/src/render/join/linear_join.rs | 672 ++++++--------------- 1 file changed, 197 insertions(+), 475 deletions(-) diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index f40fc7f965d57..04d9da3de1da6 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -132,7 +132,7 @@ where inputs: Vec>, linear_plan: LinearJoinPlan, ) -> CollectionBundle { - self.scope.region_named("Join(Linear)", |inner| { + self.scope.clone().region_named("Join(Linear)", |inner| { // Collect all error streams, and concatenate them at the end. let mut errors = Vec::new(); @@ -193,14 +193,11 @@ where for stage_plan in linear_plan.stage_plans.into_iter() { // Different variants of `joined` implement this differently, // and the logic is centralized there. - let stream = differential_join( - self.linear_join_spec, + let stream = self.differential_join( joined, inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, &mut errors, - self.shutdown_token.clone(), - self.enable_specialized_arrangements, ); // Update joined results and capture any errors. joined = JoinedFlavor::Collection(stream); @@ -241,456 +238,208 @@ where .leave_region() }) } -} -/// Looks up the arrangement for the next input and joins it to the arranged -/// version of the join of previous inputs. This is split into its own method -/// to enable reuse of code with different types of `prev_keyed`. -fn differential_join( - join_spec: LinearJoinSpec, - mut joined: JoinedFlavor, - lookup_relation: CollectionBundle, - LinearStagePlan { - stream_key, - stream_thinning, - lookup_key, - closure, - lookup_relation: _, - }: LinearStagePlan, - errors: &mut Vec>, - shutdown_token: ShutdownToken, - _enable_specialized_arrangements: bool, -) -> Collection -where - G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, -{ - // If we have only a streamed collection, we must first form an arrangement. - if let JoinedFlavor::Collection(stream) = joined { - let mut row_buf = Row::default(); - let (keyed, errs) = stream.map_fallible("LinearJoinKeyPreparation", { - // Reuseable allocation for unpacking. - let mut datums = DatumVec::new(); - move |row| { - let temp_storage = RowArena::new(); - let datums_local = datums.borrow_with(&row); - row_buf.packer().try_extend( - stream_key - .iter() - .map(|e| e.eval(&datums_local, &temp_storage)), - )?; - let key = row_buf.clone(); - row_buf - .packer() - .extend(stream_thinning.iter().map(|e| datums_local[*e])); - let value = row_buf.clone(); - Ok((key, value)) - } - }); - - errors.push(errs); + /// Looks up the arrangement for the next input and joins it to the arranged + /// version of the join of previous inputs. + fn differential_join( + &self, + mut joined: JoinedFlavor, + lookup_relation: CollectionBundle, + LinearStagePlan { + stream_key, + stream_thinning, + lookup_key, + closure, + lookup_relation: _, + }: LinearStagePlan, + errors: &mut Vec>, + ) -> Collection + where + S: Scope, + { + // If we have only a streamed collection, we must first form an arrangement. + if let JoinedFlavor::Collection(stream) = joined { + let mut row_buf = Row::default(); + let (keyed, errs) = stream.map_fallible("LinearJoinKeyPreparation", { + // Reuseable allocation for unpacking. + let mut datums = DatumVec::new(); + move |row| { + let temp_storage = RowArena::new(); + let datums_local = datums.borrow_with(&row); + row_buf.packer().try_extend( + stream_key + .iter() + .map(|e| e.eval(&datums_local, &temp_storage)), + )?; + let key = row_buf.clone(); + row_buf + .packer() + .extend(stream_thinning.iter().map(|e| datums_local[*e])); + let value = row_buf.clone(); + Ok((key, value)) + } + }); - // TODO(vmarcos): We should implement further arrangement specialization here (#22104). - // By knowing how types propagate through joins we could specialize intermediate - // arrangements as well, either in values or eventually in keys. - let arranged = keyed.mz_arrange::>("JoinStage"); - joined = JoinedFlavor::Local(SpecializedArrangement::RowRow(arranged)); - } + errors.push(errs); - // Demultiplex the four different cross products of arrangement types we might have. - let arrangement = lookup_relation - .arrangement(&lookup_key[..]) - .expect("Arrangement absent despite explicit construction"); - match joined { - JoinedFlavor::Collection(_) => { - unreachable!("JoinedFlavor::Collection variant avoided at top of method"); + // TODO(vmarcos): We should implement further arrangement specialization here (#22104). + // By knowing how types propagate through joins we could specialize intermediate + // arrangements as well, either in values or eventually in keys. + let arranged = keyed.mz_arrange::>("JoinStage"); + joined = JoinedFlavor::Local(SpecializedArrangement::RowRow(arranged)); } - JoinedFlavor::Local(local) => match arrangement { - ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = dispatch_differential_join_inner_local_local( - join_spec, - local, - oks, - closure, - shutdown_token, - ); - errors.push(errs1.as_collection(|k, _v| k.clone())); - errors.extend(errs2); - oks - } - ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = dispatch_differential_join_inner_local_trace( - join_spec, - local, - oks, - closure, - shutdown_token, - ); - errors.push(errs1.as_collection(|k, _v| k.clone())); - errors.extend(errs2); - oks - } - }, - JoinedFlavor::Trace(trace) => match arrangement { - ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = dispatch_differential_join_inner_trace_local( - join_spec, - trace, - oks, - closure, - shutdown_token, - ); - errors.push(errs1.as_collection(|k, _v| k.clone())); - errors.extend(errs2); - oks - } - ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = dispatch_differential_join_inner_trace_trace( - join_spec, - trace, - oks, - closure, - shutdown_token, - ); - errors.push(errs1.as_collection(|k, _v| k.clone())); - errors.extend(errs2); - oks - } - }, - } -} -/// Dispatches valid combinations of arrangements where the type-specialized keys match. -fn dispatch_differential_join_inner_local_local( - join_spec: LinearJoinSpec, - prev_keyed: SpecializedArrangement, - next_input: SpecializedArrangement, - closure: JoinClosure, - shutdown_token: ShutdownToken, -) -> ( - Collection, - Option>, -) -where - G: Scope, - G::Timestamp: Lattice, -{ - match (prev_keyed, next_input) { - ( - SpecializedArrangement::RowUnit(prev_keyed), - SpecializedArrangement::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowUnit(prev_keyed), - SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - None, - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowRow(prev_keyed), - SpecializedArrangement::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowRow(prev_keyed), - SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - None, - closure, - shutdown_token, - ), - } -} - -/// Dispatches valid combinations of arrangement-trace where the type-specialized keys match. -fn dispatch_differential_join_inner_local_trace( - join_spec: LinearJoinSpec, - prev_keyed: SpecializedArrangement, - next_input: SpecializedArrangementImport, - closure: JoinClosure, - shutdown_token: ShutdownToken, -) -> ( - Collection, - Option>, -) -where - G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + Refines, -{ - match (prev_keyed, next_input) { - ( - SpecializedArrangement::RowUnit(prev_keyed), - SpecializedArrangementImport::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowUnit(prev_keyed), - SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - None, - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowRow(prev_keyed), - SpecializedArrangementImport::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangement::RowRow(prev_keyed), - SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - None, - closure, - shutdown_token, - ), - } -} - -/// Dispatches valid combinations of trace-arrangement where the type-specialized keys match. -fn dispatch_differential_join_inner_trace_local( - join_spec: LinearJoinSpec, - prev_keyed: SpecializedArrangementImport, - next_input: SpecializedArrangement, - closure: JoinClosure, - shutdown_token: ShutdownToken, -) -> ( - Collection, - Option>, -) -where - G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + Refines, -{ - match (prev_keyed, next_input) { - ( - SpecializedArrangementImport::RowUnit(prev_keyed), - SpecializedArrangement::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowUnit(prev_keyed), - SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - None, - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowRow(prev_keyed), - SpecializedArrangement::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowRow(prev_keyed), - SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - None, - closure, - shutdown_token, - ), - } -} + macro_rules! dispatch { + ($A:tt, $B:tt, $prev_keyed:expr, $next_input:expr) => {{ + let empty = Some(vec![]); + match ($prev_keyed, $next_input) { + ($A::RowUnit(prev_keyed), $B::RowUnit(next_input)) => self + .differential_join_inner( + prev_keyed, + next_input, + None, + empty.clone(), + empty, + closure, + ), + ($A::RowUnit(prev_keyed), $B::RowRow(next_input)) => self + .differential_join_inner( + prev_keyed, next_input, None, empty, None, closure, + ), + ($A::RowRow(prev_keyed), $B::RowUnit(next_input)) => self + .differential_join_inner( + prev_keyed, next_input, None, None, empty, closure, + ), + ($A::RowRow(prev_keyed), $B::RowRow(next_input)) => self + .differential_join_inner(prev_keyed, next_input, None, None, None, closure), + } + }}; + } -/// Dispatches valid combinations of trace-arrangement where the type-specialized keys match. -fn dispatch_differential_join_inner_trace_trace( - join_spec: LinearJoinSpec, - prev_keyed: SpecializedArrangementImport, - next_input: SpecializedArrangementImport, - closure: JoinClosure, - shutdown_token: ShutdownToken, -) -> ( - Collection, - Option>, -) -where - G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + Refines, -{ - match (prev_keyed, next_input) { - ( - SpecializedArrangementImport::RowUnit(prev_keyed), - SpecializedArrangementImport::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowUnit(prev_keyed), - SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - Some(vec![]), - None, - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowRow(prev_keyed), - SpecializedArrangementImport::RowUnit(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - Some(vec![]), - closure, - shutdown_token, - ), - ( - SpecializedArrangementImport::RowRow(prev_keyed), - SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner( - join_spec, - prev_keyed, - next_input, - None, - None, - None, - closure, - shutdown_token, - ), + // Demultiplex the four different cross products of arrangement types we might have. + let arrangement = lookup_relation + .arrangement(&lookup_key[..]) + .expect("Arrangement absent despite explicit construction"); + match joined { + JoinedFlavor::Collection(_) => { + unreachable!("JoinedFlavor::Collection variant avoided at top of method"); + } + JoinedFlavor::Local(local) => match arrangement { + ArrangementFlavor::Local(oks, errs1) => { + let (oks, errs2) = + dispatch!(SpecializedArrangement, SpecializedArrangement, local, oks); + errors.push(errs1.as_collection(|k, _v| k.clone())); + errors.extend(errs2); + oks + } + ArrangementFlavor::Trace(_gid, oks, errs1) => { + let (oks, errs2) = dispatch!( + SpecializedArrangement, + SpecializedArrangementImport, + local, + oks + ); + errors.push(errs1.as_collection(|k, _v| k.clone())); + errors.extend(errs2); + oks + } + }, + JoinedFlavor::Trace(trace) => match arrangement { + ArrangementFlavor::Local(oks, errs1) => { + let (oks, errs2) = dispatch!( + SpecializedArrangementImport, + SpecializedArrangement, + trace, + oks + ); + errors.push(errs1.as_collection(|k, _v| k.clone())); + errors.extend(errs2); + oks + } + ArrangementFlavor::Trace(_gid, oks, errs1) => { + let (oks, errs2) = dispatch!( + SpecializedArrangementImport, + SpecializedArrangementImport, + trace, + oks + ); + errors.push(errs1.as_collection(|k, _v| k.clone())); + errors.extend(errs2); + oks + } + }, + } } -} -/// Joins the arrangement for `next_input` to the arranged version of the -/// join of previous inputs. This is split into its own method to enable -/// reuse of code with different types of `next_input`. -/// -/// The return type includes an optional error collection, which may be -/// `None` if we can determine that `closure` cannot error. -fn differential_join_inner( - join_spec: LinearJoinSpec, - prev_keyed: Arranged, - next_input: Arranged, - key_types: Option>, - prev_types: Option>, - next_types: Option>, - closure: JoinClosure, - shutdown_token: ShutdownToken, -) -> ( - Collection, - Option>, -) -where - G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, - Tr1: TraceReader + Clone + 'static, - Tr2: TraceReader + Clone + 'static, - K: Data + IntoRowByTypes, - V1: Data + IntoRowByTypes, - V2: Data + IntoRowByTypes, -{ - // Reuseable allocation for unpacking. - let mut datums = DatumVec::new(); - let mut row_builder = Row::default(); - - let mut key_buf = Row::default(); - let mut old_buf = Row::default(); - let mut new_buf = Row::default(); + /// Joins the arrangement for `next_input` to the arranged version of the + /// join of previous inputs. This is split into its own method to enable + /// reuse of code with different types of `next_input`. + /// + /// The return type includes an optional error collection, which may be + /// `None` if we can determine that `closure` cannot error. + fn differential_join_inner( + &self, + prev_keyed: Arranged, + next_input: Arranged, + key_types: Option>, + prev_types: Option>, + next_types: Option>, + closure: JoinClosure, + ) -> ( + Collection, + Option>, + ) + where + S: Scope, + Tr1: TraceReader + Clone + 'static, + Tr2: TraceReader + Clone + 'static, + K: Data + IntoRowByTypes, + V1: Data + IntoRowByTypes, + V2: Data + IntoRowByTypes, + { + // Reuseable allocation for unpacking. + let mut datums = DatumVec::new(); + let mut row_builder = Row::default(); + + let mut key_buf = Row::default(); + let mut old_buf = Row::default(); + let mut new_buf = Row::default(); + + if closure.could_error() { + let (oks, err) = self + .linear_join_spec + .render( + &prev_keyed, + &next_input, + self.shutdown_token.clone(), + move |key, old, new| { + let key = key.into_row(&mut key_buf, key_types.as_deref()); + let old = old.into_row(&mut old_buf, prev_types.as_deref()); + let new = new.into_row(&mut new_buf, next_types.as_deref()); + + let temp_storage = RowArena::new(); + let mut datums_local = datums.borrow_with_many(&[key, old, new]); + closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .map_err(DataflowError::from) + .transpose() + }, + ) + .inner + .ok_err(|(x, t, d)| { + // TODO(mcsherry): consider `ok_err()` for `Collection`. + match x { + Ok(x) => Ok((x, t, d)), + Err(x) => Err((x, t, d)), + } + }); - if closure.could_error() { - let (oks, err) = join_spec - .render( + (oks.as_collection(), Some(err.as_collection())) + } else { + let oks = self.linear_join_spec.render( &prev_keyed, &next_input, - shutdown_token, + self.shutdown_token.clone(), move |key, old, new| { let key = key.into_row(&mut key_buf, key_types.as_deref()); let old = old.into_row(&mut old_buf, prev_types.as_deref()); @@ -700,38 +449,11 @@ where let mut datums_local = datums.borrow_with_many(&[key, old, new]); closure .apply(&mut datums_local, &temp_storage, &mut row_builder) - .map_err(DataflowError::from) - .transpose() + .expect("Closure claimed to never error") }, - ) - .inner - .ok_err(|(x, t, d)| { - // TODO(mcsherry): consider `ok_err()` for `Collection`. - match x { - Ok(x) => Ok((x, t, d)), - Err(x) => Err((x, t, d)), - } - }); - - (oks.as_collection(), Some(err.as_collection())) - } else { - let oks = join_spec.render( - &prev_keyed, - &next_input, - shutdown_token, - move |key, old, new| { - let key = key.into_row(&mut key_buf, key_types.as_deref()); - let old = old.into_row(&mut old_buf, prev_types.as_deref()); - let new = new.into_row(&mut new_buf, next_types.as_deref()); + ); - let temp_storage = RowArena::new(); - let mut datums_local = datums.borrow_with_many(&[key, old, new]); - closure - .apply(&mut datums_local, &temp_storage, &mut row_builder) - .expect("Closure claimed to never error") - }, - ); - - (oks, None) + (oks, None) + } } }