differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::{Container, Data};
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::Scope;
18use timely::dataflow::operators::*;
19use timely::dataflow::StreamCore;
20
21use crate::difference::{Semigroup, Abelian, Multiply};
22use crate::lattice::Lattice;
23use crate::hashable::Hashable;
24
/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
///
/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
/// differential dataflow computation, you write as if the collection is a static dataset to which you
/// apply functional transformations, creating new collections. Once your computation is written, you
/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
/// propagate changes through your functional computation and report the corresponding changes to the
/// output collections.
///
/// Each `VecCollection` has three generic parameters. The parameter `G` is for the scope in which the
/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
///
/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`,
/// i.e. a vector of `(data, time, diff)` update triples.
pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
43
/// An evolving collection represented by a stream of abstract containers.
///
/// The containers purport to represent changes to a collection, and they must implement various traits
/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
/// on the containers, and streams of containers, are left to the container implementor to describe.
#[derive(Clone)]
pub struct Collection<G: Scope, C> {
    /// The underlying timely dataflow stream.
    ///
    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
    /// not intended to be the idiomatic way to work with the collection.
    ///
    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
    /// unexpectedly.
    pub inner: timely::dataflow::StreamCore<G, C>,
}
61
62impl<G: Scope, C> Collection<G, C> {
63    /// Creates a new Collection from a timely dataflow stream.
64    ///
65    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
66    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
67    /// provides a `new_collection` method which will create a new collection for you without exposing
68    /// the underlying timely stream at all.
69    ///
70    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
71    /// method does not check it.
72    pub fn new(stream: StreamCore<G, C>) -> Self { Self { inner: stream } }
73}
74impl<G: Scope, C: Container> Collection<G, C> {
75    /// Creates a new collection accumulating the contents of the two collections.
76    ///
77    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
78    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
79    /// two collections.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use differential_dataflow::input::Input;
85    ///
86    /// ::timely::example(|scope| {
87    ///
88    ///     let data = scope.new_collection_from(1 .. 10).1;
89    ///
90    ///     let odds = data.filter(|x| x % 2 == 1);
91    ///     let evens = data.filter(|x| x % 2 == 0);
92    ///
93    ///     odds.concat(&evens)
94    ///         .assert_eq(&data);
95    /// });
96    /// ```
97    pub fn concat(&self, other: &Self) -> Self {
98        self.inner
99            .concat(&other.inner)
100            .as_collection()
101    }
102    /// Creates a new collection accumulating the contents of the two collections.
103    ///
104    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
105    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
106    /// two collections.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use differential_dataflow::input::Input;
112    ///
113    /// ::timely::example(|scope| {
114    ///
115    ///     let data = scope.new_collection_from(1 .. 10).1;
116    ///
117    ///     let odds = data.filter(|x| x % 2 == 1);
118    ///     let evens = data.filter(|x| x % 2 == 0);
119    ///
120    ///     odds.concatenate(Some(evens))
121    ///         .assert_eq(&data);
122    /// });
123    /// ```
124    pub fn concatenate<I>(&self, sources: I) -> Self
125    where
126        I: IntoIterator<Item=Self>
127    {
128        self.inner
129            .concatenate(sources.into_iter().map(|x| x.inner))
130            .as_collection()
131    }
132    // Brings a Collection into a nested region.
133    ///
134    /// This method is a specialization of `enter` to the case where the nested scope is a region.
135    /// It removes the need for an operator that adjusts the timestamp.
136    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
137        self.inner
138            .enter(child)
139            .as_collection()
140    }
141    /// Applies a supplied function to each batch of updates.
142    ///
143    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
144    /// timely dataflow capability associated with the batch of updates. The observed batching depends
145    /// on how the system executes, and may vary run to run.
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// use differential_dataflow::input::Input;
151    ///
152    /// ::timely::example(|scope| {
153    ///     scope.new_collection_from(1 .. 10).1
154    ///          .map_in_place(|x| *x *= 2)
155    ///          .filter(|x| x % 2 == 1)
156    ///          .inspect_container(|event| println!("event: {:?}", event));
157    /// });
158    /// ```
159    pub fn inspect_container<F>(&self, func: F) -> Self
160    where
161        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
162    {
163        self.inner
164            .inspect_container(func)
165            .as_collection()
166    }
167    /// Attaches a timely dataflow probe to the output of a Collection.
168    ///
169    /// This probe is used to determine when the state of the Collection has stabilized and can
170    /// be read out.
171    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
172        self.inner
173            .probe()
174    }
175    /// Attaches a timely dataflow probe to the output of a Collection.
176    ///
177    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
178    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
179    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
180    /// avoid swamping the system.
181    pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
182        Self::new(self.inner.probe_with(handle))
183    }
184    /// The scope containing the underlying timely dataflow stream.
185    pub fn scope(&self) -> G {
186        self.inner.scope()
187    }
188
    /// Creates a new collection whose counts are the negation of those in the input.
    ///
    /// This method is most commonly used with `concat` to get those elements in one collection but not another.
    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
    /// including negative counts.
    ///
    /// The negation itself is delegated to the container's [`containers::Negate`] implementation.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let data = scope.new_collection_from(1 .. 10).1;
    ///
    ///     let odds = data.filter(|x| x % 2 == 1);
    ///     let evens = data.filter(|x| x % 2 == 0);
    ///
    ///     odds.negate()
    ///         .concat(&data)
    ///         .assert_eq(&evens);
    /// });
    /// ```
    pub fn negate(&self) -> Self where C: containers::Negate {
        use timely::dataflow::channels::pact::Pipeline;
        self.inner
            .unary(Pipeline, "Negate", move |_,_| move |input, output| {
                // Take each container out of the input (leaving a default in its place), negate it,
                // and forward the result in a session opened at the container's own time.
                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
            })
            .as_collection()
    }
