timely/dataflow/channels/
pact.rs

1//! Parallelization contracts, describing requirements for data movement along dataflow edges.
2//!
3//! Pacts describe how data should be exchanged between workers, and implement a method which
4//! creates a pair of `Push` and `Pull` implementors from an `A: AsWorker`. These two endpoints
5//! respectively distribute and collect data among workers according to the pact.
6//!
7//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
8//! The progress tracking logic assumes that this number is independent of the pact used.
9
10use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17use crate::dataflow::channels::Message;
18use crate::logging::{TimelyLogger as Logger, MessagesEvent};
19use crate::progress::Timestamp;
20use crate::worker::AsWorker;
21use crate::Data;
22
23/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
24pub trait ParallelizationContract<T, C> {
25    /// Type implementing `Push` produced by this pact.
26    type Pusher: Push<Message<T, C>>+'static;
27    /// Type implementing `Pull` produced by this pact.
28    type Puller: Pull<Message<T, C>>+'static;
29    /// Allocates a matched pair of push and pull endpoints implementing the pact.
30    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
31}
32
33/// A direct connection
34#[derive(Debug)]
35pub struct Pipeline;
36
37impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38    type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
39    type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
40    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
41        let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
42        (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
43         LogPuller::new(puller, allocator.index(), identifier, logging))
44    }
45}
46
47/// An exchange between multiple observers by data
48pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
49
50/// [ExchangeCore] specialized to vector-based containers.
51pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
52
53impl<CB, F> ExchangeCore<CB, F>
54where
55    CB: LengthPreservingContainerBuilder,
56    for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
57{
58    /// Allocates a new `Exchange` pact from a distribution function.
59    pub fn new_core(func: F) -> ExchangeCore<CB, F> {
60        ExchangeCore {
61            hash_func:  func,
62            phantom:    PhantomData,
63        }
64    }
65}
66
67impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
68where
69    C: SizableContainer,
70    for<'a> F: FnMut(&C::Item<'a>)->u64
71{
72    /// Allocates a new `Exchange` pact from a distribution function.
73    pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
74        ExchangeCore {
75            hash_func:  func,
76            phantom:    PhantomData,
77        }
78    }
79}
80
81// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
82impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83where
84    CB: ContainerBuilder,
85    CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
86    CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87    for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
88{
89    type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
90    type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
91
92    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
93        let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
94        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95        (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96    }
97}
98
99impl<C, F> Debug for ExchangeCore<C, F> {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_struct("Exchange").finish()
102    }
103}
104
105/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
106#[derive(Debug)]
107pub struct LogPusher<P> {
108    pusher: P,
109    channel: usize,
110    counter: usize,
111    source: usize,
112    target: usize,
113    logging: Option<Logger>,
114}
115
116impl<P> LogPusher<P> {
117    /// Allocates a new pusher.
118    pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
119        LogPusher {
120            pusher,
121            channel,
122            counter: 0,
123            source,
124            target,
125            logging,
126        }
127    }
128}
129
130impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
131    #[inline]
132    fn push(&mut self, pair: &mut Option<Message<T, C>>) {
133        if let Some(bundle) = pair {
134            self.counter += 1;
135
136            // Stamp the sequence number and source.
137            // FIXME: Awkward moment/logic.
138            bundle.seq = self.counter - 1;
139            bundle.from = self.source;
140
141            if let Some(logger) = self.logging.as_ref() {
142                logger.log(MessagesEvent {
143                    is_send: true,
144                    channel: self.channel,
145                    source: self.source,
146                    target: self.target,
147                    seq_no: self.counter - 1,
148                    length: bundle.data.len(),
149                })
150            }
151        }
152
153        self.pusher.push(pair);
154    }
155}
156
157/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
158#[derive(Debug)]
159pub struct LogPuller<P> {
160    puller: P,
161    channel: usize,
162    index: usize,
163    logging: Option<Logger>,
164}
165
166impl<P> LogPuller<P> {
167    /// Allocates a new `Puller`.
168    pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
169        LogPuller {
170            puller,
171            channel,
172            index,
173            logging,
174        }
175    }
176}
177
178impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
179    #[inline]
180    fn pull(&mut self) -> &mut Option<Message<T, C>> {
181        let result = self.puller.pull();
182        if let Some(bundle) = result {
183            let channel = self.channel;
184            let target = self.index;
185
186            if let Some(logger) = self.logging.as_ref() {
187                logger.log(MessagesEvent {
188                    is_send: false,
189                    channel,
190                    source: bundle.from,
191                    target,
192                    seq_no: bundle.seq,
193                    length: bundle.data.len(),
194                });
195            }
196        }
197
198        result
199    }
200}