Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
let mut output1_handle = output1.activate();
let mut output2_handle = output2.activate();

input.activate().for_each_time(|time, data| {
input.for_each_time(|time, data| {
let mut out1 = output1_handle.session(&time);
let mut out2 = output2_handle.session(&time);
for datum in data.flat_map(|d| d.drain(..)) {
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
let mut output1_handle = output1.activate();
let mut output2_handle = output2.activate();

input.activate().for_each_time(|time, data| {
input.for_each_time(|time, data| {
let mut out = if condition(time.time()) {
output2_handle.session(&time)
} else {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/ok_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for StreamCore<S, C> {
let mut output1_handle = output1.activate();
let mut output2_handle = output2.activate();

input.activate().for_each_time(|time, data| {
input.for_each_time(|time, data| {
let mut out1 = output1_handle.session(&time);
let mut out2 = output2_handle.session(&time);
for datum in data.flat_map(|d| d.drain()) {
Expand Down
29 changes: 1 addition & 28 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,6 @@ use crate::container::{CapacityContainerBuilder, PushInto};
use crate::dataflow::operators::InputCapability;
use crate::dataflow::operators::capability::CapabilityTrait;

#[must_use]
pub struct InputSession<'a, T: Timestamp, C, P: Pull<Message<T, C>>> {
input: &'a mut InputHandleCore<T, C, P>,
}

impl<'a, T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputSession<'a, T, C, P> {
/// Iterates through distinct capabilities and the lists of containers associated with each.
pub fn for_each_time<F>(self, logic: F) where F: FnMut(InputCapability<T>, std::slice::IterMut::<C>), C: Default {
self.input.for_each_time(logic)
}
/// Iterates through pairs of capability and container.
///
/// The `for_each_time` method is equivalent, but groups containers by capability and is preferred,
/// in that it often leads to grouping work by capability, including the creation of output sessions.
pub fn for_each<F>(self, logic: F) where F: FnMut(InputCapability<T>, &mut C) {
self.input.for_each(logic)
}
}

/// Handle to an operator's input stream.
pub struct InputHandleCore<T: Timestamp, C, P: Pull<Message<T, C>>> {
pull_counter: PullCounter<T, C, P>,
Expand All @@ -53,15 +34,11 @@ pub struct InputHandleCore<T: Timestamp, C, P: Pull<Message<T, C>>> {
}

impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {

/// Activates an input handle with a session that reorders inputs and must be drained.
pub fn activate(&mut self) -> InputSession<'_, T, C, P> { InputSession { input: self } }

/// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
/// Returns `None` when there's no more data available.
#[inline]
pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
let internal = &self.internal;
let summaries = &self.summaries;
self.pull_counter.next_guarded().map(|(guard, bundle)| {
Expand Down Expand Up @@ -94,10 +71,6 @@ impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C,
}
}

pub fn _access_pull_counter<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
&mut input.pull_counter
}

/// Constructs an input handle.
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
Expand Down
52 changes: 26 additions & 26 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::progress::frontier::MutableAntichain;
use crate::dataflow::channels::pact::ParallelizationContract;

use crate::dataflow::operators::generic::handles::{InputSession, OutputBuilderSession, OutputBuilder};
use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
use crate::dataflow::operators::capability::Capability;

use crate::dataflow::{Scope, StreamCore};
Expand Down Expand Up @@ -57,7 +57,7 @@ pub trait Operator<G: Scope, C1> {
where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P: ParallelizationContract<G::Timestamp, C1>;

Expand Down Expand Up @@ -87,7 +87,7 @@ pub trait Operator<G: Scope, C1> {
/// });
/// ```
fn unary_notify<CB: ContainerBuilder,
L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>,
&mut Notificator<G::Timestamp>)+'static,
P: ParallelizationContract<G::Timestamp, C1>>
Expand Down Expand Up @@ -123,7 +123,7 @@ pub trait Operator<G: Scope, C1> {
where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
&mut OutputBuilderSession<G::Timestamp, CB>)+'static,
P: ParallelizationContract<G::Timestamp, C1>;

Expand Down Expand Up @@ -180,8 +180,8 @@ pub trait Operator<G: Scope, C1> {
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
(InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
(&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>;
Expand Down Expand Up @@ -229,8 +229,8 @@ pub trait Operator<G: Scope, C1> {
/// ```
fn binary_notify<C2: Container,
CB: ContainerBuilder,
L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
InputSession<'_, G::Timestamp, C2, P2::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
&mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>,
&mut Notificator<G::Timestamp>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
Expand Down Expand Up @@ -268,8 +268,8 @@ pub trait Operator<G: Scope, C1> {
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
InputSession<'_, G::Timestamp, C2, P2::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
&mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>;
Expand Down Expand Up @@ -299,7 +299,7 @@ pub trait Operator<G: Scope, C1> {
/// ```
fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where
L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
P: ParallelizationContract<G::Timestamp, C1>;
}

Expand All @@ -309,7 +309,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P: ParallelizationContract<G::Timestamp, C1> {

Expand All @@ -326,15 +326,15 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
let mut logic = constructor(capability, operator_info);
move |frontiers| {
let mut output_handle = output.activate();
logic((input.activate(), &frontiers[0]), &mut output_handle);
logic((&mut input, &frontiers[0]), &mut output_handle);
}
});

stream
}

fn unary_notify<CB: ContainerBuilder,
L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>,
&mut Notificator<G::Timestamp>)+'static,
P: ParallelizationContract<G::Timestamp, C1>>
Expand All @@ -358,7 +358,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P: ParallelizationContract<G::Timestamp, C1> {

Expand All @@ -374,7 +374,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
// `capabilities` should be a single-element vector.
let capability = capabilities.pop().unwrap();
let mut logic = constructor(capability, operator_info);
move |_frontiers| logic(input.activate(), &mut output.activate())
move |_frontiers| logic(&mut input, &mut output.activate())
});

stream
Expand All @@ -385,8 +385,8 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
(InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
(&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2> {
Expand All @@ -405,7 +405,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
let mut logic = constructor(capability, operator_info);
move |frontiers| {
let mut output_handle = output.activate();
logic((input1.activate(), &frontiers[0]), (input2.activate(), &frontiers[1]), &mut output_handle);
logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle);
}
});

Expand All @@ -414,8 +414,8 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {

fn binary_notify<C2: Container,
CB: ContainerBuilder,
L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
InputSession<'_, G::Timestamp, C2, P2::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
&mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>,
&mut Notificator<G::Timestamp>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
Expand Down Expand Up @@ -443,8 +443,8 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
InputSession<'_, G::Timestamp, C2, P2::Puller>,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
&mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2> {
Expand All @@ -464,7 +464,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
let mut logic = constructor(capability, operator_info);
move |_frontiers| {
let mut output_handle = output.activate();
logic(input1.activate(), input2.activate(), &mut output_handle);
logic(&mut input1, &mut input2, &mut output_handle);
}
});

Expand All @@ -473,15 +473,15 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {

fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
where
L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
P: ParallelizationContract<G::Timestamp, C1> {

let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
let mut input = builder.new_input(self, pact);

builder.build(|_capabilities| {
move |frontiers| {
logic((input.activate(), &frontiers[0]));
logic((&mut input, &frontiers[0]));
}
});
}
Expand Down
Loading