differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::{Container, Data};
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::Scope;
18use timely::dataflow::operators::*;
19use timely::dataflow::StreamCore;
20
21use crate::difference::{Semigroup, Abelian, Multiply};
22use crate::lattice::Lattice;
23use crate::hashable::Hashable;
24
25/// A mutable collection of values of type `D`
26///
27/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
28/// differential dataflow computation, you write as if the collection is a static dataset to which you
29/// apply functional transformations, creating new collections. Once your computation is written, you
30/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
31/// propagate changes through your functional computation and report the corresponding changes to the
32/// output collections.
33///
34/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
35/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
36/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
37/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
38/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
39/// defaults to) `isize`, representing changes to the occurrence count of each record.
40#[derive(Clone)]
41pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
42 /// The underlying timely dataflow stream.
43 ///
44 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
45 /// not intended to be the idiomatic way to work with the collection.
46 ///
47 /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
48 /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
49 /// unexpectedly.
50 pub inner: timely::dataflow::StreamCore<G, C>,
51 /// Phantom data for unreferenced `D` and `R` types.
52 phantom: std::marker::PhantomData<(D, R)>,
53}
54
55impl<G: Scope, D, R, C> Collection<G, D, R, C> {
56 /// Creates a new Collection from a timely dataflow stream.
57 ///
58 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
59 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
60 /// provides a `new_collection` method which will create a new collection for you without exposing
61 /// the underlying timely stream at all.
62 ///
63 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
64 /// method does not check it.
65 pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
66 Collection { inner: stream, phantom: std::marker::PhantomData }
67 }
68}
69impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
70 /// Creates a new collection accumulating the contents of the two collections.
71 ///
72 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
73 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
74 /// two collections.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// use differential_dataflow::input::Input;
80 ///
81 /// ::timely::example(|scope| {
82 ///
83 /// let data = scope.new_collection_from(1 .. 10).1;
84 ///
85 /// let odds = data.filter(|x| x % 2 == 1);
86 /// let evens = data.filter(|x| x % 2 == 0);
87 ///
88 /// odds.concat(&evens)
89 /// .assert_eq(&data);
90 /// });
91 /// ```
92 pub fn concat(&self, other: &Self) -> Self {
93 self.inner
94 .concat(&other.inner)
95 .as_collection()
96 }
97 /// Creates a new collection accumulating the contents of the two collections.
98 ///
99 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
100 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
101 /// two collections.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// use differential_dataflow::input::Input;
107 ///
108 /// ::timely::example(|scope| {
109 ///
110 /// let data = scope.new_collection_from(1 .. 10).1;
111 ///
112 /// let odds = data.filter(|x| x % 2 == 1);
113 /// let evens = data.filter(|x| x % 2 == 0);
114 ///
115 /// odds.concatenate(Some(evens))
116 /// .assert_eq(&data);
117 /// });
118 /// ```
119 pub fn concatenate<I>(&self, sources: I) -> Self
120 where
121 I: IntoIterator<Item=Self>
122 {
123 self.inner
124 .concatenate(sources.into_iter().map(|x| x.inner))
125 .as_collection()
126 }
127 // Brings a Collection into a nested region.
128 ///
129 /// This method is a specialization of `enter` to the case where the nested scope is a region.
130 /// It removes the need for an operator that adjusts the timestamp.
131 pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
132 self.inner
133 .enter(child)
134 .as_collection()
135 }
136 /// Applies a supplied function to each batch of updates.
137 ///
138 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
139 /// timely dataflow capability associated with the batch of updates. The observed batching depends
140 /// on how the system executes, and may vary run to run.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use differential_dataflow::input::Input;
146 ///
147 /// ::timely::example(|scope| {
148 /// scope.new_collection_from(1 .. 10).1
149 /// .map_in_place(|x| *x *= 2)
150 /// .filter(|x| x % 2 == 1)
151 /// .inspect_container(|event| println!("event: {:?}", event));
152 /// });
153 /// ```
154 pub fn inspect_container<F>(&self, func: F) -> Self
155 where
156 F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
157 {
158 self.inner
159 .inspect_container(func)
160 .as_collection()
161 }
162 /// Attaches a timely dataflow probe to the output of a Collection.
163 ///
164 /// This probe is used to determine when the state of the Collection has stabilized and can
165 /// be read out.
166 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
167 self.inner
168 .probe()
169 }
170 /// Attaches a timely dataflow probe to the output of a Collection.
171 ///
172 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
173 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
174 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
175 /// avoid swamping the system.
176 pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
177 Self::new(self.inner.probe_with(handle))
178 }
179 /// The scope containing the underlying timely dataflow stream.
180 pub fn scope(&self) -> G {
181 self.inner.scope()
182 }
183
184 /// Creates a new collection whose counts are the negation of those in the input.
185 ///
186 /// This method is most commonly used with `concat` to get those element in one collection but not another.
187 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
188 /// including negative counts.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use differential_dataflow::input::Input;
194 ///
195 /// ::timely::example(|scope| {
196 ///
197 /// let data = scope.new_collection_from(1 .. 10).1;
198 ///
199 /// let odds = data.filter(|x| x % 2 == 1);
200 /// let evens = data.filter(|x| x % 2 == 0);
201 ///
202 /// odds.negate()
203 /// .concat(&data)
204 /// .assert_eq(&evens);
205 /// });
206 /// ```
207 // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
208 // an inherent method on `Collection`.
209 pub fn negate(&self) -> Collection<G, D, R, C>
210 where
211 StreamCore<G, C>: crate::operators::Negate<G, C>
212 {
213 crate::operators::Negate::negate(&self.inner).as_collection()
214 }
215}
216
217impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
218 /// Creates a new collection by applying the supplied function to each input element.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use differential_dataflow::input::Input;
224 ///
225 /// ::timely::example(|scope| {
226 /// scope.new_collection_from(1 .. 10).1
227 /// .map(|x| x * 2)
228 /// .filter(|x| x % 2 == 1)
229 /// .assert_empty();
230 /// });
231 /// ```
232 pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
233 where
234 D2: Data,
235 L: FnMut(D) -> D2 + 'static,
236 {
237 self.inner
238 .map(move |(data, time, delta)| (logic(data), time, delta))
239 .as_collection()
240 }
241 /// Creates a new collection by applying the supplied function to each input element.
242 ///
243 /// Although the name suggests in-place mutation, this function does not change the source collection,
244 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
245 /// equivalent to `map`, but can be more efficient.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use differential_dataflow::input::Input;
251 ///
252 /// ::timely::example(|scope| {
253 /// scope.new_collection_from(1 .. 10).1
254 /// .map_in_place(|x| *x *= 2)
255 /// .filter(|x| x % 2 == 1)
256 /// .assert_empty();
257 /// });
258 /// ```
259 pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
260 where
261 L: FnMut(&mut D) + 'static,
262 {
263 self.inner
264 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
265 .as_collection()
266 }
267 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
268 ///
269 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
270 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
271 /// attempting to consolidate the results.
272 ///
273 /// # Examples
274 ///
275 /// ```
276 /// use differential_dataflow::input::Input;
277 ///
278 /// ::timely::example(|scope| {
279 /// scope.new_collection_from(1 .. 10).1
280 /// .flat_map(|x| 0 .. x);
281 /// });
282 /// ```
283 pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
284 where
285 G::Timestamp: Clone,
286 I: IntoIterator<Item: Data>,
287 L: FnMut(D) -> I + 'static,
288 {
289 self.inner
290 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
291 .as_collection()
292 }
293 /// Creates a new collection containing those input records satisfying the supplied predicate.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// use differential_dataflow::input::Input;
299 ///
300 /// ::timely::example(|scope| {
301 /// scope.new_collection_from(1 .. 10).1
302 /// .map(|x| x * 2)
303 /// .filter(|x| x % 2 == 1)
304 /// .assert_empty();
305 /// });
306 /// ```
307 pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
308 where
309 L: FnMut(&D) -> bool + 'static,
310 {
311 self.inner
312 .filter(move |(data, _, _)| logic(data))
313 .as_collection()
314 }
315 /// Replaces each record with another, with a new difference type.
316 ///
317 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
318 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
319 ///
320 /// # Examples
321 ///
322 /// ```
323 /// use differential_dataflow::input::Input;
324 ///
325 /// ::timely::example(|scope| {
326 ///
327 /// let nums = scope.new_collection_from(0 .. 10).1;
328 /// let x1 = nums.flat_map(|x| 0 .. x);
329 /// let x2 = nums.map(|x| (x, 9 - x))
330 /// .explode(|(x,y)| Some((x,y)));
331 ///
332 /// x1.assert_eq(&x2);
333 /// });
334 /// ```
335 pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
336 where
337 D2: Data,
338 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
339 I: IntoIterator<Item=(D2,R2)>,
340 L: FnMut(D)->I+'static,
341 {
342 self.inner
343 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
344 .as_collection()
345 }
346
347 /// Joins each record against a collection defined by the function `logic`.
348 ///
349 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
350 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
351 /// modifications made to the results, namely joining timestamps and multiplying differences.
352 ///
353 /// #Examples
354 ///
355 /// ```
356 /// use differential_dataflow::input::Input;
357 ///
358 /// ::timely::example(|scope| {
359 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
360 /// // for x from 0 through 9.
361 /// scope.new_collection_from(0 .. 10isize).1
362 /// .join_function(|x|
363 /// // data time diff
364 /// vec![(2*x, (3*x) as u64, x),
365 /// (2*x, (4*x) as u64, -x)]
366 /// );
367 /// });
368 /// ```
369 pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
370 where
371 G::Timestamp: Lattice,
372 D2: Data,
373 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
374 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
375 L: FnMut(D)->I+'static,
376 {
377 self.inner
378 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
379 .as_collection()
380 }
381
382 /// Brings a Collection into a nested scope.
383 ///
384 /// # Examples
385 ///
386 /// ```
387 /// use timely::dataflow::Scope;
388 /// use differential_dataflow::input::Input;
389 ///
390 /// ::timely::example(|scope| {
391 ///
392 /// let data = scope.new_collection_from(1 .. 10).1;
393 ///
394 /// let result = scope.region(|child| {
395 /// data.enter(child)
396 /// .leave()
397 /// });
398 ///
399 /// data.assert_eq(&result);
400 /// });
401 /// ```
402 pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
403 where
404 T: Refines<<G as ScopeParent>::Timestamp>,
405 {
406 self.inner
407 .enter(child)
408 .map(|(data, time, diff)| (data, T::to_inner(time), diff))
409 .as_collection()
410 }
411
412 /// Brings a Collection into a nested scope, at varying times.
413 ///
414 /// The `initial` function indicates the time at which each element of the Collection should appear.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// use timely::dataflow::Scope;
420 /// use differential_dataflow::input::Input;
421 ///
422 /// ::timely::example(|scope| {
423 ///
424 /// let data = scope.new_collection_from(1 .. 10).1;
425 ///
426 /// let result = scope.iterative::<u64,_,_>(|child| {
427 /// data.enter_at(child, |x| *x)
428 /// .leave()
429 /// });
430 ///
431 /// data.assert_eq(&result);
432 /// });
433 /// ```
434 pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
435 where
436 T: Timestamp+Hash,
437 F: FnMut(&D) -> T + Clone + 'static,
438 G::Timestamp: Hash,
439 {
440 self.inner
441 .enter(child)
442 .map(move |(data, time, diff)| {
443 let new_time = Product::new(time, initial(&data));
444 (data, new_time, diff)
445 })
446 .as_collection()
447 }
448
449 /// Delays each difference by a supplied function.
450 ///
451 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
452 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
453 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
454 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
455 /// to all of the data timestamps).
456 pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
457 where
458 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
459 {
460 let mut func1 = func.clone();
461 let mut func2 = func.clone();
462
463 self.inner
464 .delay_batch(move |x| func1(x))
465 .map_in_place(move |x| x.1 = func2(&x.1))
466 .as_collection()
467 }
468
469 /// Applies a supplied function to each update.
470 ///
471 /// This method is most commonly used to report information back to the user, often for debugging purposes.
472 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
473 /// not guarantee that it will be called as many times as you might expect.
474 ///
475 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
476 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
477 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
478 /// interesting and less intuitive, unfortunately.
479 ///
480 /// # Examples
481 ///
482 /// ```
483 /// use differential_dataflow::input::Input;
484 ///
485 /// ::timely::example(|scope| {
486 /// scope.new_collection_from(1 .. 10).1
487 /// .map_in_place(|x| *x *= 2)
488 /// .filter(|x| x % 2 == 1)
489 /// .inspect(|x| println!("error: {:?}", x));
490 /// });
491 /// ```
492 pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
493 where
494 F: FnMut(&(D, G::Timestamp, R))+'static,
495 {
496 self.inner
497 .inspect(func)
498 .as_collection()
499 }
500 /// Applies a supplied function to each batch of updates.
501 ///
502 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
503 /// timely dataflow capability associated with the batch of updates. The observed batching depends
504 /// on how the system executes, and may vary run to run.
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// use differential_dataflow::input::Input;
510 ///
511 /// ::timely::example(|scope| {
512 /// scope.new_collection_from(1 .. 10).1
513 /// .map_in_place(|x| *x *= 2)
514 /// .filter(|x| x % 2 == 1)
515 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
516 /// });
517 /// ```
518 pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
519 where
520 F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
521 {
522 self.inner
523 .inspect_batch(move |time, data| func(time, data))
524 .as_collection()
525 }
526
527 /// Assert if the collection is ever non-empty.
528 ///
529 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
530 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
531 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
532 /// program should indicate that this assertion never found cause to complain.
533 ///
534 /// # Examples
535 ///
536 /// ```
537 /// use differential_dataflow::input::Input;
538 ///
539 /// ::timely::example(|scope| {
540 /// scope.new_collection_from(1 .. 10).1
541 /// .map(|x| x * 2)
542 /// .filter(|x| x % 2 == 1)
543 /// .assert_empty();
544 /// });
545 /// ```
546 pub fn assert_empty(&self)
547 where
548 D: crate::ExchangeData+Hashable,
549 R: crate::ExchangeData+Hashable + Semigroup,
550 G::Timestamp: Lattice+Ord,
551 {
552 self.consolidate()
553 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
554 }
555}
556
557use timely::dataflow::scopes::ScopeParent;
558use timely::progress::timestamp::Refines;
559
560/// Methods requiring a nested scope.
561impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
562where
563 T: Refines<<G as ScopeParent>::Timestamp>,
564{
565 /// Returns the final value of a Collection from a nested scope to its containing scope.
566 ///
567 /// # Examples
568 ///
569 /// ```
570 /// use timely::dataflow::Scope;
571 /// use differential_dataflow::input::Input;
572 ///
573 /// ::timely::example(|scope| {
574 ///
575 /// let data = scope.new_collection_from(1 .. 10).1;
576 ///
577 /// let result = scope.region(|child| {
578 /// data.enter(child)
579 /// .leave()
580 /// });
581 ///
582 /// data.assert_eq(&result);
583 /// });
584 /// ```
585 pub fn leave(&self) -> Collection<G, D, R> {
586 self.inner
587 .leave()
588 .map(|(data, time, diff)| (data, time.to_outer(), diff))
589 .as_collection()
590 }
591}
592
593/// Methods requiring a region as the scope.
594impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
595{
596 /// Returns the value of a Collection from a nested region to its containing scope.
597 ///
598 /// This method is a specialization of `leave` to the case that of a nested region.
599 /// It removes the need for an operator that adjusts the timestamp.
600 pub fn leave_region(&self) -> Collection<G, D, R, C> {
601 self.inner
602 .leave()
603 .as_collection()
604 }
605}
606
607/// Methods requiring an Abelian difference, to support negation.
608impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
609 /// Assert if the collections are ever different.
610 ///
611 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
612 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
613 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
614 /// indicate that this assertion never found cause to complain.
615 ///
616 /// # Examples
617 ///
618 /// ```
619 /// use differential_dataflow::input::Input;
620 ///
621 /// ::timely::example(|scope| {
622 ///
623 /// let data = scope.new_collection_from(1 .. 10).1;
624 ///
625 /// let odds = data.filter(|x| x % 2 == 1);
626 /// let evens = data.filter(|x| x % 2 == 0);
627 ///
628 /// odds.concat(&evens)
629 /// .assert_eq(&data);
630 /// });
631 /// ```
632 pub fn assert_eq(&self, other: &Self)
633 where
634 D: crate::ExchangeData+Hashable,
635 R: crate::ExchangeData+Hashable,
636 G::Timestamp: Lattice+Ord,
637 {
638 self.negate()
639 .concat(other)
640 .assert_empty();
641 }
642}
643
644/// Conversion to a differential dataflow Collection.
645pub trait AsCollection<G: Scope, D, R, C> {
646 /// Converts the type to a differential dataflow collection.
647 fn as_collection(&self) -> Collection<G, D, R, C>;
648}
649
650impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
651 /// Converts the type to a differential dataflow collection.
652 ///
653 /// By calling this method, you guarantee that the timestamp invariant (as documented on
654 /// [Collection]) is upheld. This method will not check it.
655 fn as_collection(&self) -> Collection<G, D, R, C> {
656 Collection::<G,D,R,C>::new(self.clone())
657 }
658}
659
660/// Concatenates multiple collections.
661///
662/// This method has the effect of a sequence of calls to `concat`, but it does
663/// so in one operator rather than a chain of many operators.
664///
665/// # Examples
666///
667/// ```
668/// use differential_dataflow::input::Input;
669///
670/// ::timely::example(|scope| {
671///
672/// let data = scope.new_collection_from(1 .. 10).1;
673///
674/// let odds = data.filter(|x| x % 2 == 1);
675/// let evens = data.filter(|x| x % 2 == 0);
676///
677/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
678/// .assert_eq(&data);
679/// });
680/// ```
681pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
682where
683 G: Scope,
684 D: Data,
685 R: Semigroup + 'static,
686 C: Container,
687 I: IntoIterator<Item=Collection<G, D, R, C>>,
688{
689 scope
690 .concatenate(iterator.into_iter().map(|x| x.inner))
691 .as_collection()
692}