differential_dataflow/operators/arrange/
arrangement.rs

1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use super::TraceAgent;
41
42/// An arranged collection of `(K,V)` values.
43///
44/// An `Arranged` allows multiple differential operators to share the resources (communication,
45/// computation, memory) required to produce and maintain an indexed representation of a collection.
46pub struct Arranged<G: Scope, Tr>
47where
48    G::Timestamp: Lattice+Ord,
49    Tr: TraceReader+Clone,
50{
51    /// A stream containing arranged updates.
52    ///
53    /// This stream contains the same batches of updates the trace itself accepts, so there should
54    /// be no additional overhead to receiving these records. The batches can be navigated just as
55    /// the batches in the trace, by key and by value.
56    pub stream: Stream<G, Tr::Batch>,
57    /// A shared trace, updated by the `Arrange` operator and readable by others.
58    pub trace: Tr,
59    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
60    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
61}
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65    G: Scope<Timestamp=Tr::Time>,
66    Tr: TraceReader + Clone,
67{
68    fn clone(&self) -> Self {
69        Arranged {
70            stream: self.stream.clone(),
71            trace: self.trace.clone(),
72        }
73    }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83    G: Scope<Timestamp=Tr::Time>,
84    Tr: TraceReader + Clone,
85{
86    /// Brings an arranged collection into a nested scope.
87    ///
88    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
89    /// have all been extended with an additional coordinate with the default value. The resulting collection does
90    /// not vary with the new timestamp coordinate.
91    pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93        where
94            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95    {
96        Arranged {
97            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98            trace: TraceEnter::make_from(self.trace.clone()),
99        }
100    }
101
102    /// Brings an arranged collection into a nested region.
103    ///
104    /// This method only applies to *regions*, which are subscopes with the same timestamp
105    /// as their containing scope. In this case, the trace type does not need to change.
106    pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108        Arranged {
109            stream: self.stream.enter(child),
110            trace: self.trace.clone(),
111        }
112    }
113
114    /// Brings an arranged collection into a nested scope.
115    ///
116    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
117    /// have all been extended with an additional coordinate with the default value. The resulting collection does
118    /// not vary with the new timestamp coordinate.
119    pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121        where
122            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124            P: FnMut(&TInner)->Tr::Time+Clone+'static,
125        {
126        let logic1 = logic.clone();
127        let logic2 = logic.clone();
128        Arranged {
129            trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131        }
132    }
133
134    /// Filters an arranged collection.
135    ///
136    /// This method produces a new arrangement backed by the same shared
137    /// arrangement as `self`, paired with user-specified logic that can
138    /// filter by key and value. The resulting collection is restricted
139    /// to the keys and values that return true under the user predicate.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use differential_dataflow::input::Input;
145    /// use differential_dataflow::operators::arrange::ArrangeByKey;
146    ///
147    /// ::timely::example(|scope| {
148    ///
149    ///     let arranged =
150    ///     scope.new_collection_from(0 .. 10).1
151    ///          .map(|x| (x, x+1))
152    ///          .arrange_by_key();
153    ///
154    ///     arranged
155    ///         .filter(|k,v| k == v)
156    ///         .as_collection(|k,v| (*k,*v))
157    ///         .assert_empty();
158    /// });
159    /// ```
160    pub fn filter<F>(&self, logic: F)
161        -> Arranged<G, TraceFilter<Tr, F>>
162        where
163            F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
164    {
165        let logic1 = logic.clone();
166        let logic2 = logic.clone();
167        Arranged {
168            trace: TraceFilter::make_from(self.trace.clone(), logic1),
169            stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
170        }
171    }
172    /// Flattens the stream into a `Collection`.
173    ///
174    /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
175    /// and this method should only be used when the data need to be transformed or exchanged, rather than
176    /// supplied as arguments to an operator using the same key-value structure.
177    pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
178        where
179            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
180    {
181        self.flat_map_ref(move |key, val| Some(logic(key,val)))
182    }
183
184    /// Extracts elements from an arrangement as a collection.
185    ///
186    /// The supplied logic may produce an iterator over output values, allowing either
187    /// filtering or flat mapping as part of the extraction.
188    pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
189        where
190            I: IntoIterator,
191            I::Item: Data,
192            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
193    {
194        Self::flat_map_batches(&self.stream, logic)
195    }
196
197    /// Extracts elements from a stream of batches as a collection.
198    ///
199    /// The supplied logic may produce an iterator over output values, allowing either
200    /// filtering or flat mapping as part of the extraction.
201    ///
202    /// This method exists for streams of batches without the corresponding arrangement.
203    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
204    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
205    where
206        I: IntoIterator,
207        I::Item: Data,
208        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
209    {
210        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
211            input.for_each(|time, data| {
212                let mut session = output.session(&time);
213                for wrapper in data.iter() {
214                    let batch = &wrapper;
215                    let mut cursor = batch.cursor();
216                    while let Some(key) = cursor.get_key(batch) {
217                        while let Some(val) = cursor.get_val(batch) {
218                            for datum in logic(key, val) {
219                                cursor.map_times(batch, |time, diff| {
220                                    session.give((datum.clone(), time.into_owned(), diff.into_owned()));
221                                });
222                            }
223                            cursor.step_val(batch);
224                        }
225                        cursor.step_key(batch);
226                    }
227                }
228            });
229        })
230        .as_collection()
231    }
232}
233
234
235use crate::difference::Multiply;
236// Direct join implementations.
237impl<G, T1> Arranged<G, T1>
238where
239    G: Scope<Timestamp=T1::Time>,
240    T1: TraceReader + Clone + 'static,
241{
242    /// A direct implementation of the `JoinCore::join_core` method.
243    pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
244    where
245        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
246        T1::Diff: Multiply<T2::Diff>,
247        <T1::Diff as Multiply<T2::Diff>>::Output: Semigroup+'static,
248        I: IntoIterator,
249        I::Item: Data,
250        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
251    {
252        let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
253            let t = t.clone();
254            let r = (r1.clone()).multiply(r2);
255            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
256        };
257        self.join_core_internal_unsafe(other, result)
258    }
259    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
260    pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
261    where
262        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
263        D: Data,
264        ROut: Semigroup+'static,
265        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
266        L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
267    {
268        use crate::operators::join::join_traces;
269        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
270            self,
271            other,
272            move |k, v1, v2, t, d1, d2, c| {
273                for datum in result(k, v1, v2, t, d1, d2) {
274                    c.give(datum);
275                }
276            }
277        )
278            .as_collection()
279    }
280}
281
282// Direct reduce implementations.
283use crate::difference::Abelian;
284impl<G, T1> Arranged<G, T1>
285where
286    G: Scope<Timestamp = T1::Time>,
287    T1: TraceReader + Clone + 'static,
288{
289    /// A direct implementation of `ReduceCore::reduce_abelian`.
290    pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
291    where
292        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
293        T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=T1::Time>+'static,
294        K: Ord + 'static,
295        V: Data,
296        for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
297        T2::Diff: Abelian,
298        T2::Batch: Batch,
299        Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
300        Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
301        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
302    {
303        self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| {
304            if !input.is_empty() {
305                logic(key, input, change);
306            }
307            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
308            crate::consolidation::consolidate(change);
309        })
310    }
311
312    /// A direct implementation of `ReduceCore::reduce_core`.
313    pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
314    where
315        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
316        T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
317        K: Ord + 'static,
318        V: Data,
319        for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
320        T2::Batch: Batch,
321        Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
322        Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
323        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
324    {
325        use crate::operators::reduce::reduce_trace;
326        reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic)
327    }
328}
329
330
331impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
332where
333    G: Scope<Timestamp=Tr::Time>,
334    Tr: TraceReader + Clone,
335{
336    /// Brings an arranged collection out of a nested region.
337    ///
338    /// This method only applies to *regions*, which are subscopes with the same timestamp
339    /// as their containing scope. In this case, the trace type does not need to change.
340    pub fn leave_region(&self) -> Arranged<G, Tr> {
341        use timely::dataflow::operators::Leave;
342        Arranged {
343            stream: self.stream.leave(),
344            trace: self.trace.clone(),
345        }
346    }
347}
348
349/// A type that can be arranged as if a collection of updates.
350pub trait Arrange<G, C>
351where
352    G: Scope,
353    G::Timestamp: Lattice,
354{
355    /// Arranges updates into a shared trace.
356    fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
357    where
358        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
359        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
360        Tr: Trace<Time=G::Timestamp> + 'static,
361        Tr::Batch: Batch,
362    {
363        self.arrange_named::<Ba, Bu, Tr>("Arrange")
364    }
365
366    /// Arranges updates into a shared trace, with a supplied name.
367    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
368    where
369        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
370        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
371        Tr: Trace<Time=G::Timestamp> + 'static,
372        Tr::Batch: Batch,
373    ;
374}
375
376impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
377where
378    G: Scope,
379    G::Timestamp: Lattice,
380    K: ExchangeData + Hashable,
381    V: ExchangeData,
382    R: ExchangeData + Semigroup,
383{
384    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
385    where
386        Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
387        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
388        Tr: Trace<Time=G::Timestamp> + 'static,
389        Tr::Batch: Batch,
390    {
391        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
392        arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
393    }
394}
395
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which need not
/// partition the data consistently by key (though this is the most common choice).
///
/// * `stream`: the stream of input containers, fed to the batcher `Ba`.
/// * `pact`: the parallelization contract used for the operator's input.
/// * `name`: the name given to the constructed operator.
///
/// Returns an `Arranged` pairing the stream of sealed batches with a reader handle on the trace.
///
/// # Panics
///
/// The operator logic asserts that the input frontier never regresses, and panics with
/// "failed to find capability" if no held capability is less or equal to an element of the
/// batcher's frontier after sealing. The function itself unwraps the reader handle, which is
/// only set inside the operator constructor closure.
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope,
    G::Timestamp: Lattice,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
    Tr::Batch: Batch,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Filled in by the operator constructor below, and unwrapped once the operator is built.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // Fabricate a data-parallel operator using `unary_frontier`; the constructor closure
    // hands the reader half of the trace agent back out through `reader_ref`.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        // The trace is given an activator for this operator's address.
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        // Split the trace into a shareable reader and the writer this operator keeps.
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |input, output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != input.frontier().frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities that are not in advance of the new input
                //    frontier, then there are no updates not in advance of the new input frontier
                //    and we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !input.frontier().less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // The trace receives a clone; the batch itself goes downstream below.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        // Any held capability at a time less or equal to `time` can be delayed to it.
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    // The batcher is still sealed to the new frontier, but its result is discarded
                    // and only the new upper bound is reported to the trace writer.
                    let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
                    writer.seal(input.frontier().frontier().to_owned());
                }

                // Remember the frontier we have now processed.
                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }

            // Runs on every invocation, whether or not the frontier advanced.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}
