timely/dataflow/channels/pushers/
exchange.rs

1//! The exchange pattern distributes pushed data between many target pushees.
2
3use crate::ContainerBuilder;
4use crate::communication::Push;
5use crate::container::{DrainContainer, PushInto};
6use crate::dataflow::channels::Message;
7
8/// Distribute containers to several pushers.
9///
10/// A distributor sits behind an exchange pusher, and partitions containers at a given time
11/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired
12/// pusher.
13///
14/// It needs to uphold progress tracking requirements. The count of the input container
15/// must be preserved across the output containers, from the first call to `partition` until the
16/// call to `flush` for a specific time stamp.
17pub trait Distributor<C> {
18    /// Partition the contents of `container` at `time` into the `pushers`.
19    fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
20    /// Flush any remaining contents into the `pushers` at time `time`.
21    fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
22    /// Optionally release resources, such as memory.
23    fn relax(&mut self) { }
24}
25
/// A distributor that drains its input container and rebuilds the items into per-target
/// containers, choosing each item's target from a hash of that item.
pub struct DrainContainerDistributor<CB, H> {
    /// One container builder per target; `builders[i]` collects the items bound for pusher `i`.
    builders: Vec<CB>,
    /// Produces the hash that selects an item's target.
    hash_func: H,
}

impl<CB: Default, H> DrainContainerDistributor<CB, H> {
    /// Creates a distributor that routes items with `hash_func` among `peers` targets.
    ///
    /// Each of the `peers` targets starts with a builder obtained from `CB::default()`.
    pub fn new(hash_func: H, peers: usize) -> Self {
        let builders: Vec<CB> = (0..peers).map(|_| CB::default()).collect();
        Self { hash_func, builders }
    }
}
43
44impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
45where
46    CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
47    for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
48{
49    fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
50        debug_assert_eq!(self.builders.len(), pushers.len());
51        if pushers.len().is_power_of_two() {
52            let mask = (pushers.len() - 1) as u64;
53            for datum in container.drain() {
54                let index = ((self.hash_func)(&datum) & mask) as usize;
55                self.builders[index].push_into(datum);
56                while let Some(produced) = self.builders[index].extract() {
57                    Message::push_at(produced, time.clone(), &mut pushers[index]);
58                }
59            }
60        }
61        else {
62            let num_pushers = pushers.len() as u64;
63            for datum in container.drain() {
64                let index = ((self.hash_func)(&datum) % num_pushers) as usize;
65                self.builders[index].push_into(datum);
66                while let Some(produced) = self.builders[index].extract() {
67                    Message::push_at(produced, time.clone(), &mut pushers[index]);
68                }
69            }
70        }
71    }
72
73    fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
74        for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
75            while let Some(container) = builder.finish() {
76                Message::push_at(container, time.clone(), pusher);
77            }
78        }
79    }
80
81    fn relax(&mut self) {
82        for builder in &mut self.builders {
83            builder.relax();
84        }
85    }
86}
87
// TODO: software write combining.
/// Distributes records among target pushees according to a distributor.
///
/// Incoming containers are handed to the distributor `D`, which splits them across `pushers`.
/// The exchange remembers the timestamp of the data given to the distributor since its last
/// flush, so that this data can be flushed when a different timestamp arrives or the stream is
/// flushed.
pub struct Exchange<T, P, D> {
    /// The target pushers that records are distributed among.
    pushers: Vec<P>,
    /// Timestamp of the data handed to the distributor since its last flush, if any.
    current: Option<T>,
    /// Decides how each incoming container is split across `pushers`.
    distributor: D,
}

impl<T, P, D> Exchange<T, P, D> {
    /// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
    ///
    /// The exchange starts without a current timestamp. Construction places no bounds on `T`;
    /// the bounds required for pushing live on the `Push` implementation.
    pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
        Exchange {
            pushers,
            current: None,
            distributor,
        }
    }
}
106
107impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
108where
109    P: Push<Message<T, C>>,
110    D: Distributor<C>,
111{
112    #[inline(never)]
113    fn push(&mut self, message: &mut Option<Message<T, C>>) {
114        // if only one pusher, no exchange
115        if self.pushers.len() == 1 {
116            self.pushers[0].push(message);
117        }
118        else if let Some(message) = message {
119
120            let time = &message.time;
121            let data = &mut message.data;
122
123            // if the time isn't right, flush everything.
124            match self.current.as_ref() {
125                // We have a current time, and it is different from the new time.
126                Some(current_time) if current_time != time => {
127                    self.distributor.flush(current_time, &mut self.pushers);
128                    self.current = Some(time.clone());
129                }
130                // We had no time before, or flushed.
131                None => self.current = Some(time.clone()),
132                // Time didn't change since last call.
133                _ => {}
134            }
135
136            self.distributor.partition(data, time, &mut self.pushers);
137        }
138        else {
139            // flush
140            if let Some(time) = self.current.take() {
141                self.distributor.flush(&time, &mut self.pushers);
142            }
143            self.distributor.relax();
144            for index in 0..self.pushers.len() {
145                self.pushers[index].done();
146            }
147        }
148    }
149}