differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
8//! manually. This higher-level style of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Container;
14use timely::Data;
15use timely::progress::Timestamp;
16use timely::order::Product;
17use timely::dataflow::scopes::{Child, child::Iterative};
18use timely::dataflow::Scope;
19use timely::dataflow::operators::*;
20use timely::dataflow::StreamCore;
21
22use crate::difference::{Semigroup, Abelian, Multiply};
23use crate::lattice::Lattice;
24use crate::hashable::Hashable;
25
26/// A mutable collection of values of type `D`
27///
28/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
29/// differential dataflow computation, you write as if the collection is a static dataset to which you
30/// apply functional transformations, creating new collections. Once your computation is written, you
31/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
32/// propagate changes through your functional computation and report the corresponding changes to the
33/// output collections.
34///
35/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
36/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
37/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
38/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
39/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
40/// defaults to) `isize`, representing changes to the occurrence count of each record.
41#[derive(Clone)]
42pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
43 /// The underlying timely dataflow stream.
44 ///
45 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
46 /// not intended to be the idiomatic way to work with the collection.
47 ///
48 /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
49 /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
50 /// unexpectedly.
51 pub inner: timely::dataflow::StreamCore<G, C>,
52 /// Phantom data for unreferenced `D` and `R` types.
53 phantom: std::marker::PhantomData<(D, R)>,
54}
55
56impl<G: Scope, D, R, C> Collection<G, D, R, C> {
57 /// Creates a new Collection from a timely dataflow stream.
58 ///
59 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
60 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
61 /// provides a `new_collection` method which will create a new collection for you without exposing
62 /// the underlying timely stream at all.
63 ///
64 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
65 /// method does not check it.
66 pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
67 Collection { inner: stream, phantom: std::marker::PhantomData }
68 }
69}
70impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
71 /// Creates a new collection accumulating the contents of the two collections.
72 ///
73 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
74 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
75 /// two collections.
76 ///
77 /// # Examples
78 ///
79 /// ```
80 /// use differential_dataflow::input::Input;
81 ///
82 /// ::timely::example(|scope| {
83 ///
84 /// let data = scope.new_collection_from(1 .. 10).1;
85 ///
86 /// let odds = data.filter(|x| x % 2 == 1);
87 /// let evens = data.filter(|x| x % 2 == 0);
88 ///
89 /// odds.concat(&evens)
90 /// .assert_eq(&data);
91 /// });
92 /// ```
93 pub fn concat(&self, other: &Self) -> Self {
94 self.inner
95 .concat(&other.inner)
96 .as_collection()
97 }
98 /// Creates a new collection accumulating the contents of the two collections.
99 ///
100 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
101 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
102 /// two collections.
103 ///
104 /// # Examples
105 ///
106 /// ```
107 /// use differential_dataflow::input::Input;
108 ///
109 /// ::timely::example(|scope| {
110 ///
111 /// let data = scope.new_collection_from(1 .. 10).1;
112 ///
113 /// let odds = data.filter(|x| x % 2 == 1);
114 /// let evens = data.filter(|x| x % 2 == 0);
115 ///
116 /// odds.concatenate(Some(evens))
117 /// .assert_eq(&data);
118 /// });
119 /// ```
120 pub fn concatenate<I>(&self, sources: I) -> Self
121 where
122 I: IntoIterator<Item=Self>
123 {
124 self.inner
125 .concatenate(sources.into_iter().map(|x| x.inner))
126 .as_collection()
127 }
128 // Brings a Collection into a nested region.
129 ///
130 /// This method is a specialization of `enter` to the case where the nested scope is a region.
131 /// It removes the need for an operator that adjusts the timestamp.
132 pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
133 self.inner
134 .enter(child)
135 .as_collection()
136 }
137 /// Applies a supplied function to each batch of updates.
138 ///
139 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
140 /// timely dataflow capability associated with the batch of updates. The observed batching depends
141 /// on how the system executes, and may vary run to run.
142 ///
143 /// # Examples
144 ///
145 /// ```
146 /// use differential_dataflow::input::Input;
147 ///
148 /// ::timely::example(|scope| {
149 /// scope.new_collection_from(1 .. 10).1
150 /// .map_in_place(|x| *x *= 2)
151 /// .filter(|x| x % 2 == 1)
152 /// .inspect_container(|event| println!("event: {:?}", event));
153 /// });
154 /// ```
155 pub fn inspect_container<F>(&self, func: F) -> Self
156 where
157 F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
158 {
159 self.inner
160 .inspect_container(func)
161 .as_collection()
162 }
163 /// Attaches a timely dataflow probe to the output of a Collection.
164 ///
165 /// This probe is used to determine when the state of the Collection has stabilized and can
166 /// be read out.
167 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
168 self.inner
169 .probe()
170 }
171 /// Attaches a timely dataflow probe to the output of a Collection.
172 ///
173 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
174 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
175 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
176 /// avoid swamping the system.
177 pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
178 Self::new(self.inner.probe_with(handle))
179 }
180 /// The scope containing the underlying timely dataflow stream.
181 pub fn scope(&self) -> G {
182 self.inner.scope()
183 }
184
185 /// Creates a new collection whose counts are the negation of those in the input.
186 ///
187 /// This method is most commonly used with `concat` to get those element in one collection but not another.
188 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
189 /// including negative counts.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// use differential_dataflow::input::Input;
195 ///
196 /// ::timely::example(|scope| {
197 ///
198 /// let data = scope.new_collection_from(1 .. 10).1;
199 ///
200 /// let odds = data.filter(|x| x % 2 == 1);
201 /// let evens = data.filter(|x| x % 2 == 0);
202 ///
203 /// odds.negate()
204 /// .concat(&data)
205 /// .assert_eq(&evens);
206 /// });
207 /// ```
208 // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
209 // an inherent method on `Collection`.
210 pub fn negate(&self) -> Collection<G, D, R, C>
211 where
212 StreamCore<G, C>: crate::operators::Negate<G, C>
213 {
214 crate::operators::Negate::negate(&self.inner).as_collection()
215 }
216}
217
218impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
219 /// Creates a new collection by applying the supplied function to each input element.
220 ///
221 /// # Examples
222 ///
223 /// ```
224 /// use differential_dataflow::input::Input;
225 ///
226 /// ::timely::example(|scope| {
227 /// scope.new_collection_from(1 .. 10).1
228 /// .map(|x| x * 2)
229 /// .filter(|x| x % 2 == 1)
230 /// .assert_empty();
231 /// });
232 /// ```
233 pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
234 where
235 D2: Data,
236 L: FnMut(D) -> D2 + 'static,
237 {
238 self.inner
239 .map(move |(data, time, delta)| (logic(data), time, delta))
240 .as_collection()
241 }
242 /// Creates a new collection by applying the supplied function to each input element.
243 ///
244 /// Although the name suggests in-place mutation, this function does not change the source collection,
245 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
246 /// equivalent to `map`, but can be more efficient.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use differential_dataflow::input::Input;
252 ///
253 /// ::timely::example(|scope| {
254 /// scope.new_collection_from(1 .. 10).1
255 /// .map_in_place(|x| *x *= 2)
256 /// .filter(|x| x % 2 == 1)
257 /// .assert_empty();
258 /// });
259 /// ```
260 pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
261 where
262 L: FnMut(&mut D) + 'static,
263 {
264 self.inner
265 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
266 .as_collection()
267 }
268 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
269 ///
270 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
271 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
272 /// attempting to consolidate the results.
273 ///
274 /// # Examples
275 ///
276 /// ```
277 /// use differential_dataflow::input::Input;
278 ///
279 /// ::timely::example(|scope| {
280 /// scope.new_collection_from(1 .. 10).1
281 /// .flat_map(|x| 0 .. x);
282 /// });
283 /// ```
284 pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
285 where
286 G::Timestamp: Clone,
287 I: IntoIterator<Item: Data>,
288 L: FnMut(D) -> I + 'static,
289 {
290 self.inner
291 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
292 .as_collection()
293 }
294 /// Creates a new collection containing those input records satisfying the supplied predicate.
295 ///
296 /// # Examples
297 ///
298 /// ```
299 /// use differential_dataflow::input::Input;
300 ///
301 /// ::timely::example(|scope| {
302 /// scope.new_collection_from(1 .. 10).1
303 /// .map(|x| x * 2)
304 /// .filter(|x| x % 2 == 1)
305 /// .assert_empty();
306 /// });
307 /// ```
308 pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
309 where
310 L: FnMut(&D) -> bool + 'static,
311 {
312 self.inner
313 .filter(move |(data, _, _)| logic(data))
314 .as_collection()
315 }
316 /// Replaces each record with another, with a new difference type.
317 ///
318 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
319 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use differential_dataflow::input::Input;
325 ///
326 /// ::timely::example(|scope| {
327 ///
328 /// let nums = scope.new_collection_from(0 .. 10).1;
329 /// let x1 = nums.flat_map(|x| 0 .. x);
330 /// let x2 = nums.map(|x| (x, 9 - x))
331 /// .explode(|(x,y)| Some((x,y)));
332 ///
333 /// x1.assert_eq(&x2);
334 /// });
335 /// ```
336 pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
337 where
338 D2: Data,
339 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
340 I: IntoIterator<Item=(D2,R2)>,
341 L: FnMut(D)->I+'static,
342 {
343 self.inner
344 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
345 .as_collection()
346 }
347
348 /// Joins each record against a collection defined by the function `logic`.
349 ///
350 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
351 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
352 /// modifications made to the results, namely joining timestamps and multiplying differences.
353 ///
354 /// #Examples
355 ///
356 /// ```
357 /// use differential_dataflow::input::Input;
358 ///
359 /// ::timely::example(|scope| {
360 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
361 /// // for x from 0 through 9.
362 /// scope.new_collection_from(0 .. 10isize).1
363 /// .join_function(|x|
364 /// // data time diff
365 /// vec![(2*x, (3*x) as u64, x),
366 /// (2*x, (4*x) as u64, -x)]
367 /// );
368 /// });
369 /// ```
370 pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
371 where
372 G::Timestamp: Lattice,
373 D2: Data,
374 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
375 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
376 L: FnMut(D)->I+'static,
377 {
378 self.inner
379 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
380 .as_collection()
381 }
382
383 /// Brings a Collection into a nested scope.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use timely::dataflow::Scope;
389 /// use differential_dataflow::input::Input;
390 ///
391 /// ::timely::example(|scope| {
392 ///
393 /// let data = scope.new_collection_from(1 .. 10).1;
394 ///
395 /// let result = scope.region(|child| {
396 /// data.enter(child)
397 /// .leave()
398 /// });
399 ///
400 /// data.assert_eq(&result);
401 /// });
402 /// ```
403 pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
404 where
405 T: Refines<<G as ScopeParent>::Timestamp>,
406 {
407 self.inner
408 .enter(child)
409 .map(|(data, time, diff)| (data, T::to_inner(time), diff))
410 .as_collection()
411 }
412
413 /// Brings a Collection into a nested scope, at varying times.
414 ///
415 /// The `initial` function indicates the time at which each element of the Collection should appear.
416 ///
417 /// # Examples
418 ///
419 /// ```
420 /// use timely::dataflow::Scope;
421 /// use differential_dataflow::input::Input;
422 ///
423 /// ::timely::example(|scope| {
424 ///
425 /// let data = scope.new_collection_from(1 .. 10).1;
426 ///
427 /// let result = scope.iterative::<u64,_,_>(|child| {
428 /// data.enter_at(child, |x| *x)
429 /// .leave()
430 /// });
431 ///
432 /// data.assert_eq(&result);
433 /// });
434 /// ```
435 pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
436 where
437 T: Timestamp+Hash,
438 F: FnMut(&D) -> T + Clone + 'static,
439 G::Timestamp: Hash,
440 {
441 self.inner
442 .enter(child)
443 .map(move |(data, time, diff)| {
444 let new_time = Product::new(time, initial(&data));
445 (data, new_time, diff)
446 })
447 .as_collection()
448 }
449
450 /// Delays each difference by a supplied function.
451 ///
452 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
453 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
454 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
455 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
456 /// to all of the data timestamps).
457 pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
458 where
459 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
460 {
461 let mut func1 = func.clone();
462 let mut func2 = func.clone();
463
464 self.inner
465 .delay_batch(move |x| func1(x))
466 .map_in_place(move |x| x.1 = func2(&x.1))
467 .as_collection()
468 }
469
470 /// Applies a supplied function to each update.
471 ///
472 /// This method is most commonly used to report information back to the user, often for debugging purposes.
473 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
474 /// not guarantee that it will be called as many times as you might expect.
475 ///
476 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
477 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
478 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
479 /// interesting and less intuitive, unfortunately.
480 ///
481 /// # Examples
482 ///
483 /// ```
484 /// use differential_dataflow::input::Input;
485 ///
486 /// ::timely::example(|scope| {
487 /// scope.new_collection_from(1 .. 10).1
488 /// .map_in_place(|x| *x *= 2)
489 /// .filter(|x| x % 2 == 1)
490 /// .inspect(|x| println!("error: {:?}", x));
491 /// });
492 /// ```
493 pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
494 where
495 F: FnMut(&(D, G::Timestamp, R))+'static,
496 {
497 self.inner
498 .inspect(func)
499 .as_collection()
500 }
501 /// Applies a supplied function to each batch of updates.
502 ///
503 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
504 /// timely dataflow capability associated with the batch of updates. The observed batching depends
505 /// on how the system executes, and may vary run to run.
506 ///
507 /// # Examples
508 ///
509 /// ```
510 /// use differential_dataflow::input::Input;
511 ///
512 /// ::timely::example(|scope| {
513 /// scope.new_collection_from(1 .. 10).1
514 /// .map_in_place(|x| *x *= 2)
515 /// .filter(|x| x % 2 == 1)
516 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
517 /// });
518 /// ```
519 pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
520 where
521 F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
522 {
523 self.inner
524 .inspect_batch(move |time, data| func(time, data))
525 .as_collection()
526 }
527
528 /// Assert if the collection is ever non-empty.
529 ///
530 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
531 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
532 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
533 /// program should indicate that this assertion never found cause to complain.
534 ///
535 /// # Examples
536 ///
537 /// ```
538 /// use differential_dataflow::input::Input;
539 ///
540 /// ::timely::example(|scope| {
541 /// scope.new_collection_from(1 .. 10).1
542 /// .map(|x| x * 2)
543 /// .filter(|x| x % 2 == 1)
544 /// .assert_empty();
545 /// });
546 /// ```
547 pub fn assert_empty(&self)
548 where
549 D: crate::ExchangeData+Hashable,
550 R: crate::ExchangeData+Hashable + Semigroup,
551 G::Timestamp: Lattice+Ord,
552 {
553 self.consolidate()
554 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
555 }
556}
557
558use timely::dataflow::scopes::ScopeParent;
559use timely::progress::timestamp::Refines;
560
561/// Methods requiring a nested scope.
562impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
563where
564 T: Refines<<G as ScopeParent>::Timestamp>,
565{
566 /// Returns the final value of a Collection from a nested scope to its containing scope.
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// use timely::dataflow::Scope;
572 /// use differential_dataflow::input::Input;
573 ///
574 /// ::timely::example(|scope| {
575 ///
576 /// let data = scope.new_collection_from(1 .. 10).1;
577 ///
578 /// let result = scope.region(|child| {
579 /// data.enter(child)
580 /// .leave()
581 /// });
582 ///
583 /// data.assert_eq(&result);
584 /// });
585 /// ```
586 pub fn leave(&self) -> Collection<G, D, R> {
587 self.inner
588 .leave()
589 .map(|(data, time, diff)| (data, time.to_outer(), diff))
590 .as_collection()
591 }
592}
593
594/// Methods requiring a region as the scope.
595impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
596{
597 /// Returns the value of a Collection from a nested region to its containing scope.
598 ///
599 /// This method is a specialization of `leave` to the case that of a nested region.
600 /// It removes the need for an operator that adjusts the timestamp.
601 pub fn leave_region(&self) -> Collection<G, D, R, C> {
602 self.inner
603 .leave()
604 .as_collection()
605 }
606}
607
608/// Methods requiring an Abelian difference, to support negation.
609impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
610 /// Assert if the collections are ever different.
611 ///
612 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
613 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
614 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
615 /// indicate that this assertion never found cause to complain.
616 ///
617 /// # Examples
618 ///
619 /// ```
620 /// use differential_dataflow::input::Input;
621 ///
622 /// ::timely::example(|scope| {
623 ///
624 /// let data = scope.new_collection_from(1 .. 10).1;
625 ///
626 /// let odds = data.filter(|x| x % 2 == 1);
627 /// let evens = data.filter(|x| x % 2 == 0);
628 ///
629 /// odds.concat(&evens)
630 /// .assert_eq(&data);
631 /// });
632 /// ```
633 pub fn assert_eq(&self, other: &Self)
634 where
635 D: crate::ExchangeData+Hashable,
636 R: crate::ExchangeData+Hashable,
637 G::Timestamp: Lattice+Ord,
638 {
639 self.negate()
640 .concat(other)
641 .assert_empty();
642 }
643}
644
645/// Conversion to a differential dataflow Collection.
646pub trait AsCollection<G: Scope, D, R, C> {
647 /// Converts the type to a differential dataflow collection.
648 fn as_collection(&self) -> Collection<G, D, R, C>;
649}
650
651impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
652 /// Converts the type to a differential dataflow collection.
653 ///
654 /// By calling this method, you guarantee that the timestamp invariant (as documented on
655 /// [Collection]) is upheld. This method will not check it.
656 fn as_collection(&self) -> Collection<G, D, R, C> {
657 Collection::<G,D,R,C>::new(self.clone())
658 }
659}
660
661/// Concatenates multiple collections.
662///
663/// This method has the effect of a sequence of calls to `concat`, but it does
664/// so in one operator rather than a chain of many operators.
665///
666/// # Examples
667///
668/// ```
669/// use differential_dataflow::input::Input;
670///
671/// ::timely::example(|scope| {
672///
673/// let data = scope.new_collection_from(1 .. 10).1;
674///
675/// let odds = data.filter(|x| x % 2 == 1);
676/// let evens = data.filter(|x| x % 2 == 0);
677///
678/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
679/// .assert_eq(&data);
680/// });
681/// ```
682pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
683where
684 G: Scope,
685 D: Data,
686 R: Semigroup + 'static,
687 C: Container + Clone + 'static,
688 I: IntoIterator<Item=Collection<G, D, R, C>>,
689{
690 scope
691 .concatenate(iterator.into_iter().map(|x| x.inner))
692 .as_collection()
693}