timely/dataflow/channels/
pact.rs

1//! Parallelization contracts, describing requirements for data movement along dataflow edges.
2//!
3//! Pacts describe how data should be exchanged between workers, and implement a method which
4//! creates a pair of `Push` and `Pull` implementors from an `A: AsWorker`. These two endpoints
5//! respectively distribute and collect data among workers according to the pact.
6//!
//! The only requirement of a pact is that it not alter the number of records at each time `T`.
8//! The progress tracking logic assumes that this number is independent of the pact used.
9
10use std::fmt::Debug;
11use std::rc::Rc;
12
13use crate::Accountable;
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::Message;
17use crate::logging::TimelyLogger as Logger;
18use crate::worker::AsWorker;
19
20/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
21pub trait ParallelizationContract<T, C> {
22    /// Type implementing `Push` produced by this pact.
23    type Pusher: Push<Message<T, C>>+'static;
24    /// Type implementing `Pull` produced by this pact.
25    type Puller: Pull<Message<T, C>>+'static;
26    /// Allocates a matched pair of push and pull endpoints implementing the pact.
27    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
28}
29
30/// A direct connection
31#[derive(Debug)]
32pub struct Pipeline;
33
34impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
35    type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
36    type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
37    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
38        let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
39        (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
40         LogPuller::new(puller, allocator.index(), identifier, logging))
41    }
42}
43
44pub use exchange::{ExchangeCore, Exchange};
45mod exchange {
46
47    use crate::Container;
48    use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder};
49    use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
50
51    use super::DistributorPact;
52
53    /// An exchange between multiple observers by data
54    pub type ExchangeCore<CB, F> = DistributorPact<Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>>>;
55
56    /// [ExchangeCore] specialized to vector-based containers.
57    pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
58
59    impl<CB, F> ExchangeCore<CB, F>
60    where
61        CB: LengthPreservingContainerBuilder,
62        CB::Container: DrainContainer,
63        for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64 + 'static
64    {
65        /// Allocates a new `Exchange` pact from a distribution function.
66        pub fn new_core(func: F) -> ExchangeCore<CB, F> {
67            DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
68        }
69    }
70
71    impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
72    where
73        C: Container + SizableContainer + DrainContainer,
74        for<'a> F: FnMut(&C::Item<'a>)->u64 + 'static
75    {
76        /// Allocates a new `Exchange` pact from a distribution function.
77        pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
78            DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
79        }
80    }
81}
82
83pub use distributor::DistributorPact;
84/// Parallelization contract based on a `Distributor` implementation.
85mod distributor {
86
87    use std::rc::Rc;
88
89    use crate::Accountable;
90    use crate::communication::{Push, Pull};
91    use crate::dataflow::channels::pushers::{Exchange, exchange::Distributor};
92    use crate::dataflow::channels::{ContainerBytes, Message};
93    use crate::logging::TimelyLogger;
94    use crate::progress::Timestamp;
95    use crate::worker::AsWorker;
96
97    use super::{ParallelizationContract, LogPusher, LogPuller};
98
99    /// Intended to wrap a function from a `usize` to an `impl Distributor`.
100    ///
101    /// For a `D: Distributor<C>` and an appropriate builder `D::new(peers: usize)`,
102    /// a `DistributorPact(|peers| D::new(peers))` acts as a pact that will use `D`
103    /// and distribute containers of type `C`.
104    pub struct DistributorPact<B>(pub B);
105
106    impl<T, B, C, D> ParallelizationContract<T, C> for DistributorPact<B>
107    where
108        T: Timestamp,
109        B: FnOnce(usize) -> D,
110        C: Accountable + ContainerBytes + Send + 'static,
111        D: Distributor<C> + 'static,
112    {
113        type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
114        type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
115        fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
116            let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
117            let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
118            let distributor = (self.0)(allocator.peers());
119            (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
120        }
121    }
122}
123
124pub use push_pull::{LogPusher, LogPuller};
125mod push_pull {
126
127    use crate::Accountable;
128    use crate::communication::{Push, Pull};
129    use crate::dataflow::channels::Message;
130    use crate::logging::{TimelyLogger as Logger, MessagesEvent};
131
132    /// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
133    #[derive(Debug)]
134    pub struct LogPusher<P> {
135        pusher: P,
136        channel: usize,
137        counter: usize,
138        source: usize,
139        target: usize,
140        logging: Option<Logger>,
141    }
142
143    impl<P> LogPusher<P> {
144        /// Allocates a new pusher.
145        pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
146            LogPusher {
147                pusher,
148                channel,
149                counter: 0,
150                source,
151                target,
152                logging,
153            }
154        }
155    }
156
157    impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
158        #[inline]
159        fn push(&mut self, pair: &mut Option<Message<T, C>>) {
160            if let Some(bundle) = pair {
161                self.counter += 1;
162
163                // Stamp the sequence number and source.
164                // FIXME: Awkward moment/logic.
165                bundle.seq = self.counter - 1;
166                bundle.from = self.source;
167
168                if let Some(logger) = self.logging.as_ref() {
169                    logger.log(MessagesEvent {
170                        is_send: true,
171                        channel: self.channel,
172                        source: self.source,
173                        target: self.target,
174                        seq_no: self.counter - 1,
175                        record_count: bundle.data.record_count(),
176                    })
177                }
178            }
179
180            self.pusher.push(pair);
181        }
182    }
183
184    /// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
185    #[derive(Debug)]
186    pub struct LogPuller<P> {
187        puller: P,
188        channel: usize,
189        index: usize,
190        logging: Option<Logger>,
191    }
192
193    impl<P> LogPuller<P> {
194        /// Allocates a new `Puller`.
195        pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
196            LogPuller {
197                puller,
198                channel,
199                index,
200                logging,
201            }
202        }
203    }
204
205    impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
206        #[inline]
207        fn pull(&mut self) -> &mut Option<Message<T, C>> {
208            let result = self.puller.pull();
209            if let Some(bundle) = result {
210                let channel = self.channel;
211                let target = self.index;
212
213                if let Some(logger) = self.logging.as_ref() {
214                    logger.log(MessagesEvent {
215                        is_send: false,
216                        channel,
217                        source: bundle.from,
218                        target,
219                        seq_no: bundle.seq,
220                        record_count: bundle.data.record_count(),
221                    });
222                }
223            }
224
225            result
226        }
227    }
228}