Spec:
@spec collapse(Enumerable.t, (Enumerable.t -> Enumerable.t), (Enumerable.t -> any)) :: Enumerable.t
def collapse(stream, grouper, reducer)
Example:
uniq_values = Streamz.collapse(values, &Stream.chunk_every(&1, 100), &Enum.uniq/1) |> Stream.concat() |> Enum.uniq()
Explanation:
By breaking values up into chunks of 100, we can call uniq on each chunk in parallel. We then call uniq on the combined result. This is a building-block function; for example, it makes a parallel reduce easy to write:
def parallel_reduce(stream, reducer) do
  # Reduce each chunk, then reduce the partial results with the same reducer.
  Streamz.collapse(stream, &Stream.chunk_every(&1, 100), reducer) |> reducer.()
end
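To make the two examples above concrete, here is a minimal serial sketch. CollapseSketch and its functions are hypothetical stand-ins for the proposed Streamz.collapse/3 and do no parallel work; the sketch assumes Elixir 1.5+ for Stream.chunk_every/2, which (unlike the older Stream.chunk/2) keeps a trailing partial chunk instead of dropping it.

```elixir
defmodule CollapseSketch do
  # Hypothetical serial stand-in for Streamz.collapse/3: split the input
  # with `grouper`, then lazily apply `reducer` to each group.
  def collapse(stream, grouper, reducer) do
    stream
    |> grouper.()
    |> Stream.map(reducer)
  end

  # parallel_reduce as proposed above: reduce each group, then reduce the
  # partial results with the same reducer.
  def parallel_reduce(stream, reducer) do
    stream
    |> collapse(&Stream.chunk_every(&1, 100), reducer)
    |> reducer.()
  end
end

# Sum of 1..1000 computed as ten partial sums, then a sum of those sums.
total = CollapseSketch.parallel_reduce(1..1000, &Enum.sum/1)
IO.inspect(total)
# prints 500500

# The uniq example: uniq within each chunk, flatten, then uniq across chunks.
values = Stream.cycle([1, 2, 3, 2, 1]) |> Enum.take(1_000)

uniq_values =
  values
  |> CollapseSketch.collapse(&Stream.chunk_every(&1, 100), &Enum.uniq/1)
  |> Stream.concat()
  |> Enum.uniq()

IO.inspect(uniq_values)
# prints [1, 2, 3]
```

Note that parallel_reduce only works when the reducer can consume its own partial results (a sum of sums is a sum); the uniq case returns lists, so it needs the extra Stream.concat step before the final uniq.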
Implementation:
This can be done serially today with chunk, map and reduce. To do it in parallel, we can use the same building blocks as farm: group_by breaks the input into a stream of streams, that stream of streams is farmed out via Stream.farm, and in the map step each substream is reduced (Enum.reduce) with the passed-in reducer.
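A rough sketch of the parallel version follows. It assumes Task.async_stream/3 from the Elixir standard library as a stand-in for farm; it is not Streamz's actual farm API, and ParallelCollapseSketch is a hypothetical name. A serial chunk/map variant is included for comparison.

```elixir
defmodule ParallelCollapseSketch do
  # Hypothetical parallel collapse: Task.async_stream/3 stands in for farm.
  # Each group produced by `grouper` is reduced in its own task, and the
  # results are emitted in the original group order.
  def collapse(stream, grouper, reducer) do
    stream
    |> grouper.()
    |> Task.async_stream(reducer, ordered: true, timeout: :infinity)
    |> Stream.map(fn {:ok, result} -> result end)
  end

  # Serial equivalent built from chunk, map and reduce.
  def serial_collapse(stream, grouper, reducer) do
    stream
    |> grouper.()
    |> Stream.map(reducer)
  end
end

parallel =
  1..10_000
  |> ParallelCollapseSketch.collapse(&Stream.chunk_every(&1, 100), &Enum.sum/1)
  |> Enum.sum()

serial =
  1..10_000
  |> ParallelCollapseSketch.serial_collapse(&Stream.chunk_every(&1, 100), &Enum.sum/1)
  |> Enum.sum()

IO.inspect(parallel == serial)
# prints true
```

Keeping ordered: true means the collapsed stream preserves group order, which matters for reducers that are not commutative; for a plain sum, ordered: false would also give the same final answer.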
Why build this one? I see parallel reduce as having two components: the lazy, parallel component, which must be stream based, and the final Enum.reduce component, which moves from a vector to a scalar.
They could be built together as a single function, but I think collapse may have other use cases where keeping the result as a stream is a good idea.
One issue is that this stream will not emit any values until the input stream has terminated, so maybe it doesn't belong in Stream. We'll see what happens here.