timely/dataflow/channels/
pact.rs1use std::fmt::Debug;
11use std::rc::Rc;
12
13use crate::Accountable;
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::Message;
17use crate::logging::TimelyLogger as Logger;
18use crate::worker::AsWorker;
19
20pub trait ParallelizationContract<T, C> {
22 type Pusher: Push<Message<T, C>>+'static;
24 type Puller: Pull<Message<T, C>>+'static;
26 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
28}
29
30#[derive(Debug)]
32pub struct Pipeline;
33
34impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
35 type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
36 type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
37 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
38 let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
39 (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
40 LogPuller::new(puller, allocator.index(), identifier, logging))
41 }
42}
43
44pub use exchange::{ExchangeCore, Exchange};
45mod exchange {
46
47 use crate::Container;
48 use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder};
49 use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
50
51 use super::DistributorPact;
52
53 pub type ExchangeCore<CB, F> = DistributorPact<Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>>>;
55
56 pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
58
59 impl<CB, F> ExchangeCore<CB, F>
60 where
61 CB: LengthPreservingContainerBuilder,
62 CB::Container: DrainContainer,
63 for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64 + 'static
64 {
65 pub fn new_core(func: F) -> ExchangeCore<CB, F> {
67 DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
68 }
69 }
70
71 impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
72 where
73 C: Container + SizableContainer + DrainContainer,
74 for<'a> F: FnMut(&C::Item<'a>)->u64 + 'static
75 {
76 pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
78 DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
79 }
80 }
81}
82
83pub use distributor::DistributorPact;
84mod distributor {
86
87 use std::rc::Rc;
88
89 use crate::Accountable;
90 use crate::communication::{Push, Pull};
91 use crate::dataflow::channels::pushers::{Exchange, exchange::Distributor};
92 use crate::dataflow::channels::{ContainerBytes, Message};
93 use crate::logging::TimelyLogger;
94 use crate::progress::Timestamp;
95 use crate::worker::AsWorker;
96
97 use super::{ParallelizationContract, LogPusher, LogPuller};
98
99 pub struct DistributorPact<B>(pub B);
105
106 impl<T, B, C, D> ParallelizationContract<T, C> for DistributorPact<B>
107 where
108 T: Timestamp,
109 B: FnOnce(usize) -> D,
110 C: Accountable + ContainerBytes + Send + 'static,
111 D: Distributor<C> + 'static,
112 {
113 type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
114 type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
115 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
116 let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
117 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
118 let distributor = (self.0)(allocator.peers());
119 (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
120 }
121 }
122}
123
124pub use push_pull::{LogPusher, LogPuller};
125mod push_pull {
126
127 use crate::Accountable;
128 use crate::communication::{Push, Pull};
129 use crate::dataflow::channels::Message;
130 use crate::logging::{TimelyLogger as Logger, MessagesEvent};
131
132 #[derive(Debug)]
134 pub struct LogPusher<P> {
135 pusher: P,
136 channel: usize,
137 counter: usize,
138 source: usize,
139 target: usize,
140 logging: Option<Logger>,
141 }
142
143 impl<P> LogPusher<P> {
144 pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
146 LogPusher {
147 pusher,
148 channel,
149 counter: 0,
150 source,
151 target,
152 logging,
153 }
154 }
155 }
156
157 impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
158 #[inline]
159 fn push(&mut self, pair: &mut Option<Message<T, C>>) {
160 if let Some(bundle) = pair {
161 self.counter += 1;
162
163 bundle.seq = self.counter - 1;
166 bundle.from = self.source;
167
168 if let Some(logger) = self.logging.as_ref() {
169 logger.log(MessagesEvent {
170 is_send: true,
171 channel: self.channel,
172 source: self.source,
173 target: self.target,
174 seq_no: self.counter - 1,
175 record_count: bundle.data.record_count(),
176 })
177 }
178 }
179
180 self.pusher.push(pair);
181 }
182 }
183
184 #[derive(Debug)]
186 pub struct LogPuller<P> {
187 puller: P,
188 channel: usize,
189 index: usize,
190 logging: Option<Logger>,
191 }
192
193 impl<P> LogPuller<P> {
194 pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
196 LogPuller {
197 puller,
198 channel,
199 index,
200 logging,
201 }
202 }
203 }
204
205 impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
206 #[inline]
207 fn pull(&mut self) -> &mut Option<Message<T, C>> {
208 let result = self.puller.pull();
209 if let Some(bundle) = result {
210 let channel = self.channel;
211 let target = self.index;
212
213 if let Some(logger) = self.logging.as_ref() {
214 logger.log(MessagesEvent {
215 is_send: false,
216 channel,
217 source: bundle.from,
218 target,
219 seq_no: bundle.seq,
220 record_count: bundle.data.record_count(),
221 });
222 }
223 }
224
225 result
226 }
227 }
228}