timely/dataflow/channels/
pact.rs

1//! Parallelization contracts, describing requirements for data movement along dataflow edges.
2//!
3//! Pacts describe how data should be exchanged between workers, and implement a method which
4//! creates a pair of `Push` and `Pull` implementors from an `A: AsWorker`. These two endpoints
5//! respectively distribute and collect data among workers according to the pact.
6//!
7//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
8//! The progress tracking logic assumes that this number is independent of the pact used.
9
10use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::Accountable;
14use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
15use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
16use crate::communication::{Push, Pull};
17use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
18use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
19use crate::dataflow::channels::Message;
20use crate::logging::{TimelyLogger as Logger, MessagesEvent};
21use crate::progress::Timestamp;
22use crate::worker::AsWorker;
23
24/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
25pub trait ParallelizationContract<T, C> {
26    /// Type implementing `Push` produced by this pact.
27    type Pusher: Push<Message<T, C>>+'static;
28    /// Type implementing `Pull` produced by this pact.
29    type Puller: Pull<Message<T, C>>+'static;
30    /// Allocates a matched pair of push and pull endpoints implementing the pact.
31    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
32}
33
34/// A direct connection
35#[derive(Debug)]
36pub struct Pipeline;
37
38impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
39    type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
40    type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
41    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
42        let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
43        (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
44         LogPuller::new(puller, allocator.index(), identifier, logging))
45    }
46}
47
/// An exchange between multiple observers by data.
///
/// `CB` names the container builder the pact is specialized to; `F` is the type of the
/// distribution function.
pub struct ExchangeCore<CB, F> {
    /// The distribution function supplied at construction time.
    hash_func: F,
    /// Ties the pact to `CB` without storing a value of that type.
    phantom: PhantomData<CB>,
}
50
51/// [ExchangeCore] specialized to vector-based containers.
52pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
53
54impl<CB, F> ExchangeCore<CB, F>
55where
56    CB: LengthPreservingContainerBuilder,
57    CB::Container: DrainContainer,
58    for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
59{
60    /// Allocates a new `Exchange` pact from a distribution function.
61    pub fn new_core(func: F) -> ExchangeCore<CB, F> {
62        ExchangeCore {
63            hash_func:  func,
64            phantom:    PhantomData,
65        }
66    }
67}
68
69impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
70where
71    C: SizableContainer + DrainContainer,
72    for<'a> F: FnMut(&C::Item<'a>)->u64
73{
74    /// Allocates a new `Exchange` pact from a distribution function.
75    pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
76        ExchangeCore {
77            hash_func:  func,
78            phantom:    PhantomData,
79        }
80    }
81}
82
83// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
84impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
85where
86    CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
87    CB::Container: Send + crate::dataflow::channels::ContainerBytes,
88    for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
89{
90    type Pusher = ExchangePusher<
91        T,
92        LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93        DrainContainerDistributor<CB, H>
94    >;
95    type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
96
97    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
98        let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
99        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
100        let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
101        (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
102    }
103}
104
105impl<C, F> Debug for ExchangeCore<C, F> {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        f.debug_struct("Exchange").finish()
108    }
109}
110
111/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
112#[derive(Debug)]
113pub struct LogPusher<P> {
114    pusher: P,
115    channel: usize,
116    counter: usize,
117    source: usize,
118    target: usize,
119    logging: Option<Logger>,
120}
121
122impl<P> LogPusher<P> {
123    /// Allocates a new pusher.
124    pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
125        LogPusher {
126            pusher,
127            channel,
128            counter: 0,
129            source,
130            target,
131            logging,
132        }
133    }
134}
135
136impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
137    #[inline]
138    fn push(&mut self, pair: &mut Option<Message<T, C>>) {
139        if let Some(bundle) = pair {
140            self.counter += 1;
141
142            // Stamp the sequence number and source.
143            // FIXME: Awkward moment/logic.
144            bundle.seq = self.counter - 1;
145            bundle.from = self.source;
146
147            if let Some(logger) = self.logging.as_ref() {
148                logger.log(MessagesEvent {
149                    is_send: true,
150                    channel: self.channel,
151                    source: self.source,
152                    target: self.target,
153                    seq_no: self.counter - 1,
154                    record_count: bundle.data.record_count(),
155                })
156            }
157        }
158
159        self.pusher.push(pair);
160    }
161}
162
163/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
164#[derive(Debug)]
165pub struct LogPuller<P> {
166    puller: P,
167    channel: usize,
168    index: usize,
169    logging: Option<Logger>,
170}
171
172impl<P> LogPuller<P> {
173    /// Allocates a new `Puller`.
174    pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
175        LogPuller {
176            puller,
177            channel,
178            index,
179            logging,
180        }
181    }
182}
183
184impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
185    #[inline]
186    fn pull(&mut self) -> &mut Option<Message<T, C>> {
187        let result = self.puller.pull();
188        if let Some(bundle) = result {
189            let channel = self.channel;
190            let target = self.index;
191
192            if let Some(logger) = self.logging.as_ref() {
193                logger.log(MessagesEvent {
194                    is_send: false,
195                    channel,
196                    source: bundle.from,
197                    target,
198                    seq_no: bundle.seq,
199                    record_count: bundle.data.record_count(),
200                });
201            }
202        }
203
204        result
205    }
206}