220
    /// Brings a Collection into a nested scope.
    ///
    /// The stream is first brought into `child`, and a second operator then converts each container
    /// using its [`containers::Enter`] implementation so that the timestamps carried in the updates
    /// have the nested scope's timestamp type `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::dataflow::Scope;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let data = scope.new_collection_from(1 .. 10).1;
    ///
    ///     let result = scope.region(|child| {
    ///         data.enter(child)
    ///             .leave()
    ///     });
    ///
    ///     data.assert_eq(&result);
    /// });
    /// ```
    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
    where
        C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
        T: Refines<<G as ScopeParent>::Timestamp>,
    {
        use timely::dataflow::channels::pact::Pipeline;
        self.inner
            .enter(child)
            .unary(Pipeline, "Enter", move |_,_| move |input, output| {
                // Take each container out of the input, convert it to the inner container type,
                // and forward the result in a session opened at the container's own time.
                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
            })
            .as_collection()
    }
254
    /// Advances a timestamp in the stream according to the timestamp actions on the path.
    ///
    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
    ///
    /// The per-container work is delegated to the container's [`containers::ResultsIn`] implementation.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
    ///
    /// use differential_dataflow::input::Input;
    ///
    /// timely::example(|scope| {
    ///     let summary1 = 5;
    ///
    ///     let data = scope.new_collection_from(1 .. 10).1;
    ///     // Applies `results_in` on every timestamp in the collection.
    ///     data.results_in(summary1);
    /// });
    /// ```
    pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
    where
        C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
    {
        use timely::dataflow::channels::pact::Pipeline;
        self.inner
            .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
                // Take each container out of the input, advance its times by `step`, and forward
                // the result in a session opened at the container's own time.
                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
            })
            .as_collection()
    }
