timely/dataflow/channels/
pact.rs1use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17use crate::dataflow::channels::Message;
18use crate::logging::{TimelyLogger as Logger, MessagesEvent};
19use crate::progress::Timestamp;
20use crate::worker::AsWorker;
21use crate::Data;
22
23pub trait ParallelizationContract<T, C> {
25 type Pusher: Push<Message<T, C>>+'static;
27 type Puller: Pull<Message<T, C>>+'static;
29 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
31}
32
33#[derive(Debug)]
35pub struct Pipeline;
36
37impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38 type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
39 type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
40 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
41 let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
42 (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
43 LogPuller::new(puller, allocator.index(), identifier, logging))
44 }
45}
46
47pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
49
50pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
52
53impl<CB, F> ExchangeCore<CB, F>
54where
55 CB: LengthPreservingContainerBuilder,
56 for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
57{
58 pub fn new_core(func: F) -> ExchangeCore<CB, F> {
60 ExchangeCore {
61 hash_func: func,
62 phantom: PhantomData,
63 }
64 }
65}
66
67impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
68where
69 C: SizableContainer,
70 for<'a> F: FnMut(&C::Item<'a>)->u64
71{
72 pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
74 ExchangeCore {
75 hash_func: func,
76 phantom: PhantomData,
77 }
78 }
79}
80
81impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83where
84 CB: ContainerBuilder,
85 CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
86 CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87 for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
88{
89 type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
90 type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
91
92 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
93 let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
94 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95 (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96 }
97}
98
99impl<C, F> Debug for ExchangeCore<C, F> {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_struct("Exchange").finish()
102 }
103}
104
105#[derive(Debug)]
107pub struct LogPusher<P> {
108 pusher: P,
109 channel: usize,
110 counter: usize,
111 source: usize,
112 target: usize,
113 logging: Option<Logger>,
114}
115
116impl<P> LogPusher<P> {
117 pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
119 LogPusher {
120 pusher,
121 channel,
122 counter: 0,
123 source,
124 target,
125 logging,
126 }
127 }
128}
129
130impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
131 #[inline]
132 fn push(&mut self, pair: &mut Option<Message<T, C>>) {
133 if let Some(bundle) = pair {
134 self.counter += 1;
135
136 bundle.seq = self.counter - 1;
139 bundle.from = self.source;
140
141 if let Some(logger) = self.logging.as_ref() {
142 logger.log(MessagesEvent {
143 is_send: true,
144 channel: self.channel,
145 source: self.source,
146 target: self.target,
147 seq_no: self.counter - 1,
148 length: bundle.data.len(),
149 })
150 }
151 }
152
153 self.pusher.push(pair);
154 }
155}
156
157#[derive(Debug)]
159pub struct LogPuller<P> {
160 puller: P,
161 channel: usize,
162 index: usize,
163 logging: Option<Logger>,
164}
165
166impl<P> LogPuller<P> {
167 pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
169 LogPuller {
170 puller,
171 channel,
172 index,
173 logging,
174 }
175 }
176}
177
178impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
179 #[inline]
180 fn pull(&mut self) -> &mut Option<Message<T, C>> {
181 let result = self.puller.pull();
182 if let Some(bundle) = result {
183 let channel = self.channel;
184 let target = self.index;
185
186 if let Some(logger) = self.logging.as_ref() {
187 logger.log(MessagesEvent {
188 is_send: false,
189 channel,
190 source: bundle.from,
191 target,
192 seq_no: bundle.seq,
193 length: bundle.data.len(),
194 });
195 }
196 }
197
198 result
199 }
200}