differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher-level style of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::{Container, Data};
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::Scope;
18use timely::dataflow::operators::*;
19use timely::dataflow::StreamCore;
20
21use crate::difference::{Semigroup, Abelian, Multiply};
22use crate::lattice::Lattice;
23use crate::hashable::Hashable;
24
/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
///
/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
/// differential dataflow computation, you write as if the collection is a static dataset to which you
/// apply functional transformations, creating new collections. Once your computation is written, you
/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
/// propagate changes through your functional computation and report the corresponding changes to the
/// output collections.
///
/// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the
/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
/// The `R` parameter represents the type of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
///
/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`, so
/// each element of a container is a `(data, time, diff)` triple.
pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
43
44/// An evolving collection represented by a stream of abstract containers.
45///
46/// The containers purport to reperesent changes to a collection, and they must implement various traits
47/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
48/// on the containers, and streams of containers, are left to the container implementor to describe.
49#[derive(Clone)]
50pub struct Collection<G: Scope, C> {
51 /// The underlying timely dataflow stream.
52 ///
53 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
54 /// not intended to be the idiomatic way to work with the collection.
55 ///
56 /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
57 /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
58 /// unexpectedly.
59 pub inner: timely::dataflow::StreamCore<G, C>,
60}
61
62impl<G: Scope, C> Collection<G, C> {
63 /// Creates a new Collection from a timely dataflow stream.
64 ///
65 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
66 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
67 /// provides a `new_collection` method which will create a new collection for you without exposing
68 /// the underlying timely stream at all.
69 ///
70 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
71 /// method does not check it.
72 pub fn new(stream: StreamCore<G, C>) -> Self { Self { inner: stream } }
73}
74impl<G: Scope, C: Container> Collection<G, C> {
75 /// Creates a new collection accumulating the contents of the two collections.
76 ///
77 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
78 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
79 /// two collections.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use differential_dataflow::input::Input;
85 ///
86 /// ::timely::example(|scope| {
87 ///
88 /// let data = scope.new_collection_from(1 .. 10).1;
89 ///
90 /// let odds = data.filter(|x| x % 2 == 1);
91 /// let evens = data.filter(|x| x % 2 == 0);
92 ///
93 /// odds.concat(&evens)
94 /// .assert_eq(&data);
95 /// });
96 /// ```
97 pub fn concat(&self, other: &Self) -> Self {
98 self.inner
99 .concat(&other.inner)
100 .as_collection()
101 }
102 /// Creates a new collection accumulating the contents of the two collections.
103 ///
104 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
105 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
106 /// two collections.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use differential_dataflow::input::Input;
112 ///
113 /// ::timely::example(|scope| {
114 ///
115 /// let data = scope.new_collection_from(1 .. 10).1;
116 ///
117 /// let odds = data.filter(|x| x % 2 == 1);
118 /// let evens = data.filter(|x| x % 2 == 0);
119 ///
120 /// odds.concatenate(Some(evens))
121 /// .assert_eq(&data);
122 /// });
123 /// ```
124 pub fn concatenate<I>(&self, sources: I) -> Self
125 where
126 I: IntoIterator<Item=Self>
127 {
128 self.inner
129 .concatenate(sources.into_iter().map(|x| x.inner))
130 .as_collection()
131 }
132 // Brings a Collection into a nested region.
133 ///
134 /// This method is a specialization of `enter` to the case where the nested scope is a region.
135 /// It removes the need for an operator that adjusts the timestamp.
136 pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
137 self.inner
138 .enter(child)
139 .as_collection()
140 }
141 /// Applies a supplied function to each batch of updates.
142 ///
143 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
144 /// timely dataflow capability associated with the batch of updates. The observed batching depends
145 /// on how the system executes, and may vary run to run.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use differential_dataflow::input::Input;
151 ///
152 /// ::timely::example(|scope| {
153 /// scope.new_collection_from(1 .. 10).1
154 /// .map_in_place(|x| *x *= 2)
155 /// .filter(|x| x % 2 == 1)
156 /// .inspect_container(|event| println!("event: {:?}", event));
157 /// });
158 /// ```
159 pub fn inspect_container<F>(&self, func: F) -> Self
160 where
161 F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
162 {
163 self.inner
164 .inspect_container(func)
165 .as_collection()
166 }
167 /// Attaches a timely dataflow probe to the output of a Collection.
168 ///
169 /// This probe is used to determine when the state of the Collection has stabilized and can
170 /// be read out.
171 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
172 self.inner
173 .probe()
174 }
175 /// Attaches a timely dataflow probe to the output of a Collection.
176 ///
177 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
178 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
179 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
180 /// avoid swamping the system.
181 pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
182 Self::new(self.inner.probe_with(handle))
183 }
184 /// The scope containing the underlying timely dataflow stream.
185 pub fn scope(&self) -> G {
186 self.inner.scope()
187 }
188
189 /// Creates a new collection whose counts are the negation of those in the input.
190 ///
191 /// This method is most commonly used with `concat` to get those element in one collection but not another.
192 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
193 /// including negative counts.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use differential_dataflow::input::Input;
199 ///
200 /// ::timely::example(|scope| {
201 ///
202 /// let data = scope.new_collection_from(1 .. 10).1;
203 ///
204 /// let odds = data.filter(|x| x % 2 == 1);
205 /// let evens = data.filter(|x| x % 2 == 0);
206 ///
207 /// odds.negate()
208 /// .concat(&data)
209 /// .assert_eq(&evens);
210 /// });
211 /// ```
212 pub fn negate(&self) -> Self where C: containers::Negate {
213 use timely::dataflow::channels::pact::Pipeline;
214 self.inner
215 .unary(Pipeline, "Negate", move |_,_| move |input, output| {
216 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
217 })
218 .as_collection()
219 }
220
221 /// Brings a Collection into a nested scope.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use timely::dataflow::Scope;
227 /// use differential_dataflow::input::Input;
228 ///
229 /// ::timely::example(|scope| {
230 ///
231 /// let data = scope.new_collection_from(1 .. 10).1;
232 ///
233 /// let result = scope.region(|child| {
234 /// data.enter(child)
235 /// .leave()
236 /// });
237 ///
238 /// data.assert_eq(&result);
239 /// });
240 /// ```
241 pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
242 where
243 C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
244 T: Refines<<G as ScopeParent>::Timestamp>,
245 {
246 use timely::dataflow::channels::pact::Pipeline;
247 self.inner
248 .enter(child)
249 .unary(Pipeline, "Enter", move |_,_| move |input, output| {
250 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
251 })
252 .as_collection()
253 }
254
255 /// Advances a timestamp in the stream according to the timestamp actions on the path.
256 ///
257 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
258 /// incrementing fields would result in integer overflow. In this case, the record is dropped.
259 ///
260 /// # Examples
261 /// ```
262 /// use timely::dataflow::Scope;
263 /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
264 ///
265 /// use differential_dataflow::input::Input;
266 ///
267 /// timely::example(|scope| {
268 /// let summary1 = 5;
269 ///
270 /// let data = scope.new_collection_from(1 .. 10).1;
271 /// /// Applies `results_in` on every timestamp in the collection.
272 /// data.results_in(summary1);
273 /// });
274 /// ```
275 pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
276 where
277 C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
278 {
279 use timely::dataflow::channels::pact::Pipeline;
280 self.inner
281 .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
282 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
283 })
284 .as_collection()
285 }
286}
287
288impl<G: Scope, D: Clone+'static, R: Clone+'static> VecCollection<G, D, R> {
289 /// Creates a new collection by applying the supplied function to each input element.
290 ///
291 /// # Examples
292 ///
293 /// ```
294 /// use differential_dataflow::input::Input;
295 ///
296 /// ::timely::example(|scope| {
297 /// scope.new_collection_from(1 .. 10).1
298 /// .map(|x| x * 2)
299 /// .filter(|x| x % 2 == 1)
300 /// .assert_empty();
301 /// });
302 /// ```
303 pub fn map<D2, L>(&self, mut logic: L) -> VecCollection<G, D2, R>
304 where
305 D2: Data,
306 L: FnMut(D) -> D2 + 'static,
307 {
308 self.inner
309 .map(move |(data, time, delta)| (logic(data), time, delta))
310 .as_collection()
311 }
312 /// Creates a new collection by applying the supplied function to each input element.
313 ///
314 /// Although the name suggests in-place mutation, this function does not change the source collection,
315 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
316 /// equivalent to `map`, but can be more efficient.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use differential_dataflow::input::Input;
322 ///
323 /// ::timely::example(|scope| {
324 /// scope.new_collection_from(1 .. 10).1
325 /// .map_in_place(|x| *x *= 2)
326 /// .filter(|x| x % 2 == 1)
327 /// .assert_empty();
328 /// });
329 /// ```
330 pub fn map_in_place<L>(&self, mut logic: L) -> VecCollection<G, D, R>
331 where
332 L: FnMut(&mut D) + 'static,
333 {
334 self.inner
335 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
336 .as_collection()
337 }
338 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
339 ///
340 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
341 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
342 /// attempting to consolidate the results.
343 ///
344 /// # Examples
345 ///
346 /// ```
347 /// use differential_dataflow::input::Input;
348 ///
349 /// ::timely::example(|scope| {
350 /// scope.new_collection_from(1 .. 10).1
351 /// .flat_map(|x| 0 .. x);
352 /// });
353 /// ```
354 pub fn flat_map<I, L>(&self, mut logic: L) -> VecCollection<G, I::Item, R>
355 where
356 G::Timestamp: Clone,
357 I: IntoIterator<Item: Data>,
358 L: FnMut(D) -> I + 'static,
359 {
360 self.inner
361 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
362 .as_collection()
363 }
364 /// Creates a new collection containing those input records satisfying the supplied predicate.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// use differential_dataflow::input::Input;
370 ///
371 /// ::timely::example(|scope| {
372 /// scope.new_collection_from(1 .. 10).1
373 /// .map(|x| x * 2)
374 /// .filter(|x| x % 2 == 1)
375 /// .assert_empty();
376 /// });
377 /// ```
378 pub fn filter<L>(&self, mut logic: L) -> VecCollection<G, D, R>
379 where
380 L: FnMut(&D) -> bool + 'static,
381 {
382 self.inner
383 .filter(move |(data, _, _)| logic(data))
384 .as_collection()
385 }
386 /// Replaces each record with another, with a new difference type.
387 ///
388 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
389 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
390 ///
391 /// # Examples
392 ///
393 /// ```
394 /// use differential_dataflow::input::Input;
395 ///
396 /// ::timely::example(|scope| {
397 ///
398 /// let nums = scope.new_collection_from(0 .. 10).1;
399 /// let x1 = nums.flat_map(|x| 0 .. x);
400 /// let x2 = nums.map(|x| (x, 9 - x))
401 /// .explode(|(x,y)| Some((x,y)));
402 ///
403 /// x1.assert_eq(&x2);
404 /// });
405 /// ```
406 pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
407 where
408 D2: Data,
409 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
410 I: IntoIterator<Item=(D2,R2)>,
411 L: FnMut(D)->I+'static,
412 {
413 self.inner
414 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
415 .as_collection()
416 }
417
418 /// Joins each record against a collection defined by the function `logic`.
419 ///
420 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
421 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
422 /// modifications made to the results, namely joining timestamps and multiplying differences.
423 ///
424 /// #Examples
425 ///
426 /// ```
427 /// use differential_dataflow::input::Input;
428 ///
429 /// ::timely::example(|scope| {
430 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
431 /// // for x from 0 through 9.
432 /// scope.new_collection_from(0 .. 10isize).1
433 /// .join_function(|x|
434 /// // data time diff
435 /// vec![(2*x, (3*x) as u64, x),
436 /// (2*x, (4*x) as u64, -x)]
437 /// );
438 /// });
439 /// ```
440 pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
441 where
442 G::Timestamp: Lattice,
443 D2: Data,
444 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
445 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
446 L: FnMut(D)->I+'static,
447 {
448 self.inner
449 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
450 .as_collection()
451 }
452
453 /// Brings a Collection into a nested scope, at varying times.
454 ///
455 /// The `initial` function indicates the time at which each element of the Collection should appear.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use timely::dataflow::Scope;
461 /// use differential_dataflow::input::Input;
462 ///
463 /// ::timely::example(|scope| {
464 ///
465 /// let data = scope.new_collection_from(1 .. 10).1;
466 ///
467 /// let result = scope.iterative::<u64,_,_>(|child| {
468 /// data.enter_at(child, |x| *x)
469 /// .leave()
470 /// });
471 ///
472 /// data.assert_eq(&result);
473 /// });
474 /// ```
475 pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> VecCollection<Iterative<'a, G, T>, D, R>
476 where
477 T: Timestamp+Hash,
478 F: FnMut(&D) -> T + Clone + 'static,
479 G::Timestamp: Hash,
480 {
481 self.inner
482 .enter(child)
483 .map(move |(data, time, diff)| {
484 let new_time = Product::new(time, initial(&data));
485 (data, new_time, diff)
486 })
487 .as_collection()
488 }
489
490 /// Delays each difference by a supplied function.
491 ///
492 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
493 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
494 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
495 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
496 /// to all of the data timestamps).
497 pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
498 where
499 G::Timestamp: Hash,
500 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
501 {
502 let mut func1 = func.clone();
503 let mut func2 = func.clone();
504
505 self.inner
506 .delay_batch(move |x| func1(x))
507 .map_in_place(move |x| x.1 = func2(&x.1))
508 .as_collection()
509 }
510
511 /// Applies a supplied function to each update.
512 ///
513 /// This method is most commonly used to report information back to the user, often for debugging purposes.
514 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
515 /// not guarantee that it will be called as many times as you might expect.
516 ///
517 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
518 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
519 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
520 /// interesting and less intuitive, unfortunately.
521 ///
522 /// # Examples
523 ///
524 /// ```
525 /// use differential_dataflow::input::Input;
526 ///
527 /// ::timely::example(|scope| {
528 /// scope.new_collection_from(1 .. 10).1
529 /// .map_in_place(|x| *x *= 2)
530 /// .filter(|x| x % 2 == 1)
531 /// .inspect(|x| println!("error: {:?}", x));
532 /// });
533 /// ```
534 pub fn inspect<F>(&self, func: F) -> VecCollection<G, D, R>
535 where
536 F: FnMut(&(D, G::Timestamp, R))+'static,
537 {
538 self.inner
539 .inspect(func)
540 .as_collection()
541 }
542 /// Applies a supplied function to each batch of updates.
543 ///
544 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
545 /// timely dataflow capability associated with the batch of updates. The observed batching depends
546 /// on how the system executes, and may vary run to run.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// use differential_dataflow::input::Input;
552 ///
553 /// ::timely::example(|scope| {
554 /// scope.new_collection_from(1 .. 10).1
555 /// .map_in_place(|x| *x *= 2)
556 /// .filter(|x| x % 2 == 1)
557 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
558 /// });
559 /// ```
560 pub fn inspect_batch<F>(&self, mut func: F) -> VecCollection<G, D, R>
561 where
562 F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
563 {
564 self.inner
565 .inspect_batch(move |time, data| func(time, data))
566 .as_collection()
567 }
568
569 /// Assert if the collection is ever non-empty.
570 ///
571 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
572 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
573 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
574 /// program should indicate that this assertion never found cause to complain.
575 ///
576 /// # Examples
577 ///
578 /// ```
579 /// use differential_dataflow::input::Input;
580 ///
581 /// ::timely::example(|scope| {
582 /// scope.new_collection_from(1 .. 10).1
583 /// .map(|x| x * 2)
584 /// .filter(|x| x % 2 == 1)
585 /// .assert_empty();
586 /// });
587 /// ```
588 pub fn assert_empty(&self)
589 where
590 D: crate::ExchangeData+Hashable,
591 R: crate::ExchangeData+Hashable + Semigroup,
592 G::Timestamp: Lattice+Ord,
593 {
594 self.consolidate()
595 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
596 }
597}
598
599use timely::dataflow::scopes::ScopeParent;
600use timely::progress::timestamp::Refines;
601
602/// Methods requiring a nested scope.
603impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
604where
605 C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
606 T: Refines<<G as ScopeParent>::Timestamp>,
607{
608 /// Returns the final value of a Collection from a nested scope to its containing scope.
609 ///
610 /// # Examples
611 ///
612 /// ```
613 /// use timely::dataflow::Scope;
614 /// use differential_dataflow::input::Input;
615 ///
616 /// ::timely::example(|scope| {
617 ///
618 /// let data = scope.new_collection_from(1 .. 10).1;
619 ///
620 /// let result = scope.region(|child| {
621 /// data.enter(child)
622 /// .leave()
623 /// });
624 ///
625 /// data.assert_eq(&result);
626 /// });
627 /// ```
628 pub fn leave(&self) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
629 use timely::dataflow::channels::pact::Pipeline;
630 self.inner
631 .leave()
632 .unary(Pipeline, "Leave", move |_,_| move |input, output| {
633 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
634 })
635 .as_collection()
636 }
637}
638
639/// Methods requiring a region as the scope.
640impl<G: Scope, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, C>
641{
642 /// Returns the value of a Collection from a nested region to its containing scope.
643 ///
644 /// This method is a specialization of `leave` to the case that of a nested region.
645 /// It removes the need for an operator that adjusts the timestamp.
646 pub fn leave_region(&self) -> Collection<G, C> {
647 self.inner
648 .leave()
649 .as_collection()
650 }
651}
652
653/// Methods requiring an Abelian difference, to support negation.
654impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> VecCollection<G, D, R> {
655 /// Assert if the collections are ever different.
656 ///
657 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
658 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
659 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
660 /// indicate that this assertion never found cause to complain.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// use differential_dataflow::input::Input;
666 ///
667 /// ::timely::example(|scope| {
668 ///
669 /// let data = scope.new_collection_from(1 .. 10).1;
670 ///
671 /// let odds = data.filter(|x| x % 2 == 1);
672 /// let evens = data.filter(|x| x % 2 == 0);
673 ///
674 /// odds.concat(&evens)
675 /// .assert_eq(&data);
676 /// });
677 /// ```
678 pub fn assert_eq(&self, other: &Self)
679 where
680 D: crate::ExchangeData+Hashable,
681 R: crate::ExchangeData+Hashable,
682 G::Timestamp: Lattice+Ord,
683 {
684 self.negate()
685 .concat(other)
686 .assert_empty();
687 }
688}
689
/// Conversion to a differential dataflow Collection.
///
/// `G` is the scope of the resulting collection and `C` is its container type.
pub trait AsCollection<G: Scope, C> {
    /// Converts the type to a differential dataflow collection.
    ///
    /// The conversion borrows `self`, so the source value remains usable afterwards.
    fn as_collection(&self) -> Collection<G, C>;
}
695
696impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
697 /// Converts the type to a differential dataflow collection.
698 ///
699 /// By calling this method, you guarantee that the timestamp invariant (as documented on
700 /// [Collection]) is upheld. This method will not check it.
701 fn as_collection(&self) -> Collection<G, C> {
702 Collection::<G,C>::new(self.clone())
703 }
704}
705
706/// Concatenates multiple collections.
707///
708/// This method has the effect of a sequence of calls to `concat`, but it does
709/// so in one operator rather than a chain of many operators.
710///
711/// # Examples
712///
713/// ```
714/// use differential_dataflow::input::Input;
715///
716/// ::timely::example(|scope| {
717///
718/// let data = scope.new_collection_from(1 .. 10).1;
719///
720/// let odds = data.filter(|x| x % 2 == 1);
721/// let evens = data.filter(|x| x % 2 == 0);
722///
723/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
724/// .assert_eq(&data);
725/// });
726/// ```
727pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
728where
729 G: Scope,
730 C: Container,
731 I: IntoIterator<Item=Collection<G, C>>,
732{
733 scope
734 .concatenate(iterator.into_iter().map(|x| x.inner))
735 .as_collection()
736}
737
738/// Traits that can be implemented by containers to provide functionality to collections based on them.
739pub mod containers {
740
741 use timely::progress::{Timestamp, timestamp::Refines};
742 use crate::collection::Abelian;
743
744 /// A container that can negate its updates.
745 pub trait Negate {
746 /// Negates Abelian differences of each update.
747 fn negate(self) -> Self;
748 }
749 impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
750 fn negate(mut self) -> Self {
751 for (_data, _time, diff) in self.iter_mut() {
752 diff.negate();
753 }
754 self
755 }
756 }
757
758 /// A container that can enter from timestamp `T1` into timestamp `T2`.
759 pub trait Enter<T1, T2> {
760 /// The resulting container type.
761 type InnerContainer;
762 /// Update timestamps from `T1` to `T2`.
763 fn enter(self) -> Self::InnerContainer;
764 }
765 impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
766 type InnerContainer = Vec<(D, T2, R)>;
767 fn enter(self) -> Self::InnerContainer {
768 self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
769 }
770 }
771
772 /// A container that can leave from timestamp `T1` into timestamp `T2`.
773 pub trait Leave<T1, T2> {
774 /// The resulting container type.
775 type OuterContainer;
776 /// Update timestamps from `T1` to `T2`.
777 fn leave(self) -> Self::OuterContainer;
778 }
779 impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
780 type OuterContainer = Vec<(D, T2, R)>;
781 fn leave(self) -> Self::OuterContainer {
782 self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
783 }
784 }
785
786 /// A container that can advance timestamps by a summary `TS`.
787 pub trait ResultsIn<TS> {
788 /// Advance times in the container by `step`.
789 fn results_in(self, step: &TS) -> Self;
790 }
791 impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
792 fn results_in(self, step: &T::Summary) -> Self {
793 use timely::progress::PathSummary;
794 self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
795 }
796 }
797}