differential_dataflow/operators/arrange/
arrangement.rs

1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34use crate::trace::implementations::merge_batcher::container::MergerChunk;
35
36use trace::wrappers::enter::{TraceEnter, BatchEnter,};
37use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
38use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
39
40use super::TraceAgent;
41
42/// An arranged collection of `(K,V)` values.
43///
44/// An `Arranged` allows multiple differential operators to share the resources (communication,
45/// computation, memory) required to produce and maintain an indexed representation of a collection.
46pub struct Arranged<G, Tr>
47where
48    G: Scope<Timestamp: Lattice+Ord>,
49    Tr: TraceReader+Clone,
50{
51    /// A stream containing arranged updates.
52    ///
53    /// This stream contains the same batches of updates the trace itself accepts, so there should
54    /// be no additional overhead to receiving these records. The batches can be navigated just as
55    /// the batches in the trace, by key and by value.
56    pub stream: Stream<G, Tr::Batch>,
57    /// A shared trace, updated by the `Arrange` operator and readable by others.
58    pub trace: Tr,
59    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
60    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
61}
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65    G: Scope<Timestamp=Tr::Time>,
66    Tr: TraceReader + Clone,
67{
68    fn clone(&self) -> Self {
69        Arranged {
70            stream: self.stream.clone(),
71            trace: self.trace.clone(),
72        }
73    }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83    G: Scope<Timestamp=Tr::Time>,
84    Tr: TraceReader + Clone,
85{
86    /// Brings an arranged collection into a nested scope.
87    ///
88    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
89    /// have all been extended with an additional coordinate with the default value. The resulting collection does
90    /// not vary with the new timestamp coordinate.
91    pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93        where
94            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95    {
96        Arranged {
97            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98            trace: TraceEnter::make_from(self.trace.clone()),
99        }
100    }
101
102    /// Brings an arranged collection into a nested region.
103    ///
104    /// This method only applies to *regions*, which are subscopes with the same timestamp
105    /// as their containing scope. In this case, the trace type does not need to change.
106    pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108        Arranged {
109            stream: self.stream.enter(child),
110            trace: self.trace.clone(),
111        }
112    }
113
114    /// Brings an arranged collection into a nested scope.
115    ///
116    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
117    /// have all been extended with an additional coordinate with the default value. The resulting collection does
118    /// not vary with the new timestamp coordinate.
119    pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121        where
122            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124            P: FnMut(&TInner)->Tr::Time+Clone+'static,
125        {
126        let logic1 = logic.clone();
127        let logic2 = logic.clone();
128        Arranged {
129            trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131        }
132    }
133
134    /// Flattens the stream into a `Collection`.
135    ///
136    /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
137    /// and this method should only be used when the data need to be transformed or exchanged, rather than
138    /// supplied as arguments to an operator using the same key-value structure.
139    pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
140        where
141            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
142    {
143        self.flat_map_ref(move |key, val| Some(logic(key,val)))
144    }
145
146    /// Extracts elements from an arrangement as a collection.
147    ///
148    /// The supplied logic may produce an iterator over output values, allowing either
149    /// filtering or flat mapping as part of the extraction.
150    pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
151        where
152            I: IntoIterator<Item: Data>,
153            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
154    {
155        Self::flat_map_batches(&self.stream, logic)
156    }
157
158    /// Extracts elements from a stream of batches as a collection.
159    ///
160    /// The supplied logic may produce an iterator over output values, allowing either
161    /// filtering or flat mapping as part of the extraction.
162    ///
163    /// This method exists for streams of batches without the corresponding arrangement.
164    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
165    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
166    where
167        I: IntoIterator<Item: Data>,
168        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
169    {
170        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
171            input.for_each(|time, data| {
172                let mut session = output.session(&time);
173                for wrapper in data.iter() {
174                    let batch = &wrapper;
175                    let mut cursor = batch.cursor();
176                    while let Some(key) = cursor.get_key(batch) {
177                        while let Some(val) = cursor.get_val(batch) {
178                            for datum in logic(key, val) {
179                                cursor.map_times(batch, |time, diff| {
180                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
181                                });
182                            }
183                            cursor.step_val(batch);
184                        }
185                        cursor.step_key(batch);
186                    }
187                }
188            });
189        })
190        .as_collection()
191    }
192}
193
194
195use crate::difference::Multiply;
196// Direct join implementations.
197impl<G, T1> Arranged<G, T1>
198where
199    G: Scope<Timestamp=T1::Time>,
200    T1: TraceReader + Clone + 'static,
201{
202    /// A direct implementation of the `JoinCore::join_core` method.
203    pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
204    where
205        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
206        T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
207        I: IntoIterator<Item: Data>,
208        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
209    {
210        let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
211            let t = t.clone();
212            let r = (r1.clone()).multiply(r2);
213            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
214        };
215        self.join_core_internal_unsafe(other, result)
216    }
217    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
218    pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
219    where
220        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
221        D: Data,
222        ROut: Semigroup+'static,
223        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
224        L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
225    {
226        use crate::operators::join::join_traces;
227        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
228            self,
229            other,
230            move |k, v1, v2, t, d1, d2, c| {
231                for datum in result(k, v1, v2, t, d1, d2) {
232                    c.give(datum);
233                }
234            }
235        )
236            .as_collection()
237    }
238}
239
240// Direct reduce implementations.
241use crate::difference::Abelian;
242impl<G, T1> Arranged<G, T1>
243where
244    G: Scope<Timestamp = T1::Time>,
245    T1: TraceReader + Clone + 'static,
246{
247    /// A direct implementation of `ReduceCore::reduce_abelian`.
248    pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
249    where
250        T1: TraceReader<KeyOwn: Ord>,
251        T2: for<'a> Trace<
252            Key<'a>= T1::Key<'a>,
253            KeyOwn=T1::KeyOwn,
254            ValOwn: Data,
255            Time=T1::Time,
256            Diff: Abelian,
257        >+'static,
258        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
259        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
260    {
261        self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
262            if !input.is_empty() {
263                logic(key, input, change);
264            }
265            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
266            crate::consolidation::consolidate(change);
267        })
268    }
269
270    /// A direct implementation of `ReduceCore::reduce_core`.
271    pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
272    where
273        T1: TraceReader<KeyOwn: Ord>,
274        T2: for<'a> Trace<
275            Key<'a>=T1::Key<'a>,
276            KeyOwn=T1::KeyOwn,
277            ValOwn: Data,
278            Time=T1::Time,
279        >+'static,
280        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
281        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
282    {
283        use crate::operators::reduce::reduce_trace;
284        reduce_trace::<_,_,Bu,_,_>(self, name, logic)
285    }
286}
287
288
289impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
290where
291    G: Scope<Timestamp=Tr::Time>,
292    Tr: TraceReader + Clone,
293{
294    /// Brings an arranged collection out of a nested region.
295    ///
296    /// This method only applies to *regions*, which are subscopes with the same timestamp
297    /// as their containing scope. In this case, the trace type does not need to change.
298    pub fn leave_region(&self) -> Arranged<G, Tr> {
299        use timely::dataflow::operators::Leave;
300        Arranged {
301            stream: self.stream.leave(),
302            trace: self.trace.clone(),
303        }
304    }
305}
306
307/// A type that can be arranged as if a collection of updates.
308pub trait Arrange<G, C>
309where
310    G: Scope<Timestamp: Lattice>,
311{
312    /// Arranges updates into a shared trace.
313    fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
314    where
315        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
316        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
317        Tr: Trace<Time=G::Timestamp> + 'static,
318    {
319        self.arrange_named::<Ba, Bu, Tr>("Arrange")
320    }
321
322    /// Arranges updates into a shared trace, with a supplied name.
323    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
324    where
325        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
326        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
327        Tr: Trace<Time=G::Timestamp> + 'static,
328    ;
329}
330
331impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
332where
333    G: Scope<Timestamp: Lattice>,
334    K: ExchangeData + Hashable,
335    V: ExchangeData,
336    R: ExchangeData + Semigroup,
337{
338    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
339    where
340        Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
341        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
342        Tr: Trace<Time=G::Timestamp> + 'static,
343    {
344        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
345        arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
346    }
347}
348
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
///
/// * `stream`: the stream of `Ba::Input` containers to arrange.
/// * `pact`: the parallelization contract handed to `unary_frontier` for the operator's input.
/// * `name`: the name given to the constructed operator.
///
/// Returns an `Arranged` holding the operator's output stream of batches and the reader half
/// of the `TraceAgent` created for the new trace.
///
/// # Panics
///
/// The operator logic asserts that the input frontier never regresses, and panics with
/// "failed to find capability" if no held capability is less or equal to an element of the
/// batcher's frontier. The final `reader.unwrap()` assumes the operator constructor closure
/// has already run by the time `unary_frontier` returns (TODO confirm against timely).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Filled in by the operator constructor below, and unwrapped once the operator is built.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // Fabricate a data-parallel operator using `unary_frontier`.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        // The new trace is given an activator for this operator's address.
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        // Split the trace into a reader handle (returned to the caller) and a writer (kept here).
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |input, output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != input.frontier().frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !input.frontier().less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Hand a copy of the batch to the trace writer, together with the
                            // time of the capability under which the batch is sent.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // Send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        // Derive the new capability from any held capability whose time is less or equal.
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    // The sealed batch is discarded; only the new upper is reported to the writer.
                    let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
                    writer.seal(input.frontier().frontier().to_owned());
                }

                // Remember the frontier we have now processed.
                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }

            // Called on every invocation, whether or not the frontier advanced.
            // NOTE(review): presumably lets the trace perform maintenance work; confirm against the writer's `exert`.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}
