timely/dataflow/channels/
pact.rs1use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::Accountable;
14use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
15use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
16use crate::communication::{Push, Pull};
17use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
18use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
19use crate::dataflow::channels::Message;
20use crate::logging::{TimelyLogger as Logger, MessagesEvent};
21use crate::progress::Timestamp;
22use crate::worker::AsWorker;
23
24pub trait ParallelizationContract<T, C> {
26 type Pusher: Push<Message<T, C>>+'static;
28 type Puller: Pull<Message<T, C>>+'static;
30 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
32}
33
/// A parallelization contract for a worker-local channel.
///
/// Its `connect` implementation asks the allocator for a `pipeline` channel
/// and reports the worker's own index as both source and target.
#[derive(Debug)]
pub struct Pipeline;
37
38impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
39 type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
40 type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
41 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
42 let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
43 (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
44 LogPuller::new(puller, allocator.index(), identifier, logging))
45 }
46}
47
/// A parallelization contract built around a hash function.
///
/// `CB` names the container builder used when the contract is connected; it is
/// only tracked at the type level.
pub struct ExchangeCore<CB, F> {
    /// Function supplied at construction; handed to the distributor on `connect`.
    hash_func: F,
    /// Zero-sized marker tying the contract to its container builder type.
    phantom: PhantomData<CB>,
}
50
51pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
53
54impl<CB, F> ExchangeCore<CB, F>
55where
56 CB: LengthPreservingContainerBuilder,
57 CB::Container: DrainContainer,
58 for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
59{
60 pub fn new_core(func: F) -> ExchangeCore<CB, F> {
62 ExchangeCore {
63 hash_func: func,
64 phantom: PhantomData,
65 }
66 }
67}
68
69impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
70where
71 C: SizableContainer + DrainContainer,
72 for<'a> F: FnMut(&C::Item<'a>)->u64
73{
74 pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
76 ExchangeCore {
77 hash_func: func,
78 phantom: PhantomData,
79 }
80 }
81}
82
83impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
85where
86 CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
87 CB::Container: Send + crate::dataflow::channels::ContainerBytes,
88 for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
89{
90 type Pusher = ExchangePusher<
91 T,
92 LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93 DrainContainerDistributor<CB, H>
94 >;
95 type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
96
97 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
98 let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
99 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
100 let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
101 (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
102 }
103}
104
105impl<C, F> Debug for ExchangeCore<C, F> {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 f.debug_struct("Exchange").finish()
108 }
109}
110
111#[derive(Debug)]
113pub struct LogPusher<P> {
114 pusher: P,
115 channel: usize,
116 counter: usize,
117 source: usize,
118 target: usize,
119 logging: Option<Logger>,
120}
121
122impl<P> LogPusher<P> {
123 pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
125 LogPusher {
126 pusher,
127 channel,
128 counter: 0,
129 source,
130 target,
131 logging,
132 }
133 }
134}
135
136impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
137 #[inline]
138 fn push(&mut self, pair: &mut Option<Message<T, C>>) {
139 if let Some(bundle) = pair {
140 self.counter += 1;
141
142 bundle.seq = self.counter - 1;
145 bundle.from = self.source;
146
147 if let Some(logger) = self.logging.as_ref() {
148 logger.log(MessagesEvent {
149 is_send: true,
150 channel: self.channel,
151 source: self.source,
152 target: self.target,
153 seq_no: self.counter - 1,
154 record_count: bundle.data.record_count(),
155 })
156 }
157 }
158
159 self.pusher.push(pair);
160 }
161}
162
163#[derive(Debug)]
165pub struct LogPuller<P> {
166 puller: P,
167 channel: usize,
168 index: usize,
169 logging: Option<Logger>,
170}
171
172impl<P> LogPuller<P> {
173 pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
175 LogPuller {
176 puller,
177 channel,
178 index,
179 logging,
180 }
181 }
182}
183
184impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
185 #[inline]
186 fn pull(&mut self) -> &mut Option<Message<T, C>> {
187 let result = self.puller.pull();
188 if let Some(bundle) = result {
189 let channel = self.channel;
190 let target = self.index;
191
192 if let Some(logger) = self.logging.as_ref() {
193 logger.log(MessagesEvent {
194 is_send: false,
195 channel,
196 source: bundle.from,
197 target,
198 seq_no: bundle.seq,
199 record_count: bundle.data.record_count(),
200 });
201 }
202 }
203
204 result
205 }
206}