differential_dataflow/operators/arrange/
mod.rs

//! Types and traits for arranging collections.
//!
//! Differential dataflow collections can be "arranged" into maintained, worker-local
//! indices that can be re-used by other dataflows at relatively low cost.
//!
//! The `arrange` operator and its variants take a `Collection` and produce an instance
//! of the `Arranged` type. An arrangement is logically equivalent to its input
//! collection, but it is distributed across workers and maintained in a way that makes
//! it easy to re-use.
//!
//! The `arrange` operator receives update triples `(data, time, diff)` from its input,
//! and responds to changes in its input frontier. As the frontier advances, it signals
//! further times that will no longer be observed in input updates. For each frontier
//! advance, the operator creates a new "batch", containing exactly those updates whose
//! times are in advance of the previous frontier but not in advance of the new frontier.
//! Updates are partitioned among workers by a key, and each batch is indexed by this key.
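//!
//! As a minimal sketch of the batching rule above (not this crate's implementation), the
//! standalone example below assumes totally ordered `u64` times, so that each frontier is
//! a single time. The function `seal_batch` and its parameters are hypothetical names
//! introduced only for illustration.
//!
//! ```rust
//! /// Removes from `pending` the updates with `lower <= time < upper` and returns them
//! /// sorted by key, mimicking a batch sealed when the frontier moves from `lower` to
//! /// `upper`. Updates at times at or beyond `upper` stay in `pending`.
//! fn seal_batch<K: Ord>(
//!     pending: &mut Vec<(K, u64, i64)>,
//!     lower: u64,
//!     upper: u64,
//! ) -> Vec<(K, u64, i64)> {
//!     // Split the pending updates into those the batch covers and those it does not.
//!     let (mut batch, rest): (Vec<_>, Vec<_>) = pending
//!         .drain(..)
//!         .partition(|(_, time, _)| lower <= *time && *time < upper);
//!     *pending = rest;
//!     // Order by key (then time and diff) so the batch can be searched by key.
//!     batch.sort();
//!     batch
//! }
//! ```
//!
//! Calling `seal_batch` again with the old `upper` as the new `lower` yields the next
//! batch, so consecutive batches cover disjoint, adjacent ranges of times.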
//!
//! This sequence of batches defines a continually expanding view of committed updates
//! in the collection.
//! The sequence is presented by the `Arranged` type in two forms (its fields):
//!
//! 1.  A timely dataflow `Stream` of batch elements.
//!
//!     The stream is used by operators that want to exploit the arranged structure of
//!     batches, but want the push-based computational model of timely dataflow.
//!     Many differential dataflow operators can consume streams of batches, although
//!     they may also rely on access to the second representation of the sequence.
//!
//! 2.  A `Trace` type that provides a compact representation of the accumulated batches.
//!
//!     A trace is logically equivalent to a sequence of batches, but it is able to alter
//!     the representation for efficiency. In particular, the trace may merge batches so
//!     that the total number is kept small, and it may merge logical times if it is able to
//!     determine that no trace users can distinguish between them.
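//!
//! The merging of logical times described in the second item can be sketched as follows.
//! This is an illustration under simplifying assumptions, not the trace implementation:
//! times are totally ordered `u64` values, and `since` is a hypothetical name for a time
//! before which no trace user can distinguish times. The function `compact` is likewise
//! hypothetical.
//!
//! ```rust
//! /// Advances every time earlier than `since` to `since`, then adds up the diffs of
//! /// updates that share a key and time, dropping any that cancel to zero.
//! fn compact<K: Ord>(mut updates: Vec<(K, u64, i64)>, since: u64) -> Vec<(K, u64, i64)> {
//!     for update in updates.iter_mut() {
//!         update.1 = update.1.max(since);
//!     }
//!     // Sorting places updates with equal (key, time) next to each other.
//!     updates.sort();
//!     let mut result: Vec<(K, u64, i64)> = Vec::new();
//!     for (key, time, diff) in updates {
//!         if let Some(last) = result.last_mut() {
//!             if last.0 == key && last.1 == time {
//!                 last.2 += diff;
//!                 continue;
//!             }
//!         }
//!         result.push((key, time, diff));
//!     }
//!     // Updates whose diffs accumulated to zero no longer carry information.
//!     result.retain(|update| update.2 != 0);
//!     result
//! }
//! ```
//!
//! For a reader who only asks about times at or beyond `since`, the compacted updates
//! accumulate to the same collection as the originals, which is why this rewriting is
//! safe in the sketch.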
//!
//! Importantly, the `Trace` type has no connection to the timely dataflow runtime.
//! This means a trace can be used in a variety of contexts where a `Stream` would not be
//! appropriate, for example outside of the dataflow in which the arrangement is performed.
//! Traces may be directly inspected by any code with access to them, and they can even be
//! used to introduce the batches to other dataflows with the `import` method.

use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::collections::VecDeque;

use timely::scheduling::Activator;
use timely::progress::Antichain;
use crate::trace::TraceReader;

/// Operating instructions on how to replay a trace.
pub enum TraceReplayInstruction<Tr>
where
    Tr: TraceReader,
{
    /// Describes a frontier advance.
    Frontier(Antichain<Tr::Time>),
    /// Describes a batch of data and a capability hint.
    Batch(Tr::Batch, Option<Tr::Time>),
}

// Short names for strongly and weakly owned activators and shared queues.
type BatchQueue<Tr> = VecDeque<TraceReplayInstruction<Tr>>;
type TraceAgentQueueReader<Tr> = Rc<(Activator, RefCell<BatchQueue<Tr>>)>;
type TraceAgentQueueWriter<Tr> = Weak<(Activator, RefCell<BatchQueue<Tr>>)>;

pub mod writer;
pub mod agent;
pub mod arrangement;

pub mod upsert;

pub use self::writer::TraceWriter;
pub use self::agent::{TraceAgent, ShutdownButton};

pub use self::arrangement::{Arranged, Arrange, ArrangeByKey, ArrangeBySelf};