286}
287
288impl<G: Scope, D: Clone+'static, R: Clone+'static> VecCollection<G, D, R> {
289    /// Creates a new collection by applying the supplied function to each input element.
290    ///
291    /// # Examples
292    ///
293    /// ```
294    /// use differential_dataflow::input::Input;
295    ///
296    /// ::timely::example(|scope| {
297    ///     scope.new_collection_from(1 .. 10).1
298    ///          .map(|x| x * 2)
299    ///          .filter(|x| x % 2 == 1)
300    ///          .assert_empty();
301    /// });
302    /// ```
303    pub fn map<D2, L>(&self, mut logic: L) -> VecCollection<G, D2, R>
304    where
305        D2: Data,
306        L: FnMut(D) -> D2 + 'static,
307    {
308        self.inner
309            .map(move |(data, time, delta)| (logic(data), time, delta))
310            .as_collection()
311    }
312    /// Creates a new collection by applying the supplied function to each input element.
313    ///
314    /// Although the name suggests in-place mutation, this function does not change the source collection,
315    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
316    /// equivalent to `map`, but can be more efficient.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// use differential_dataflow::input::Input;
322    ///
323    /// ::timely::example(|scope| {
324    ///     scope.new_collection_from(1 .. 10).1
325    ///          .map_in_place(|x| *x *= 2)
326    ///          .filter(|x| x % 2 == 1)
327    ///          .assert_empty();
328    /// });
329    /// ```
330    pub fn map_in_place<L>(&self, mut logic: L) -> VecCollection<G, D, R>
331    where
332        L: FnMut(&mut D) + 'static,
333    {
334        self.inner
335            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
336            .as_collection()
337    }
338    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
339    ///
340    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
341    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
342    /// attempting to consolidate the results.
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use differential_dataflow::input::Input;
348    ///
349    /// ::timely::example(|scope| {
350    ///     scope.new_collection_from(1 .. 10).1
351    ///          .flat_map(|x| 0 .. x);
352    /// });
353    /// ```
354    pub fn flat_map<I, L>(&self, mut logic: L) -> VecCollection<G, I::Item, R>
355    where
356        G::Timestamp: Clone,
357        I: IntoIterator<Item: Data>,
358        L: FnMut(D) -> I + 'static,
359    {
360        self.inner
361            .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
362            .as_collection()
363    }
364    /// Creates a new collection containing those input records satisfying the supplied predicate.
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// use differential_dataflow::input::Input;
370    ///
371    /// ::timely::example(|scope| {
372    ///     scope.new_collection_from(1 .. 10).1
373    ///          .map(|x| x * 2)
374    ///          .filter(|x| x % 2 == 1)
375    ///          .assert_empty();
376    /// });
377    /// ```
378    pub fn filter<L>(&self, mut logic: L) -> VecCollection<G, D, R>
379    where
380        L: FnMut(&D) -> bool + 'static,
381    {
382        self.inner
383            .filter(move |(data, _, _)| logic(data))
384            .as_collection()
385    }
386    /// Replaces each record with another, with a new difference type.
387    ///
388    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
389    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
390    ///
391    /// # Examples
392    ///
393    /// ```
394    /// use differential_dataflow::input::Input;
395    ///
396    /// ::timely::example(|scope| {
397    ///
398    ///     let nums = scope.new_collection_from(0 .. 10).1;
399    ///     let x1 = nums.flat_map(|x| 0 .. x);
400    ///     let x2 = nums.map(|x| (x, 9 - x))
401    ///                  .explode(|(x,y)| Some((x,y)));
402    ///
403    ///     x1.assert_eq(&x2);
404    /// });
405    /// ```
406    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
407    where
408        D2: Data,
409        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
410        I: IntoIterator<Item=(D2,R2)>,
411        L: FnMut(D)->I+'static,
412    {
413        self.inner
414            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
415            .as_collection()
416    }
417
418    /// Joins each record against a collection defined by the function `logic`.
419    ///
420    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
421    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
422    /// modifications made to the results, namely joining timestamps and multiplying differences.
423    ///
424    /// #Examples
425    ///
426    /// ```
427    /// use differential_dataflow::input::Input;
428    ///
429    /// ::timely::example(|scope| {
430    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
431    ///     // for x from 0 through 9.
432    ///     scope.new_collection_from(0 .. 10isize).1
433    ///          .join_function(|x|
434    ///              //   data      time      diff
435    ///              vec![(2*x, (3*x) as u64,  x),
436    ///                   (2*x, (4*x) as u64, -x)]
437    ///           );
438    /// });
439    /// ```
440    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
441    where
442        G::Timestamp: Lattice,
443        D2: Data,
444        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
445        I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
446        L: FnMut(D)->I+'static,
447    {
448        self.inner
449            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
450            .as_collection()
451    }
452
453    /// Brings a Collection into a nested scope, at varying times.
454    ///
455    /// The `initial` function indicates the time at which each element of the Collection should appear.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// use timely::dataflow::Scope;
461    /// use differential_dataflow::input::Input;
462    ///
463    /// ::timely::example(|scope| {
464    ///
465    ///     let data = scope.new_collection_from(1 .. 10).1;
466    ///
467    ///     let result = scope.iterative::<u64,_,_>(|child| {
468    ///         data.enter_at(child, |x| *x)
469    ///             .leave()
470    ///     });
471    ///
472    ///     data.assert_eq(&result);
473    /// });
474    /// ```
475    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> VecCollection<Iterative<'a, G, T>, D, R>
476    where
477        T: Timestamp+Hash,
478        F: FnMut(&D) -> T + Clone + 'static,
479        G::Timestamp: Hash,
480    {
481        self.inner
482            .enter(child)
483            .map(move |(data, time, diff)| {
484                let new_time = Product::new(time, initial(&data));
485                (data, new_time, diff)
486            })
487            .as_collection()
488    }
489
490    /// Delays each difference by a supplied function.
491    ///
492    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
493    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
494    /// ordered, they should have the same order or compare equal once `func` is applied to them (this
495    /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
496    /// to all of the data timestamps).
497    pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
498    where
499        G::Timestamp: Hash,
500        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
501    {
502        let mut func1 = func.clone();
503        let mut func2 = func.clone();
504
505        self.inner
506            .delay_batch(move |x| func1(x))
507            .map_in_place(move |x| x.1 = func2(&x.1))
508            .as_collection()
509    }
510
511    /// Applies a supplied function to each update.
512    ///
513    /// This method is most commonly used to report information back to the user, often for debugging purposes.
514    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
515    /// not guarantee that it will be called as many times as you might expect.
516    ///
517    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
518    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
519    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
520    /// interesting and less intuitive, unfortunately.
521    ///
522    /// # Examples
523    ///
524    /// ```
525    /// use differential_dataflow::input::Input;
526    ///
527    /// ::timely::example(|scope| {
528    ///     scope.new_collection_from(1 .. 10).1
529    ///          .map_in_place(|x| *x *= 2)
530    ///          .filter(|x| x % 2 == 1)
531    ///          .inspect(|x| println!("error: {:?}", x));
532    /// });
533    /// ```
534    pub fn inspect<F>(&self, func: F) -> VecCollection<G, D, R>
535    where
536        F: FnMut(&(D, G::Timestamp, R))+'static,
537    {
538        self.inner
539            .inspect(func)
540            .as_collection()
541    }
542    /// Applies a supplied function to each batch of updates.
543    ///
544    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
545    /// timely dataflow capability associated with the batch of updates. The observed batching depends
546    /// on how the system executes, and may vary run to run.
547    ///
548    /// # Examples
549    ///
550    /// ```
551    /// use differential_dataflow::input::Input;
552    ///
553    /// ::timely::example(|scope| {
554    ///     scope.new_collection_from(1 .. 10).1
555    ///          .map_in_place(|x| *x *= 2)
556    ///          .filter(|x| x % 2 == 1)
557    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
558    /// });
559    /// ```
560    pub fn inspect_batch<F>(&self, mut func: F) -> VecCollection<G, D, R>
561    where
562        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
563    {
564        self.inner
565            .inspect_batch(move |time, data| func(time, data))
566            .as_collection()
567    }
568
569    /// Assert if the collection is ever non-empty.
570    ///
571    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
572    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
573    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
574    /// program should indicate that this assertion never found cause to complain.
575    ///
576    /// # Examples
577    ///
578    /// ```
579    /// use differential_dataflow::input::Input;
580    ///
581    /// ::timely::example(|scope| {
582    ///     scope.new_collection_from(1 .. 10).1
583    ///          .map(|x| x * 2)
584    ///          .filter(|x| x % 2 == 1)
585    ///          .assert_empty();
586    /// });
587    /// ```
588    pub fn assert_empty(&self)
589    where
590        D: crate::ExchangeData+Hashable,
591        R: crate::ExchangeData+Hashable + Semigroup,
592        G::Timestamp: Lattice+Ord,
593    {
594        self.consolidate()
595            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
596    }
597}
598
599use timely::dataflow::scopes::ScopeParent;
600use timely::progress::timestamp::Refines;
601
/// Methods requiring a nested scope.
impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
where
    C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
    T: Refines<<G as ScopeParent>::Timestamp>,
{
    /// Returns the final value of a Collection from a nested scope to its containing scope.
    ///
    /// The stream first leaves the nested scope, and a second operator then converts each container
    /// using its [`containers::Leave`] implementation so that the timestamps carried in the updates
    /// have the containing scope's timestamp type.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::dataflow::Scope;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let data = scope.new_collection_from(1 .. 10).1;
    ///
    ///     let result = scope.region(|child| {
    ///         data.enter(child)
    ///             .leave()
    ///     });
    ///
    ///     data.assert_eq(&result);
    /// });
    /// ```
    pub fn leave(&self) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
        use timely::dataflow::channels::pact::Pipeline;
        self.inner
            .leave()
            .unary(Pipeline, "Leave", move |_,_| move |input, output| {
                // Take each container out of the input, convert it to the outer container type,
                // and forward the result in a session opened at the container's own time.
                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
            })
            .as_collection()
    }
}
638
639/// Methods requiring a region as the scope.
640impl<G: Scope, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, C>
641{
642    /// Returns the value of a Collection from a nested region to its containing scope.
643    ///
644    /// This method is a specialization of `leave` to the case that of a nested region.
645    /// It removes the need for an operator that adjusts the timestamp.
646    pub fn leave_region(&self) -> Collection<G, C> {
647        self.inner
648            .leave()
649            .as_collection()
650    }
651}
652
653/// Methods requiring an Abelian difference, to support negation.
654impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> VecCollection<G, D, R> {
655    /// Assert if the collections are ever different.
656    ///
657    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
658    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
659    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
660    /// indicate that this assertion never found cause to complain.
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// use differential_dataflow::input::Input;
666    ///
667    /// ::timely::example(|scope| {
668    ///
669    ///     let data = scope.new_collection_from(1 .. 10).1;
670    ///
671    ///     let odds = data.filter(|x| x % 2 == 1);
672    ///     let evens = data.filter(|x| x % 2 == 0);
673    ///
674    ///     odds.concat(&evens)
675    ///         .assert_eq(&data);
676    /// });
677    /// ```
678    pub fn assert_eq(&self, other: &Self)
679    where
680        D: crate::ExchangeData+Hashable,
681        R: crate::ExchangeData+Hashable,
682        G::Timestamp: Lattice+Ord,
683    {
684        self.negate()
685            .concat(other)
686            .assert_empty();
687    }
688}
689
/// Conversion to a differential dataflow Collection.
///
/// `G` is the scope of the resulting collection and `C` is its container type.
pub trait AsCollection<G: Scope, C> {
    /// Converts the type to a differential dataflow collection.
    fn as_collection(&self) -> Collection<G, C>;
}
695
696impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
697    /// Converts the type to a differential dataflow collection.
698    ///
699    /// By calling this method, you guarantee that the timestamp invariant (as documented on
700    /// [Collection]) is upheld. This method will not check it.
701    fn as_collection(&self) -> Collection<G, C> {
702        Collection::<G,C>::new(self.clone())
703    }
704}
705
706/// Concatenates multiple collections.
707///
708/// This method has the effect of a sequence of calls to `concat`, but it does
709/// so in one operator rather than a chain of many operators.
710///
711/// # Examples
712///
713/// ```
714/// use differential_dataflow::input::Input;
715///
716/// ::timely::example(|scope| {
717///
718///     let data = scope.new_collection_from(1 .. 10).1;
719///
720///     let odds = data.filter(|x| x % 2 == 1);
721///     let evens = data.filter(|x| x % 2 == 0);
722///
723///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
724///         .assert_eq(&data);
725/// });
726/// ```
727pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
728where
729    G: Scope,
730    C: Container,
731    I: IntoIterator<Item=Collection<G, C>>,
732{
733    scope
734        .concatenate(iterator.into_iter().map(|x| x.inner))
735        .as_collection()
736}
737
738/// Traits that can be implemented by containers to provide functionality to collections based on them.
739pub mod containers {
740
741    use timely::progress::{Timestamp, timestamp::Refines};
742    use crate::collection::Abelian;
743
744    /// A container that can negate its updates.
745    pub trait Negate {
746        /// Negates Abelian differences of each update.
747        fn negate(self) -> Self;
748    }
749    impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
750        fn negate(mut self) -> Self {
751            for (_data, _time, diff) in self.iter_mut() {
752                diff.negate();
753            }
754            self
755        }
756    }
757
758    /// A container that can enter from timestamp `T1` into timestamp `T2`.
759    pub trait Enter<T1, T2> {
760        /// The resulting container type.
761        type InnerContainer;
762        /// Update timestamps from `T1` to `T2`.
763        fn enter(self) -> Self::InnerContainer;
764    }
765    impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
766        type InnerContainer = Vec<(D, T2, R)>;
767        fn enter(self) -> Self::InnerContainer {
768            self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
769        }
770    }
771
772    /// A container that can leave from timestamp `T1` into timestamp `T2`.
773    pub trait Leave<T1, T2> {
774        /// The resulting container type.
775        type OuterContainer;
776        /// Update timestamps from `T1` to `T2`.
777        fn leave(self) -> Self::OuterContainer;
778    }
779    impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
780        type OuterContainer = Vec<(D, T2, R)>;
781        fn leave(self) -> Self::OuterContainer {
782            self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
783        }
784    }
785
786    /// A container that can advance timestamps by a summary `TS`.
787    pub trait ResultsIn<TS> {
788        /// Advance times in the container by `step`.
789        fn results_in(self, step: &TS) -> Self;
790    }
791    impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
792        fn results_in(self, step: &T::Summary) -> Self {
793            use timely::progress::PathSummary;
794            self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
795        }
796    }
797}