1use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use super::TraceAgent;
41
42pub struct Arranged<G: Scope, Tr>
47where
48 G::Timestamp: Lattice+Ord,
49 Tr: TraceReader+Clone,
50{
51 pub stream: Stream<G, Tr::Batch>,
57 pub trace: Tr,
59 }
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65 G: Scope<Timestamp=Tr::Time>,
66 Tr: TraceReader + Clone,
67{
68 fn clone(&self) -> Self {
69 Arranged {
70 stream: self.stream.clone(),
71 trace: self.trace.clone(),
72 }
73 }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83 G: Scope<Timestamp=Tr::Time>,
84 Tr: TraceReader + Clone,
85{
86 pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92 -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93 where
94 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95 {
96 Arranged {
97 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98 trace: TraceEnter::make_from(self.trace.clone()),
99 }
100 }
101
102 pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107 -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108 Arranged {
109 stream: self.stream.enter(child),
110 trace: self.trace.clone(),
111 }
112 }
113
114 pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120 -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121 where
122 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124 P: FnMut(&TInner)->Tr::Time+Clone+'static,
125 {
126 let logic1 = logic.clone();
127 let logic2 = logic.clone();
128 Arranged {
129 trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131 }
132 }
133
134 pub fn filter<F>(&self, logic: F)
161 -> Arranged<G, TraceFilter<Tr, F>>
162 where
163 F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
164 {
165 let logic1 = logic.clone();
166 let logic2 = logic.clone();
167 Arranged {
168 trace: TraceFilter::make_from(self.trace.clone(), logic1),
169 stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
170 }
171 }
172 pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
178 where
179 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
180 {
181 self.flat_map_ref(move |key, val| Some(logic(key,val)))
182 }
183
184 pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
189 where
190 I: IntoIterator,
191 I::Item: Data,
192 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
193 {
194 Self::flat_map_batches(&self.stream, logic)
195 }
196
197 pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
205 where
206 I: IntoIterator,
207 I::Item: Data,
208 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
209 {
210 stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
211 input.for_each(|time, data| {
212 let mut session = output.session(&time);
213 for wrapper in data.iter() {
214 let batch = &wrapper;
215 let mut cursor = batch.cursor();
216 while let Some(key) = cursor.get_key(batch) {
217 while let Some(val) = cursor.get_val(batch) {
218 for datum in logic(key, val) {
219 cursor.map_times(batch, |time, diff| {
220 session.give((datum.clone(), time.into_owned(), diff.into_owned()));
221 });
222 }
223 cursor.step_val(batch);
224 }
225 cursor.step_key(batch);
226 }
227 }
228 });
229 })
230 .as_collection()
231 }
232}
233
234
235use crate::difference::Multiply;
236impl<G, T1> Arranged<G, T1>
238where
239 G: Scope<Timestamp=T1::Time>,
240 T1: TraceReader + Clone + 'static,
241{
242 pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
244 where
245 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
246 T1::Diff: Multiply<T2::Diff>,
247 <T1::Diff as Multiply<T2::Diff>>::Output: Semigroup+'static,
248 I: IntoIterator,
249 I::Item: Data,
250 L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
251 {
252 let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
253 let t = t.clone();
254 let r = (r1.clone()).multiply(r2);
255 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
256 };
257 self.join_core_internal_unsafe(other, result)
258 }
259 pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
261 where
262 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
263 D: Data,
264 ROut: Semigroup+'static,
265 I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
266 L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
267 {
268 use crate::operators::join::join_traces;
269 join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
270 self,
271 other,
272 move |k, v1, v2, t, d1, d2, c| {
273 for datum in result(k, v1, v2, t, d1, d2) {
274 c.give(datum);
275 }
276 }
277 )
278 .as_collection()
279 }
280}
281
282use crate::difference::Abelian;
284impl<G, T1> Arranged<G, T1>
285where
286 G: Scope<Timestamp = T1::Time>,
287 T1: TraceReader + Clone + 'static,
288{
289 pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
291 where
292 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
293 T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=T1::Time>+'static,
294 K: Ord + 'static,
295 V: Data,
296 for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
297 T2::Diff: Abelian,
298 T2::Batch: Batch,
299 Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
300 Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
301 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
302 {
303 self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| {
304 if !input.is_empty() {
305 logic(key, input, change);
306 }
307 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
308 crate::consolidation::consolidate(change);
309 })
310 }
311
312 pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
314 where
315 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
316 T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
317 K: Ord + 'static,
318 V: Data,
319 for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
320 T2::Batch: Batch,
321 Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
322 Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
323 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
324 {
325 use crate::operators::reduce::reduce_trace;
326 reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic)
327 }
328}
329
330
331impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
332where
333 G: Scope<Timestamp=Tr::Time>,
334 Tr: TraceReader + Clone,
335{
336 pub fn leave_region(&self) -> Arranged<G, Tr> {
341 use timely::dataflow::operators::Leave;
342 Arranged {
343 stream: self.stream.leave(),
344 trace: self.trace.clone(),
345 }
346 }
347}
348
349pub trait Arrange<G, C>
351where
352 G: Scope,
353 G::Timestamp: Lattice,
354{
355 fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
357 where
358 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
359 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
360 Tr: Trace<Time=G::Timestamp> + 'static,
361 Tr::Batch: Batch,
362 {
363 self.arrange_named::<Ba, Bu, Tr>("Arrange")
364 }
365
366 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
368 where
369 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
370 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
371 Tr: Trace<Time=G::Timestamp> + 'static,
372 Tr::Batch: Batch,
373 ;
374}
375
376impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
377where
378 G: Scope,
379 G::Timestamp: Lattice,
380 K: ExchangeData + Hashable,
381 V: ExchangeData,
382 R: ExchangeData + Semigroup,
383{
384 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
385 where
386 Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
387 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
388 Tr: Trace<Time=G::Timestamp> + 'static,
389 Tr::Batch: Batch,
390 {
391 let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
392 arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
393 }
394}
395
/// Arranges a stream of update containers into a stream of batches plus a trace.
///
/// Builds a `unary_frontier` operator called `name` on `stream`, using `pact` to
/// distribute its input. Received containers are pushed into a `Ba` batcher. Whenever
/// the input frontier changes, batches are sealed with the builder `Bu`, inserted into
/// the trace writer and sent on the output stream; if no held capability is affected,
/// only the trace writer is told about the new frontier.
///
/// Returns an [`Arranged`] holding the output batch stream and the reader half of the
/// `TraceAgent` created inside the operator.
///
/// # Panics
///
/// Panics if the input frontier is observed to regress, if no held capability can be
/// found for an element of the batcher's frontier, or if the reader handle has not been
/// installed by the time `unary_frontier` returns.
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope,
    G::Timestamp: Lattice,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
    Tr::Batch: Batch,
{
    // The trace reader is created inside the operator constructor below; this slot
    // lets the constructor hand it back out to this function.
    let mut reader: Option<TraceAgent<Tr>> = None;

    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Logger for arrangement events, if one is registered under this name.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Holds received updates until they can be sealed into batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities retained from input messages, used to send batches later.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // Install the default exertion logic if the scope configuration provides one.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // Publish the reader to the enclosing function.
        *reader_ref = Some(reader_local);

        // The last input frontier we acted on; starts at the minimum timestamp.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |input, output| {

            // Stash incoming data in the batcher and keep the capability it arrived with.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The input frontier must never move backwards.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

            // Only a changed frontier gives us anything to seal.
            if prev_frontier.borrow() != input.frontier().frontier() {
                // Is there a held capability whose time the input frontier has passed?
                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                    // Scratch antichain, cleared and refilled for every sealed batch.
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !input.frontier().less_equal(capability.time()) {

                            // The upper bound for this batch is the input frontier together
                            // with the times of all capabilities that come after this one,
                            // so each capability is handled in turn.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract the batch bounded above by `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Hand the batch to the trace, along with this capability's time.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // Send the batch downstream under this capability.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Rebuild the held capabilities to match what remains in the batcher:
                    // for each time in the batcher's frontier, delay some held capability
                    // whose time is less or equal to it.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // No held capability is affected: seal the batcher up to the new
                    // frontier (the resulting batch is discarded) and tell the trace
                    // writer about the new frontier. Nothing is sent on the output.
                    let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
                    writer.seal(input.frontier().frontier().to_owned());
                }

                // Remember the frontier we just processed.
                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }

            // Runs on every invocation, whether or not the frontier changed.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}
557
558impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
559where
560 G::Timestamp: Lattice+Ord,
561{
562 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
563 where
564 Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
565 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
566 Tr: Trace<Time=G::Timestamp> + 'static,
567 Tr::Batch: Batch,
568 {
569 let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
570 arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
571 }
572}
573
574pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
580where G::Timestamp: Lattice+Ord {
581 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
587
588 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
590}
591
592impl<G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
593where
594 G::Timestamp: Lattice+Ord
595{
596 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
597 self.arrange_by_key_named("ArrangeByKey")
598 }
599
600 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
601 self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
602 }
603}
604
605pub trait ArrangeBySelf<G: Scope, K: Data+Hashable, R: Ord+Semigroup+'static>
611where
612 G::Timestamp: Lattice+Ord
613{
614 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
620
621 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
623}
624
625
626impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
627where
628 G::Timestamp: Lattice+Ord
629{
630 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
631 self.arrange_by_self_named("ArrangeBySelf")
632 }
633
634 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
635 self.map(|k| (k, ()))
636 .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
637 }
638}