// timely/dataflow/operators/core/exchange.rs
1use crate::ExchangeData;
4use crate::container::{Container, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, StreamCore};
8
9pub trait Exchange<C: Container> {
11 fn exchange<F>(&self, route: F) -> Self
27 where
28 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
29}
30
31impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
32where
33 C: SizableContainer + ExchangeData + crate::dataflow::channels::ContainerBytes,
34 C: for<'a> PushInto<C::Item<'a>>,
35
36{
37 fn exchange<F>(&self, route: F) -> StreamCore<G, C>
38 where
39 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
40 {
41 self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
42 move |input, output| {
43 input.for_each(|time, data| {
44 output.session(&time).give_container(data);
45 });
46 }
47 })
48 }
49}