1use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34use crate::trace::implementations::merge_batcher::container::MergerChunk;
35
36use trace::wrappers::enter::{TraceEnter, BatchEnter,};
37use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
38use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
39
40use super::TraceAgent;
41
42pub struct Arranged<G, Tr>
47where
48 G: Scope<Timestamp: Lattice+Ord>,
49 Tr: TraceReader+Clone,
50{
51 pub stream: Stream<G, Tr::Batch>,
57 pub trace: Tr,
59 }
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65 G: Scope<Timestamp=Tr::Time>,
66 Tr: TraceReader + Clone,
67{
68 fn clone(&self) -> Self {
69 Arranged {
70 stream: self.stream.clone(),
71 trace: self.trace.clone(),
72 }
73 }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83 G: Scope<Timestamp=Tr::Time>,
84 Tr: TraceReader + Clone,
85{
86 pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92 -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93 where
94 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95 {
96 Arranged {
97 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98 trace: TraceEnter::make_from(self.trace.clone()),
99 }
100 }
101
102 pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107 -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108 Arranged {
109 stream: self.stream.enter(child),
110 trace: self.trace.clone(),
111 }
112 }
113
114 pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120 -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121 where
122 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124 P: FnMut(&TInner)->Tr::Time+Clone+'static,
125 {
126 let logic1 = logic.clone();
127 let logic2 = logic.clone();
128 Arranged {
129 trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131 }
132 }
133
134 pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
140 where
141 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
142 {
143 self.flat_map_ref(move |key, val| Some(logic(key,val)))
144 }
145
146 pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
151 where
152 I: IntoIterator<Item: Data>,
153 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
154 {
155 Self::flat_map_batches(&self.stream, logic)
156 }
157
158 pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
166 where
167 I: IntoIterator<Item: Data>,
168 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
169 {
170 stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
171 input.for_each(|time, data| {
172 let mut session = output.session(&time);
173 for wrapper in data.iter() {
174 let batch = &wrapper;
175 let mut cursor = batch.cursor();
176 while let Some(key) = cursor.get_key(batch) {
177 while let Some(val) = cursor.get_val(batch) {
178 for datum in logic(key, val) {
179 cursor.map_times(batch, |time, diff| {
180 session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
181 });
182 }
183 cursor.step_val(batch);
184 }
185 cursor.step_key(batch);
186 }
187 }
188 });
189 })
190 .as_collection()
191 }
192}
193
194
195use crate::difference::Multiply;
196impl<G, T1> Arranged<G, T1>
198where
199 G: Scope<Timestamp=T1::Time>,
200 T1: TraceReader + Clone + 'static,
201{
202 pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
204 where
205 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
206 T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
207 I: IntoIterator<Item: Data>,
208 L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
209 {
210 let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
211 let t = t.clone();
212 let r = (r1.clone()).multiply(r2);
213 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
214 };
215 self.join_core_internal_unsafe(other, result)
216 }
217 pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
219 where
220 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
221 D: Data,
222 ROut: Semigroup+'static,
223 I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
224 L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
225 {
226 use crate::operators::join::join_traces;
227 join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
228 self,
229 other,
230 move |k, v1, v2, t, d1, d2, c| {
231 for datum in result(k, v1, v2, t, d1, d2) {
232 c.give(datum);
233 }
234 }
235 )
236 .as_collection()
237 }
238}
239
240use crate::difference::Abelian;
242impl<G, T1> Arranged<G, T1>
243where
244 G: Scope<Timestamp = T1::Time>,
245 T1: TraceReader + Clone + 'static,
246{
247 pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
249 where
250 T1: TraceReader<KeyOwn: Ord>,
251 T2: for<'a> Trace<
252 Key<'a>= T1::Key<'a>,
253 KeyOwn=T1::KeyOwn,
254 ValOwn: Data,
255 Time=T1::Time,
256 Diff: Abelian,
257 >+'static,
258 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
259 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
260 {
261 self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
262 if !input.is_empty() {
263 logic(key, input, change);
264 }
265 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
266 crate::consolidation::consolidate(change);
267 })
268 }
269
270 pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
272 where
273 T1: TraceReader<KeyOwn: Ord>,
274 T2: for<'a> Trace<
275 Key<'a>=T1::Key<'a>,
276 KeyOwn=T1::KeyOwn,
277 ValOwn: Data,
278 Time=T1::Time,
279 >+'static,
280 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
281 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
282 {
283 use crate::operators::reduce::reduce_trace;
284 reduce_trace::<_,_,Bu,_,_>(self, name, logic)
285 }
286}
287
288
289impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
290where
291 G: Scope<Timestamp=Tr::Time>,
292 Tr: TraceReader + Clone,
293{
294 pub fn leave_region(&self) -> Arranged<G, Tr> {
299 use timely::dataflow::operators::Leave;
300 Arranged {
301 stream: self.stream.leave(),
302 trace: self.trace.clone(),
303 }
304 }
305}
306
307pub trait Arrange<G, C>
309where
310 G: Scope<Timestamp: Lattice>,
311{
312 fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
314 where
315 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
316 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
317 Tr: Trace<Time=G::Timestamp> + 'static,
318 {
319 self.arrange_named::<Ba, Bu, Tr>("Arrange")
320 }
321
322 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
324 where
325 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
326 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
327 Tr: Trace<Time=G::Timestamp> + 'static,
328 ;
329}
330
331impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
332where
333 G: Scope<Timestamp: Lattice>,
334 K: ExchangeData + Hashable,
335 V: ExchangeData,
336 R: ExchangeData + Semigroup,
337{
338 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
339 where
340 Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
341 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
342 Tr: Trace<Time=G::Timestamp> + 'static,
343 {
344 let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
345 arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
346 }
347}
348
349pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
355where
356 G: Scope<Timestamp: Lattice>,
357 P: ParallelizationContract<G::Timestamp, Ba::Input>,
358 Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
359 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
360 Tr: Trace<Time=G::Timestamp>+'static,
361{
362 let mut reader: Option<TraceAgent<Tr>> = None;
378
379 let reader_ref = &mut reader;
381 let scope = stream.scope();
382
383 let stream = stream.unary_frontier(pact, name, move |_capability, info| {
384
385 let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
387
388 let mut batcher = Ba::new(logger.clone(), info.global_id);
390
391 let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
393
394 let activator = Some(scope.activator_for(info.address.clone()));
395 let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
396 if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
398 empty_trace.set_exert_logic(exert_logic);
399 }
400
401 let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
402
403 *reader_ref = Some(reader_local);
404
405 let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
407
408 move |input, output| {
409
410 input.for_each(|cap, data| {
415 capabilities.insert(cap.retain());
416 batcher.push_container(data);
417 });
418
419 assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
426
427 if prev_frontier.borrow() != input.frontier().frontier() {
431 if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
445
446 let mut upper = Antichain::new(); for (index, capability) in capabilities.elements().iter().enumerate() {
450
451 if !input.frontier().less_equal(capability.time()) {
452
453 upper.clear();
457 for time in input.frontier().frontier().iter() {
458 upper.insert(time.clone());
459 }
460 for other_capability in &capabilities.elements()[(index + 1) .. ] {
461 upper.insert(other_capability.time().clone());
462 }
463
464 let batch = batcher.seal::<Bu>(upper.clone());
466
467 writer.insert(batch.clone(), Some(capability.time().clone()));
468
469 output.session(&capabilities.elements()[index]).give(batch);
471 }
472 }
473
474 let mut new_capabilities = Antichain::new();
480 for time in batcher.frontier().iter() {
481 if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
482 new_capabilities.insert(capability.delayed(time));
483 }
484 else {
485 panic!("failed to find capability");
486 }
487 }
488
489 capabilities = new_capabilities;
490 }
491 else {
492 let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
494 writer.seal(input.frontier().frontier().to_owned());
495 }
496
497 prev_frontier.clear();
498 prev_frontier.extend(input.frontier().frontier().iter().cloned());
499 }
500
501 writer.exert();
502 }
503 });
504
505 Arranged { stream, trace: reader.unwrap() }
506}
507
508impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
509where
510 G: Scope<Timestamp: Lattice+Ord>,
511{
512 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
513 where
514 Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
515 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
516 Tr: Trace<Time=G::Timestamp> + 'static,
517 {
518 let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
519 arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
520 }
521}
522
523pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
529where
530 G: Scope<Timestamp: Lattice+Ord>,
531{
532 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
538
539 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
541}
542
543impl<G, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
544where
545 G: Scope<Timestamp: Lattice+Ord>,
546{
547 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
548 self.arrange_by_key_named("ArrangeByKey")
549 }
550
551 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
552 self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
553 }
554}
555
556pub trait ArrangeBySelf<G, K: Data+Hashable, R: Ord+Semigroup+'static>
562where
563 G: Scope<Timestamp: Lattice+Ord>,
564{
565 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
571
572 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
574}
575
576
577impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
578where
579 G: Scope<Timestamp: Lattice+Ord>,
580{
581 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
582 self.arrange_by_self_named("ArrangeBySelf")
583 }
584
585 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
586 self.map(|k| (k, ()))
587 .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
588 }
589}