1use std::{fmt::{self, Debug}, marker::PhantomData};
11use std::rc::Rc;
12
13use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::communication::{Push, Pull};
16use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17use crate::dataflow::channels::Message;
18use crate::logging::{TimelyLogger as Logger, MessagesEvent};
19use crate::progress::Timestamp;
20use crate::worker::AsWorker;
21use crate::Data;
22
23pub trait ParallelizationContract<T, C> {
25 type Pusher: Push<Message<T, C>>+'static;
27 type Puller: Pull<Message<T, C>>+'static;
29 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
31}
32
/// A direct connection.
///
/// As a pact, `Pipeline` obtains its endpoints from `allocator.pipeline` and labels
/// both the source and the target of the channel with the allocator's own index.
#[derive(Debug)]
pub struct Pipeline;
36
37impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38 type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
39 type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
40 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
41 let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
42 (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
43 LogPuller::new(puller, allocator.index(), identifier, logging))
44 }
45}
46
/// An exchange between multiple observers by data.
///
/// `CB` names the container builder type and is carried only at the type level;
/// `F` is the type of the stored hash function.
pub struct ExchangeCore<CB, F> {
    /// Function handed to the exchange pusher when the pact is connected.
    hash_func: F,
    /// Marker tying the pact to its container builder type without storing one.
    phantom: PhantomData<CB>,
}
49
/// [`ExchangeCore`] specialized to vector-based containers.
///
/// The container builder is fixed to `CapacityContainerBuilder<Vec<D>>`, leaving only
/// the element type `D` and the hash function type `F` to be chosen.
pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
52
53impl<CB, F> ExchangeCore<CB, F>
54where
55 CB: LengthPreservingContainerBuilder,
56 for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
57{
58 pub fn new_core(func: F) -> ExchangeCore<CB, F> {
60 ExchangeCore {
61 hash_func: func,
62 phantom: PhantomData,
63 }
64 }
65}
66
67impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
68where
69 C: SizableContainer,
70 for<'a> F: FnMut(&C::Item<'a>)->u64
71{
72 pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
74 ExchangeCore {
75 hash_func: func,
76 phantom: PhantomData,
77 }
78 }
79}
80
81impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83where
84 CB: ContainerBuilder,
85 CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
86 CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87 for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
88{
89 type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
90 type Puller = LogPuller<T, CB::Container, Box<dyn Pull<Message<T, CB::Container>>>>;
91
92 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
93 let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
94 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95 (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96 }
97}
98
99impl<C, F> Debug for ExchangeCore<C, F> {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_struct("Exchange").finish()
102 }
103}
104
105#[derive(Debug)]
107pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
108 pusher: P,
109 channel: usize,
110 counter: usize,
111 source: usize,
112 target: usize,
113 phantom: PhantomData<(T, C)>,
114 logging: Option<Logger>,
115}
116
117impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
118 pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
120 LogPusher {
121 pusher,
122 channel,
123 counter: 0,
124 source,
125 target,
126 phantom: PhantomData,
127 logging,
128 }
129 }
130}
131
132impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
133 #[inline]
134 fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135 if let Some(bundle) = pair {
136 self.counter += 1;
137
138 bundle.seq = self.counter - 1;
141 bundle.from = self.source;
142
143 if let Some(logger) = self.logging.as_ref() {
144 logger.log(MessagesEvent {
145 is_send: true,
146 channel: self.channel,
147 source: self.source,
148 target: self.target,
149 seq_no: self.counter - 1,
150 length: bundle.data.len(),
151 })
152 }
153 }
154
155 self.pusher.push(pair);
156 }
157}
158
159#[derive(Debug)]
161pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
162 puller: P,
163 channel: usize,
164 index: usize,
165 phantom: PhantomData<(T, C)>,
166 logging: Option<Logger>,
167}
168
169impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
170 pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
172 LogPuller {
173 puller,
174 channel,
175 index,
176 phantom: PhantomData,
177 logging,
178 }
179 }
180}
181
182impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
183 #[inline]
184 fn pull(&mut self) -> &mut Option<Message<T, C>> {
185 let result = self.puller.pull();
186 if let Some(bundle) = result {
187 let channel = self.channel;
188 let target = self.index;
189
190 if let Some(logger) = self.logging.as_ref() {
191 logger.log(MessagesEvent {
192 is_send: false,
193 channel,
194 source: bundle.from,
195 target,
196 seq_no: bundle.seq,
197 length: bundle.data.len(),
198 });
199 }
200 }
201
202 result
203 }
204}