timely/dataflow/channels/pushers/
exchange.rs1use crate::ContainerBuilder;
4use crate::communication::Push;
5use crate::container::{DrainContainer, PushInto};
6use crate::dataflow::channels::Message;
7
8pub trait Distributor<C> {
18 fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
20 fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
22 fn relax(&mut self) { }
24}
25
/// A distributor that drains containers and routes each item by a hash of that item.
///
/// It keeps one container builder per destination and a function that maps an item to a
/// `u64` hash, from which the destination index is derived.
pub struct DrainContainerDistributor<CB, H> {
    /// One builder per destination, indexed by destination.
    builders: Vec<CB>,
    /// Maps an item to the hash that selects its destination.
    hash_func: H,
}
32
33impl<CB: Default, H> DrainContainerDistributor<CB, H> {
34 pub fn new(hash_func: H, peers: usize) -> Self {
37 Self {
38 builders: std::iter::repeat_with(Default::default).take(peers).collect(),
39 hash_func,
40 }
41 }
42}
43
44impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
45where
46 CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
47 for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
48{
49 fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
50 debug_assert_eq!(self.builders.len(), pushers.len());
51 if pushers.len().is_power_of_two() {
52 let mask = (pushers.len() - 1) as u64;
53 for datum in container.drain() {
54 let index = ((self.hash_func)(&datum) & mask) as usize;
55 self.builders[index].push_into(datum);
56 while let Some(produced) = self.builders[index].extract() {
57 Message::push_at(produced, time.clone(), &mut pushers[index]);
58 }
59 }
60 }
61 else {
62 let num_pushers = pushers.len() as u64;
63 for datum in container.drain() {
64 let index = ((self.hash_func)(&datum) % num_pushers) as usize;
65 self.builders[index].push_into(datum);
66 while let Some(produced) = self.builders[index].extract() {
67 Message::push_at(produced, time.clone(), &mut pushers[index]);
68 }
69 }
70 }
71 }
72
73 fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
74 for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
75 while let Some(container) = builder.finish() {
76 Message::push_at(container, time.clone(), pusher);
77 }
78 }
79 }
80
81 fn relax(&mut self) {
82 for builder in &mut self.builders {
83 builder.relax();
84 }
85 }
86}
87
/// A pusher that hands incoming messages to a distributor, which spreads them over
/// several downstream pushers.
pub struct Exchange<T, P, D> {
    /// Downstream pushers the data is spread across.
    pushers: Vec<P>,
    /// Timestamp of the most recently partitioned message; `None` before the first message
    /// and after a flush triggered by a `None` input.
    current: Option<T>,
    /// Decides which pusher receives which data.
    distributor: D,
}
95
96impl<T: Clone, P, D> Exchange<T, P, D> {
97 pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
99 Exchange {
100 pushers,
101 current: None,
102 distributor,
103 }
104 }
105}
106
107impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
108where
109 P: Push<Message<T, C>>,
110 D: Distributor<C>,
111{
112 #[inline(never)]
113 fn push(&mut self, message: &mut Option<Message<T, C>>) {
114 if self.pushers.len() == 1 {
116 self.pushers[0].push(message);
117 }
118 else if let Some(message) = message {
119
120 let time = &message.time;
121 let data = &mut message.data;
122
123 match self.current.as_ref() {
125 Some(current_time) if current_time != time => {
127 self.distributor.flush(current_time, &mut self.pushers);
128 self.current = Some(time.clone());
129 }
130 None => self.current = Some(time.clone()),
132 _ => {}
134 }
135
136 self.distributor.partition(data, time, &mut self.pushers);
137 }
138 else {
139 if let Some(time) = self.current.take() {
141 self.distributor.flush(&time, &mut self.pushers);
142 }
143 self.distributor.relax();
144 for index in 0..self.pushers.len() {
145 self.pushers[index].done();
146 }
147 }
148 }
149}