timely/dataflow/channels/pushers/
exchange.rs1use crate::communication::Push;
4use crate::container::{ContainerBuilder, DrainContainer, PushInto};
5use crate::dataflow::channels::Message;
6
7pub trait Distributor<C> {
17 fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
19 fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
21 fn relax(&mut self);
23}
24
25pub struct DrainContainerDistributor<CB, H> {
28 builders: Vec<CB>,
29 hash_func: H,
30}
31
32impl<CB: Default, H> DrainContainerDistributor<CB, H> {
33 pub fn new(hash_func: H, peers: usize) -> Self {
36 Self {
37 builders: std::iter::repeat_with(Default::default).take(peers).collect(),
38 hash_func,
39 }
40 }
41}
42
43impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
44where
45 CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
46 for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
47{
48 fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
49 debug_assert_eq!(self.builders.len(), pushers.len());
50 if pushers.len().is_power_of_two() {
51 let mask = (pushers.len() - 1) as u64;
52 for datum in container.drain() {
53 let index = ((self.hash_func)(&datum) & mask) as usize;
54 self.builders[index].push_into(datum);
55 while let Some(produced) = self.builders[index].extract() {
56 Message::push_at(produced, time.clone(), &mut pushers[index]);
57 }
58 }
59 }
60 else {
61 let num_pushers = pushers.len() as u64;
62 for datum in container.drain() {
63 let index = ((self.hash_func)(&datum) % num_pushers) as usize;
64 self.builders[index].push_into(datum);
65 while let Some(produced) = self.builders[index].extract() {
66 Message::push_at(produced, time.clone(), &mut pushers[index]);
67 }
68 }
69 }
70 }
71
72 fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
73 for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
74 while let Some(container) = builder.finish() {
75 Message::push_at(container, time.clone(), pusher);
76 }
77 }
78 }
79
80 fn relax(&mut self) {
81 for builder in &mut self.builders {
82 builder.relax();
83 }
84 }
85}
86
87pub struct Exchange<T, P, D> {
90 pushers: Vec<P>,
91 current: Option<T>,
92 distributor: D,
93}
94
95impl<T: Clone, P, D> Exchange<T, P, D> {
96 pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
98 Exchange {
99 pushers,
100 current: None,
101 distributor,
102 }
103 }
104}
105
106impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
107where
108 P: Push<Message<T, C>>,
109 D: Distributor<C>,
110{
111 #[inline(never)]
112 fn push(&mut self, message: &mut Option<Message<T, C>>) {
113 if self.pushers.len() == 1 {
115 self.pushers[0].push(message);
116 }
117 else if let Some(message) = message {
118
119 let time = &message.time;
120 let data = &mut message.data;
121
122 match self.current.as_ref() {
124 Some(current_time) if current_time != time => {
126 self.distributor.flush(current_time, &mut self.pushers);
127 self.current = Some(time.clone());
128 }
129 None => self.current = Some(time.clone()),
131 _ => {}
133 }
134
135 self.distributor.partition(data, time, &mut self.pushers);
136 }
137 else {
138 if let Some(time) = self.current.take() {
140 self.distributor.flush(&time, &mut self.pushers);
141 }
142 self.distributor.relax();
143 for index in 0..self.pushers.len() {
144 self.pushers[index].push(&mut None);
145 }
146 }
147 }
148}