differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
8//! manually. The higher-level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Container;
14use timely::Data;
15use timely::progress::Timestamp;
16use timely::order::Product;
17use timely::dataflow::scopes::{Child, child::Iterative};
18use timely::dataflow::Scope;
19use timely::dataflow::operators::*;
20use timely::dataflow::StreamCore;
21
22use crate::difference::{Semigroup, Abelian, Multiply};
23use crate::lattice::Lattice;
24use crate::hashable::Hashable;
25
26/// A mutable collection of values of type `D`
27///
28/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
29/// differential dataflow computation, you write as if the collection is a static dataset to which you
30/// apply functional transformations, creating new collections. Once your computation is written, you
31/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
32/// propagate changes through your functional computation and report the corresponding changes to the
33/// output collections.
34///
35/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
36/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
37/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
38/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
39/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
40/// defaults to) `isize`, representing changes to the occurrence count of each record.
41#[derive(Clone)]
42pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
43 /// The underlying timely dataflow stream.
44 ///
45 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
46 /// not intended to be the idiomatic way to work with the collection.
47 pub inner: timely::dataflow::StreamCore<G, C>,
48 /// Phantom data for unreferenced `D` and `R` types.
49 phantom: std::marker::PhantomData<(D, R)>,
50}
51
52impl<G: Scope, D, R, C> Collection<G, D, R, C> {
53 /// Creates a new Collection from a timely dataflow stream.
54 ///
55 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
56 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
57 /// provides a `new_collection` method which will create a new collection for you without exposing
58 /// the underlying timely stream at all.
59 pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
60 Collection { inner: stream, phantom: std::marker::PhantomData }
61 }
62}
63impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
64 /// Creates a new collection accumulating the contents of the two collections.
65 ///
66 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
67 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
68 /// two collections.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// use differential_dataflow::input::Input;
74 ///
75 /// ::timely::example(|scope| {
76 ///
77 /// let data = scope.new_collection_from(1 .. 10).1;
78 ///
79 /// let odds = data.filter(|x| x % 2 == 1);
80 /// let evens = data.filter(|x| x % 2 == 0);
81 ///
82 /// odds.concat(&evens)
83 /// .assert_eq(&data);
84 /// });
85 /// ```
86 pub fn concat(&self, other: &Self) -> Self {
87 self.inner
88 .concat(&other.inner)
89 .as_collection()
90 }
91 /// Creates a new collection accumulating the contents of the two collections.
92 ///
93 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
94 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
95 /// two collections.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use differential_dataflow::input::Input;
101 ///
102 /// ::timely::example(|scope| {
103 ///
104 /// let data = scope.new_collection_from(1 .. 10).1;
105 ///
106 /// let odds = data.filter(|x| x % 2 == 1);
107 /// let evens = data.filter(|x| x % 2 == 0);
108 ///
109 /// odds.concatenate(Some(evens))
110 /// .assert_eq(&data);
111 /// });
112 /// ```
113 pub fn concatenate<I>(&self, sources: I) -> Self
114 where
115 I: IntoIterator<Item=Self>
116 {
117 self.inner
118 .concatenate(sources.into_iter().map(|x| x.inner))
119 .as_collection()
120 }
121 // Brings a Collection into a nested region.
122 ///
123 /// This method is a specialization of `enter` to the case where the nested scope is a region.
124 /// It removes the need for an operator that adjusts the timestamp.
125 pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
126 self.inner
127 .enter(child)
128 .as_collection()
129 }
130 /// Applies a supplied function to each batch of updates.
131 ///
132 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
133 /// timely dataflow capability associated with the batch of updates. The observed batching depends
134 /// on how the system executes, and may vary run to run.
135 ///
136 /// # Examples
137 ///
138 /// ```
139 /// use differential_dataflow::input::Input;
140 ///
141 /// ::timely::example(|scope| {
142 /// scope.new_collection_from(1 .. 10).1
143 /// .map_in_place(|x| *x *= 2)
144 /// .filter(|x| x % 2 == 1)
145 /// .inspect_container(|event| println!("event: {:?}", event));
146 /// });
147 /// ```
148 pub fn inspect_container<F>(&self, func: F) -> Self
149 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static {
150 self.inner
151 .inspect_container(func)
152 .as_collection()
153 }
154 /// Attaches a timely dataflow probe to the output of a Collection.
155 ///
156 /// This probe is used to determine when the state of the Collection has stabilized and can
157 /// be read out.
158 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
159 self.inner
160 .probe()
161 }
162 /// Attaches a timely dataflow probe to the output of a Collection.
163 ///
164 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
165 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
166 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
167 /// avoid swamping the system.
168 pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
169 Self::new(self.inner.probe_with(handle))
170 }
171 /// The scope containing the underlying timely dataflow stream.
172 pub fn scope(&self) -> G {
173 self.inner.scope()
174 }
175
176 /// Creates a new collection whose counts are the negation of those in the input.
177 ///
178 /// This method is most commonly used with `concat` to get those element in one collection but not another.
179 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
180 /// including negative counts.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// use differential_dataflow::input::Input;
186 ///
187 /// ::timely::example(|scope| {
188 ///
189 /// let data = scope.new_collection_from(1 .. 10).1;
190 ///
191 /// let odds = data.filter(|x| x % 2 == 1);
192 /// let evens = data.filter(|x| x % 2 == 0);
193 ///
194 /// odds.negate()
195 /// .concat(&data)
196 /// .assert_eq(&evens);
197 /// });
198 /// ```
199 // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
200 // an inherent method on `Collection`.
201 pub fn negate(&self) -> Collection<G, D, R, C> where StreamCore<G, C>: crate::operators::Negate<G, C> {
202 crate::operators::Negate::negate(&self.inner).as_collection()
203 }
204}
205
206impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
207 /// Creates a new collection by applying the supplied function to each input element.
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use differential_dataflow::input::Input;
213 ///
214 /// ::timely::example(|scope| {
215 /// scope.new_collection_from(1 .. 10).1
216 /// .map(|x| x * 2)
217 /// .filter(|x| x % 2 == 1)
218 /// .assert_empty();
219 /// });
220 /// ```
221 pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
222 where D2: Data,
223 L: FnMut(D) -> D2 + 'static
224 {
225 self.inner
226 .map(move |(data, time, delta)| (logic(data), time, delta))
227 .as_collection()
228 }
229 /// Creates a new collection by applying the supplied function to each input element.
230 ///
231 /// Although the name suggests in-place mutation, this function does not change the source collection,
232 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
233 /// equivalent to `map`, but can be more efficient.
234 ///
235 /// # Examples
236 ///
237 /// ```
238 /// use differential_dataflow::input::Input;
239 ///
240 /// ::timely::example(|scope| {
241 /// scope.new_collection_from(1 .. 10).1
242 /// .map_in_place(|x| *x *= 2)
243 /// .filter(|x| x % 2 == 1)
244 /// .assert_empty();
245 /// });
246 /// ```
247 pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
248 where L: FnMut(&mut D) + 'static {
249 self.inner
250 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
251 .as_collection()
252 }
253 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
254 ///
255 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
256 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
257 /// attempting to consolidate the results.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use differential_dataflow::input::Input;
263 ///
264 /// ::timely::example(|scope| {
265 /// scope.new_collection_from(1 .. 10).1
266 /// .flat_map(|x| 0 .. x);
267 /// });
268 /// ```
269 pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
270 where G::Timestamp: Clone,
271 I: IntoIterator,
272 I::Item: Data,
273 L: FnMut(D) -> I + 'static {
274 self.inner
275 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
276 .as_collection()
277 }
278 /// Creates a new collection containing those input records satisfying the supplied predicate.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// use differential_dataflow::input::Input;
284 ///
285 /// ::timely::example(|scope| {
286 /// scope.new_collection_from(1 .. 10).1
287 /// .map(|x| x * 2)
288 /// .filter(|x| x % 2 == 1)
289 /// .assert_empty();
290 /// });
291 /// ```
292 pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
293 where L: FnMut(&D) -> bool + 'static {
294 self.inner
295 .filter(move |(data, _, _)| logic(data))
296 .as_collection()
297 }
298 /// Replaces each record with another, with a new difference type.
299 ///
300 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
301 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// use differential_dataflow::input::Input;
307 ///
308 /// ::timely::example(|scope| {
309 ///
310 /// let nums = scope.new_collection_from(0 .. 10).1;
311 /// let x1 = nums.flat_map(|x| 0 .. x);
312 /// let x2 = nums.map(|x| (x, 9 - x))
313 /// .explode(|(x,y)| Some((x,y)));
314 ///
315 /// x1.assert_eq(&x2);
316 /// });
317 /// ```
318 pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
319 where D2: Data,
320 R2: Semigroup+Multiply<R>,
321 <R2 as Multiply<R>>::Output: Semigroup+'static,
322 I: IntoIterator<Item=(D2,R2)>,
323 L: FnMut(D)->I+'static,
324 {
325 self.inner
326 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
327 .as_collection()
328 }
329
330 /// Joins each record against a collection defined by the function `logic`.
331 ///
332 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
333 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
334 /// modifications made to the results, namely joining timestamps and multiplying differences.
335 ///
336 /// #Examples
337 ///
338 /// ```
339 /// use differential_dataflow::input::Input;
340 ///
341 /// ::timely::example(|scope| {
342 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
343 /// // for x from 0 through 9.
344 /// scope.new_collection_from(0 .. 10isize).1
345 /// .join_function(|x|
346 /// // data time diff
347 /// vec![(2*x, (3*x) as u64, x),
348 /// (2*x, (4*x) as u64, -x)]
349 /// );
350 /// });
351 /// ```
352 pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
353 where G::Timestamp: Lattice,
354 D2: Data,
355 R2: Semigroup+Multiply<R>,
356 <R2 as Multiply<R>>::Output: Semigroup+'static,
357 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
358 L: FnMut(D)->I+'static,
359 {
360 self.inner
361 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
362 .as_collection()
363 }
364
365 /// Brings a Collection into a nested scope.
366 ///
367 /// # Examples
368 ///
369 /// ```
370 /// use timely::dataflow::Scope;
371 /// use differential_dataflow::input::Input;
372 ///
373 /// ::timely::example(|scope| {
374 ///
375 /// let data = scope.new_collection_from(1 .. 10).1;
376 ///
377 /// let result = scope.region(|child| {
378 /// data.enter(child)
379 /// .leave()
380 /// });
381 ///
382 /// data.assert_eq(&result);
383 /// });
384 /// ```
385 pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
386 where
387 T: Refines<<G as ScopeParent>::Timestamp>,
388 {
389 self.inner
390 .enter(child)
391 .map(|(data, time, diff)| (data, T::to_inner(time), diff))
392 .as_collection()
393 }
394
395 /// Brings a Collection into a nested scope, at varying times.
396 ///
397 /// The `initial` function indicates the time at which each element of the Collection should appear.
398 ///
399 /// # Examples
400 ///
401 /// ```
402 /// use timely::dataflow::Scope;
403 /// use differential_dataflow::input::Input;
404 ///
405 /// ::timely::example(|scope| {
406 ///
407 /// let data = scope.new_collection_from(1 .. 10).1;
408 ///
409 /// let result = scope.iterative::<u64,_,_>(|child| {
410 /// data.enter_at(child, |x| *x)
411 /// .leave()
412 /// });
413 ///
414 /// data.assert_eq(&result);
415 /// });
416 /// ```
417 pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
418 where
419 T: Timestamp+Hash,
420 F: FnMut(&D) -> T + Clone + 'static,
421 G::Timestamp: Hash,
422 {
423 self.inner
424 .enter(child)
425 .map(move |(data, time, diff)| {
426 let new_time = Product::new(time, initial(&data));
427 (data, new_time, diff)
428 })
429 .as_collection()
430 }
431
432 /// Delays each difference by a supplied function.
433 ///
434 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
435 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
436 /// ordered, they should have the same order once `func` is applied to them (this is because we advance the
437 /// timely capability with the same logic, and it must remain `less_equal` to all of the data timestamps).
438 pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
439 where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static {
440
441 let mut func1 = func.clone();
442 let mut func2 = func.clone();
443
444 self.inner
445 .delay_batch(move |x| func1(x))
446 .map_in_place(move |x| x.1 = func2(&x.1))
447 .as_collection()
448 }
449 /// Applies a supplied function to each update.
450 ///
451 /// This method is most commonly used to report information back to the user, often for debugging purposes.
452 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
453 /// not guarantee that it will be called as many times as you might expect.
454 ///
455 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
456 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
457 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
458 /// interesting and less intuitive, unfortunately.
459 ///
460 /// # Examples
461 ///
462 /// ```
463 /// use differential_dataflow::input::Input;
464 ///
465 /// ::timely::example(|scope| {
466 /// scope.new_collection_from(1 .. 10).1
467 /// .map_in_place(|x| *x *= 2)
468 /// .filter(|x| x % 2 == 1)
469 /// .inspect(|x| println!("error: {:?}", x));
470 /// });
471 /// ```
472 pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
473 where F: FnMut(&(D, G::Timestamp, R))+'static {
474 self.inner
475 .inspect(func)
476 .as_collection()
477 }
478 /// Applies a supplied function to each batch of updates.
479 ///
480 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
481 /// timely dataflow capability associated with the batch of updates. The observed batching depends
482 /// on how the system executes, and may vary run to run.
483 ///
484 /// # Examples
485 ///
486 /// ```
487 /// use differential_dataflow::input::Input;
488 ///
489 /// ::timely::example(|scope| {
490 /// scope.new_collection_from(1 .. 10).1
491 /// .map_in_place(|x| *x *= 2)
492 /// .filter(|x| x % 2 == 1)
493 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
494 /// });
495 /// ```
496 pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
497 where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static {
498 self.inner
499 .inspect_batch(move |time, data| func(time, data))
500 .as_collection()
501 }
502
503 /// Assert if the collection is ever non-empty.
504 ///
505 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
506 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
507 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
508 /// program should indicate that this assertion never found cause to complain.
509 ///
510 /// # Examples
511 ///
512 /// ```
513 /// use differential_dataflow::input::Input;
514 ///
515 /// ::timely::example(|scope| {
516 /// scope.new_collection_from(1 .. 10).1
517 /// .map(|x| x * 2)
518 /// .filter(|x| x % 2 == 1)
519 /// .assert_empty();
520 /// });
521 /// ```
522 pub fn assert_empty(&self)
523 where D: crate::ExchangeData+Hashable,
524 R: crate::ExchangeData+Hashable + Semigroup,
525 G::Timestamp: Lattice+Ord,
526 {
527 self.consolidate()
528 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
529 }
530}
531
532use timely::dataflow::scopes::ScopeParent;
533use timely::progress::timestamp::Refines;
534
535/// Methods requiring a nested scope.
536impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
537where
538 T: Refines<<G as ScopeParent>::Timestamp>,
539{
540 /// Returns the final value of a Collection from a nested scope to its containing scope.
541 ///
542 /// # Examples
543 ///
544 /// ```
545 /// use timely::dataflow::Scope;
546 /// use differential_dataflow::input::Input;
547 ///
548 /// ::timely::example(|scope| {
549 ///
550 /// let data = scope.new_collection_from(1 .. 10).1;
551 ///
552 /// let result = scope.region(|child| {
553 /// data.enter(child)
554 /// .leave()
555 /// });
556 ///
557 /// data.assert_eq(&result);
558 /// });
559 /// ```
560 pub fn leave(&self) -> Collection<G, D, R> {
561 self.inner
562 .leave()
563 .map(|(data, time, diff)| (data, time.to_outer(), diff))
564 .as_collection()
565 }
566}
567
568/// Methods requiring a region as the scope.
569impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
570{
571 /// Returns the value of a Collection from a nested region to its containing scope.
572 ///
573 /// This method is a specialization of `leave` to the case that of a nested region.
574 /// It removes the need for an operator that adjusts the timestamp.
575 pub fn leave_region(&self) -> Collection<G, D, R, C> {
576 self.inner
577 .leave()
578 .as_collection()
579 }
580}
581
582/// Methods requiring an Abelian difference, to support negation.
583impl<G: Scope, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
584 /// Assert if the collections are ever different.
585 ///
586 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
587 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
588 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
589 /// indicate that this assertion never found cause to complain.
590 ///
591 /// # Examples
592 ///
593 /// ```
594 /// use differential_dataflow::input::Input;
595 ///
596 /// ::timely::example(|scope| {
597 ///
598 /// let data = scope.new_collection_from(1 .. 10).1;
599 ///
600 /// let odds = data.filter(|x| x % 2 == 1);
601 /// let evens = data.filter(|x| x % 2 == 0);
602 ///
603 /// odds.concat(&evens)
604 /// .assert_eq(&data);
605 /// });
606 /// ```
607 pub fn assert_eq(&self, other: &Self)
608 where D: crate::ExchangeData+Hashable,
609 R: crate::ExchangeData+Hashable,
610 G::Timestamp: Lattice+Ord
611 {
612 self.negate()
613 .concat(other)
614 .assert_empty();
615 }
616}
617
618/// Conversion to a differential dataflow Collection.
619pub trait AsCollection<G: Scope, D, R, C> {
620 /// Converts the type to a differential dataflow collection.
621 fn as_collection(&self) -> Collection<G, D, R, C>;
622}
623
624impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
625 fn as_collection(&self) -> Collection<G, D, R, C> {
626 Collection::<G,D,R,C>::new(self.clone())
627 }
628}
629
630/// Concatenates multiple collections.
631///
632/// This method has the effect of a sequence of calls to `concat`, but it does
633/// so in one operator rather than a chain of many operators.
634///
635/// # Examples
636///
637/// ```
638/// use differential_dataflow::input::Input;
639///
640/// ::timely::example(|scope| {
641///
642/// let data = scope.new_collection_from(1 .. 10).1;
643///
644/// let odds = data.filter(|x| x % 2 == 1);
645/// let evens = data.filter(|x| x % 2 == 0);
646///
647/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
648/// .assert_eq(&data);
649/// });
650/// ```
651pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
652where
653 G: Scope,
654 D: Data,
655 R: Semigroup+'static,
656 C: Container + Clone + 'static,
657 I: IntoIterator<Item=Collection<G, D, R, C>>,
658{
659 scope
660 .concatenate(iterator.into_iter().map(|x| x.inner))
661 .as_collection()
662}