507
508impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
509where
510    G: Scope<Timestamp: Lattice+Ord>,
511{
512    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
513    where
514        Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
515        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
516        Tr: Trace<Time=G::Timestamp> + 'static,
517    {
518        let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
519        arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
520    }
521}
522
523/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
524///
525/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
526/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
527/// pair `(u64, K)` of hash value and key.
528pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
529where
530    G: Scope<Timestamp: Lattice+Ord>,
531{
532    /// Arranges a collection of `(Key, Val)` records by `Key`.
533    ///
534    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
535    /// This trace is current for all times completed by the output stream, which can be used to
536    /// safely identify the stable times and values in the trace.
537    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
538
539    /// As `arrange_by_key` but with the ability to name the arrangement.
540    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
541}
542
543impl<G, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
544where
545    G: Scope<Timestamp: Lattice+Ord>,
546{
547    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
548        self.arrange_by_key_named("ArrangeByKey")
549    }
550
551    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
552        self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
553    }
554}
555
556/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
557///
558/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
559/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
560/// pair `(u64, K)` of hash value and key.
561pub trait ArrangeBySelf<G, K: Data+Hashable, R: Ord+Semigroup+'static>
562where
563    G: Scope<Timestamp: Lattice+Ord>,
564{
565    /// Arranges a collection of `Key` records by `Key`.
566    ///
567    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
568    /// This trace is current for all times complete in the output stream, which can be used to safely
569    /// identify the stable times and values in the trace.
570    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
571
572    /// As `arrange_by_self` but with the ability to name the arrangement.
573    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
574}
575
576
577impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
578where
579    G: Scope<Timestamp: Lattice+Ord>,
580{
581    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
582        self.arrange_by_self_named("ArrangeBySelf")
583    }
584
585    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
586        self.map(|k| (k, ()))
587            .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
588    }
589}