timely/dataflow/channels/
pact.rs

1//! Parallelization contracts, describing requirements for data movement along dataflow edges.
2//!
3//! Pacts describe how data should be exchanged between workers, and implement a method which
4//! creates a pair of `Push` and `Pull` implementors from an `A: AsWorker`. These two endpoints
5//! respectively distribute and collect data among workers according to the pact.
6//!
7//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
8//! The progress tracking logic assumes that this number is independent of the pact used.
9
10use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17use crate::dataflow::channels::Message;
18use crate::logging::{TimelyLogger as Logger, MessagesEvent};
19use crate::progress::Timestamp;
20use crate::worker::AsWorker;
21use crate::Data;
22
23/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
24pub trait ParallelizationContract<T, C> {
25    /// Type implementing `Push` produced by this pact.
26    type Pusher: Push<Message<T, C>>+'static;
27    /// Type implementing `Pull` produced by this pact.
28    type Puller: Pull<Message<T, C>>+'static;
29    /// Allocates a matched pair of push and pull endpoints implementing the pact.
30    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
31}
32
33/// A direct connection
34#[derive(Debug)]
35pub struct Pipeline;
36
37impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38    type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
39    type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
40    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
41        let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
42        (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
43         LogPuller::new(puller, allocator.index(), identifier, logging))
44    }
45}
46
47/// An exchange between multiple observers by data
48pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
49
50/// [ExchangeCore] specialized to vector-based containers.
51pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
52
53impl<CB, F> ExchangeCore<CB, F>
54where
55    CB: LengthPreservingContainerBuilder,
56    for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
57{
58    /// Allocates a new `Exchange` pact from a distribution function.
59    pub fn new_core(func: F) -> ExchangeCore<CB, F> {
60        ExchangeCore {
61            hash_func:  func,
62            phantom:    PhantomData,
63        }
64    }
65}
66
67impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
68where
69    C: SizableContainer,
70    for<'a> F: FnMut(&C::Item<'a>)->u64
71{
72    /// Allocates a new `Exchange` pact from a distribution function.
73    pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
74        ExchangeCore {
75            hash_func:  func,
76            phantom:    PhantomData,
77        }
78    }
79}
80
81// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
82impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83where
84    CB: ContainerBuilder,
85    CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
86    CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87    for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
88{
89    type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
90    type Puller = LogPuller<T, CB::Container, Box<dyn Pull<Message<T, CB::Container>>>>;
91
92    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
93        let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
94        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95        (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96    }
97}
98
99impl<C, F> Debug for ExchangeCore<C, F> {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_struct("Exchange").finish()
102    }
103}
104
105/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
106#[derive(Debug)]
107pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
108    pusher: P,
109    channel: usize,
110    counter: usize,
111    source: usize,
112    target: usize,
113    phantom: PhantomData<(T, C)>,
114    logging: Option<Logger>,
115}
116
117impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
118    /// Allocates a new pusher.
119    pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
120        LogPusher {
121            pusher,
122            channel,
123            counter: 0,
124            source,
125            target,
126            phantom: PhantomData,
127            logging,
128        }
129    }
130}
131
132impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
133    #[inline]
134    fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135        if let Some(bundle) = pair {
136            self.counter += 1;
137
138            // Stamp the sequence number and source.
139            // FIXME: Awkward moment/logic.
140            bundle.seq = self.counter - 1;
141            bundle.from = self.source;
142
143            if let Some(logger) = self.logging.as_ref() {
144                logger.log(MessagesEvent {
145                    is_send: true,
146                    channel: self.channel,
147                    source: self.source,
148                    target: self.target,
149                    seq_no: self.counter - 1,
150                    length: bundle.data.len(),
151                })
152            }
153        }
154
155        self.pusher.push(pair);
156    }
157}
158
159/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
160#[derive(Debug)]
161pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
162    puller: P,
163    channel: usize,
164    index: usize,
165    phantom: PhantomData<(T, C)>,
166    logging: Option<Logger>,
167}
168
169impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
170    /// Allocates a new `Puller`.
171    pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
172        LogPuller {
173            puller,
174            channel,
175            index,
176            phantom: PhantomData,
177            logging,
178        }
179    }
180}
181
182impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
183    #[inline]
184    fn pull(&mut self) -> &mut Option<Message<T, C>> {
185        let result = self.puller.pull();
186        if let Some(bundle) = result {
187            let channel = self.channel;
188            let target = self.index;
189
190            if let Some(logger) = self.logging.as_ref() {
191                logger.log(MessagesEvent {
192                    is_send: false,
193                    channel,
194                    source: bundle.from,
195                    target,
196                    seq_no: bundle.seq,
197                    length: bundle.data.len(),
198                });
199            }
200        }
201
202        result
203    }
204}