// timely/dataflow/channels/pushers/exchange.rs

//! The exchange pattern distributes pushed data between many target pushees.
2
3use crate::communication::Push;
4use crate::container::{ContainerBuilder, DrainContainer, PushInto};
5use crate::dataflow::channels::Message;
6
7/// Distribute containers to several pushers.
8///
9/// A distributor sits behind an exchange pusher, and partitions containers at a given time
10/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired
11/// pusher.
12///
13/// It needs to uphold progress tracking requirements. The count of the input container
14/// must be preserved across the output containers, from the first call to `partition` until the
15/// call to `flush` for a specific time stamp.
16pub trait Distributor<C> {
17    /// Partition the contents of `container` at `time` into the `pushers`.
18    fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
19    /// Flush any remaining contents into the `pushers` at time `time`.
20    fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
21    /// Optionally release resources, such as memory.
22    fn relax(&mut self);
23}
24
/// A distributor creating containers from a drainable container based
/// on a hash function of the container's item.
///
/// One container builder is kept per peer. Each drained item goes to the builder whose
/// index is derived from `hash_func(&item)`.
pub struct DrainContainerDistributor<CB, H> {
    /// Container builders, one per peer, indexed by target.
    builders: Vec<CB>,
    /// Maps an item to the 64-bit hash used to choose its target builder.
    hash_func: H,
}
31
32impl<CB: Default, H> DrainContainerDistributor<CB, H> {
33    /// Constructs a new `DrainContainerDistributor` with the given hash function for a number of
34    /// peers.
35    pub fn new(hash_func: H, peers: usize) -> Self {
36        Self {
37            builders: std::iter::repeat_with(Default::default).take(peers).collect(),
38            hash_func,
39        }
40    }
41}
42
43impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
44where
45    CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
46    for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
47{
48    fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
49        debug_assert_eq!(self.builders.len(), pushers.len());
50        if pushers.len().is_power_of_two() {
51            let mask = (pushers.len() - 1) as u64;
52            for datum in container.drain() {
53                let index = ((self.hash_func)(&datum) & mask) as usize;
54                self.builders[index].push_into(datum);
55                while let Some(produced) = self.builders[index].extract() {
56                    Message::push_at(produced, time.clone(), &mut pushers[index]);
57                }
58            }
59        }
60        else {
61            let num_pushers = pushers.len() as u64;
62            for datum in container.drain() {
63                let index = ((self.hash_func)(&datum) % num_pushers) as usize;
64                self.builders[index].push_into(datum);
65                while let Some(produced) = self.builders[index].extract() {
66                    Message::push_at(produced, time.clone(), &mut pushers[index]);
67                }
68            }
69        }
70    }
71
72    fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
73        for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
74            while let Some(container) = builder.finish() {
75                Message::push_at(container, time.clone(), pusher);
76            }
77        }
78    }
79
80    fn relax(&mut self) {
81        for builder in &mut self.builders {
82            builder.relax();
83        }
84    }
85}
86
// TODO: Software write combining.
/// Distributes records among target pushees according to a distributor.
///
/// Pushed containers are handed to `distributor`, which partitions them across `pushers`.
/// The time of the data currently staged in the distributor is remembered, so that staged
/// data can be flushed when a different time arrives or when a flush (`None`) is pushed.
pub struct Exchange<T, P, D> {
    /// Downstream pushers; data is routed among them by index.
    pushers: Vec<P>,
    /// Time of the data currently staged in `distributor`, if any.
    current: Option<T>,
    /// Strategy used to partition containers across `pushers`.
    distributor: D,
}
94
95impl<T: Clone, P, D>  Exchange<T, P, D> {
96    /// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
97    pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
98        Exchange {
99            pushers,
100            current: None,
101            distributor,
102        }
103    }
104}
105
106impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
107where
108    P: Push<Message<T, C>>,
109    D: Distributor<C>,
110{
111    #[inline(never)]
112    fn push(&mut self, message: &mut Option<Message<T, C>>) {
113        // if only one pusher, no exchange
114        if self.pushers.len() == 1 {
115            self.pushers[0].push(message);
116        }
117        else if let Some(message) = message {
118
119            let time = &message.time;
120            let data = &mut message.data;
121
122            // if the time isn't right, flush everything.
123            match self.current.as_ref() {
124                // We have a current time, and it is different from the new time.
125                Some(current_time) if current_time != time => {
126                    self.distributor.flush(current_time, &mut self.pushers);
127                    self.current = Some(time.clone());
128                }
129                // We had no time before, or flushed.
130                None => self.current = Some(time.clone()),
131                // Time didn't change since last call.
132                _ => {}
133            }
134
135            self.distributor.partition(data, time, &mut self.pushers);
136        }
137        else {
138            // flush
139            if let Some(time) = self.current.take() {
140                self.distributor.flush(&time, &mut self.pushers);
141            }
142            self.distributor.relax();
143            for index in 0..self.pushers.len() {
144                self.pushers[index].push(&mut None);
145            }
146        }
147    }
148}