timely/dataflow/operators/core/
exchange.rs1use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, StreamCore};
8
9pub trait Exchange<C: DrainContainer> {
11 fn exchange<F>(&self, route: F) -> Self
27 where
28 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
29}
30
31impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
32where
33 C: Container
34 + SizableContainer
35 + DrainContainer
36 + Send
37 + crate::dataflow::channels::ContainerBytes
38 + for<'a> PushInto<C::Item<'a>>,
39{
40 fn exchange<F>(&self, route: F) -> StreamCore<G, C>
41 where
42 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
43 {
44 self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
45 move |input, output| {
46 input.for_each(|time, data| {
47 output.session(&time).give_container(data);
48 });
49 }
50 })
51 }
52}