557
558impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
559where
560    G::Timestamp: Lattice+Ord,
561{
562    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
563    where
564        Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
565        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
566        Tr: Trace<Time=G::Timestamp> + 'static,
567        Tr::Batch: Batch,
568    {
569        let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
570        arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
571    }
572}
573
574/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
575///
576/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
577/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
578/// pair `(u64, K)` of hash value and key.
579pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
580where G::Timestamp: Lattice+Ord {
581    /// Arranges a collection of `(Key, Val)` records by `Key`.
582    ///
583    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
584    /// This trace is current for all times completed by the output stream, which can be used to
585    /// safely identify the stable times and values in the trace.
586    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
587
588    /// As `arrange_by_key` but with the ability to name the arrangement.
589    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
590}
591
592impl<G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
593where
594    G::Timestamp: Lattice+Ord
595{
596    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
597        self.arrange_by_key_named("ArrangeByKey")
598    }
599
600    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
601        self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
602    }
603}
604
605/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
606///
607/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
608/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
609/// pair `(u64, K)` of hash value and key.
610pub trait ArrangeBySelf<G: Scope, K: Data+Hashable, R: Ord+Semigroup+'static>
611where
612    G::Timestamp: Lattice+Ord
613{
614    /// Arranges a collection of `Key` records by `Key`.
615    ///
616    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
617    /// This trace is current for all times complete in the output stream, which can be used to safely
618    /// identify the stable times and values in the trace.
619    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
620
621    /// As `arrange_by_self` but with the ability to name the arrangement.
622    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
623}
624
625
626impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
627where
628    G::Timestamp: Lattice+Ord
629{
630    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
631        self.arrange_by_self_named("ArrangeBySelf")
632    }
633
634    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
635        self.map(|k| (k, ()))
636            .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
637    }
638}