timely/dataflow/operators/core/
exchange.rs1use crate::Container;
4use crate::progress::Timestamp;
5use crate::container::{DrainContainer, SizableContainer, PushInto};
6use crate::dataflow::channels::pact::ExchangeCore;
7use crate::dataflow::operators::generic::operator::Operator;
8use crate::dataflow::Stream;
9
10pub trait Exchange<C: DrainContainer> {
12 fn exchange<F>(self, route: F) -> Self
29 where
30 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
31}
32
33impl<T: Timestamp, C> Exchange<C> for Stream<'_, T, C>
34where
35 C: Container
36 + SizableContainer
37 + DrainContainer
38 + Send
39 + crate::dataflow::channels::ContainerBytes
40 + for<'a> PushInto<C::Item<'a>>,
41{
42 fn exchange<F>(self, route: F) -> Self
43 where
44 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
45 {
46 self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
47 move |input, output| {
48 input.for_each_time(|time, data| {
49 output.session(&time).give_containers(data);
50 });
51 }
52 })
53 }
54}