Renders a plan into a timely/differential dataflow computation.
§Error handling
Timely and differential have no idioms for computations that can error. The philosophy is, reasonably, to define the semantics of the computation such that errors are unnecessary: e.g., by using wrap-around semantics for integer overflow.
Unfortunately, SQL semantics are not nearly so elegant, and require errors in myriad cases. The classic example is a division by zero, but invalid input for casts, overflowing integer operations, and dozens of other functions need the ability to produce errors at runtime.
At the moment, only scalar expression evaluation can fail, so only operators that evaluate scalar expressions can fail. At the time of writing, that includes map, filter, reduce, and join operators. Constants are a bit of a special case: they can be either a constant vector of rows or a constant, singular error.
The approach taken is to build two parallel trees of computation: one for the rows that have been successfully evaluated (the “oks tree”), and one for the errors that have been generated (the “errs tree”). For example:
   oks1  errs1       oks2  errs2
     |     |           |     |
     |     |           |     |
  project  |           |     |
     |     |           |     |
     |     |           |     |
    map    |           |     |
     |\    |           |     |
     | \   |           |     |
     |  \  |           |     |
     |   \ |           |     |
     |    \|           |     |
  project  +           +     +
     |     |          /     /
     |     |         /     /
   join ------------+     /
     |     |             /
     |     | +----------+
     |     |/
    oks   errs
The project operation cannot fail, so errors from errs1 are propagated directly. Map operators are fallible and so can inject additional errors into the stream. Join operators combine the errors from each of their inputs.
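The shape of the two trees can be illustrated without any dataflow machinery. The following is a minimal sketch that assumes plain `Vec`s in place of differential collections; `Row`, `map_div`, `project`, `join`, and this two-variant `EvalError` are hypothetical stand-ins for illustration, not the types or operators used by the renderer.

```rust
// Hypothetical stand-ins: the real renderer works on differential collections.
#[derive(Debug, Clone, PartialEq)]
enum EvalError {
    DivisionByZero,
    NumericOverflow,
}

type Row = Vec<i64>;

/// Fallible map: appends `row[num] / row[den]` to each row.
/// Rows whose expression fails leave `oks` and add an entry to `errs`.
fn map_div(
    oks: Vec<Row>,
    mut errs: Vec<EvalError>,
    num: usize,
    den: usize,
) -> (Vec<Row>, Vec<EvalError>) {
    let mut out = Vec::new();
    for mut row in oks {
        match row[num].checked_div(row[den]) {
            Some(q) => {
                row.push(q);
                out.push(row);
            }
            None if row[den] == 0 => errs.push(EvalError::DivisionByZero),
            None => errs.push(EvalError::NumericOverflow),
        }
    }
    (out, errs)
}

/// Infallible project: keeps the listed columns; `errs` passes through untouched.
fn project(oks: Vec<Row>, errs: Vec<EvalError>, cols: &[usize]) -> (Vec<Row>, Vec<EvalError>) {
    let oks: Vec<Row> = oks
        .into_iter()
        .map(|r| cols.iter().map(|&c| r[c]).collect())
        .collect();
    (oks, errs)
}

/// Join on the first column; the output errs are the union of both input errs.
fn join(
    left: (Vec<Row>, Vec<EvalError>),
    right: (Vec<Row>, Vec<EvalError>),
) -> (Vec<Row>, Vec<EvalError>) {
    let (l_oks, mut errs) = left;
    let (r_oks, r_errs) = right;
    errs.extend(r_errs);
    let mut oks = Vec::new();
    for l in &l_oks {
        for r in &r_oks {
            if l[0] == r[0] {
                let mut row = l.clone();
                row.extend_from_slice(&r[1..]);
                oks.push(row);
            }
        }
    }
    (oks, errs)
}
```

Each operator takes and returns an `(oks, errs)` pair, so a plan composes the same way as in the diagram: only the fallible operator adds to `errs`, and the join merges the error streams of its inputs.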
The semantics of the error stream are minimal. From the perspective of SQL, a dataflow is considered to be in an error state if there is at least one element in the final errs collection. The error value returned to the user is selected arbitrarily; SQL only makes provisions to return one error to the user at a time. There are plans to make the errs collection accessible to end users, so they can see all errors at once.
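As a rough illustration of this rule, the sketch below models each collection as a list of `(item, diff)` updates, in the spirit of differential's multisets. The names `consolidate` and `read_result` are hypothetical helpers, and rows and errors are plain `String`s for brevity; none of this is the actual read path.

```rust
use std::collections::BTreeMap;

/// Sum the diffs per item and drop items whose net multiplicity is zero.
fn consolidate<T: Ord + Clone>(updates: &[(T, i64)]) -> BTreeMap<T, i64> {
    let mut acc: BTreeMap<T, i64> = BTreeMap::new();
    for (item, diff) in updates {
        *acc.entry(item.clone()).or_insert(0) += *diff;
    }
    acc.retain(|_, d| *d != 0);
    acc
}

/// The dataflow is in an error state iff the consolidated errs are non-empty.
/// Which error is returned is arbitrary; this sketch takes the smallest one.
fn read_result(oks: &[(String, i64)], errs: &[(String, i64)]) -> Result<Vec<String>, String> {
    let errs = consolidate(errs);
    if let Some((err, _)) = errs.iter().next() {
        return Err(err.clone());
    }
    Ok(consolidate(oks)
        .into_iter()
        .flat_map(|(row, n)| std::iter::repeat(row).take(n.max(0) as usize))
        .collect())
}
```

Because the check runs on consolidated updates, an error that is later retracted (an update with diff `-1`) takes the dataflow out of the error state again.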
To make errors transient, simply ensure that the operator can retract any produced errors when corrected data arrives. To make errors permanent, write the operator such that it never retracts the errors it produced. Future work will likely want to introduce an ordering on errors, so that permanent errors are returned to the user ahead of transient errors, probably by introducing a new error type à la:
enum DataflowError {
    Transient(EvalError),
    Permanent(SourceError),
}
If the error stream is empty, the oks stream must be correct. If the error stream is non-empty, then there are no semantics for the oks stream. This is sufficient to support SQL in its current form, but is likely to be unsatisfactory long term. We suspect that we can continue to imbue the oks stream with semantics if we are very careful in describing what data should and should not be produced upon encountering an error. Roughly speaking, the oks stream could represent the correct result of the computation where all rows that caused an error have been pruned from the stream. There are strange and confusing questions here around foreign keys, though: what if the optimizer proves that a particular key must exist in a collection, but the key gets pruned away because its row participated in a scalar expression evaluation that errored?
In the meantime, it is probably wise for operators to keep the oks stream roughly “as correct as possible” even when errors are present in the errs stream. This reduces the amount of recomputation that must be performed if/when the errors are retracted.
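The proposed ordering of permanent errors ahead of transient ones could look roughly like the sketch below. It reuses the `DataflowError` shape suggested earlier in this section; the `EvalError` and `SourceError` newtypes, the `rank` method, and `error_to_report` are assumptions made purely for illustration and do not describe existing code.

```rust
// Hypothetical payload types; the real error types carry more structure.
#[derive(Debug, Clone, PartialEq)]
struct EvalError(String);

#[derive(Debug, Clone, PartialEq)]
struct SourceError(String);

#[derive(Debug, Clone, PartialEq)]
enum DataflowError {
    Transient(EvalError),
    Permanent(SourceError),
}

impl DataflowError {
    /// Lower rank is reported first: permanent errors outrank transient ones.
    fn rank(&self) -> u8 {
        match self {
            DataflowError::Permanent(_) => 0,
            DataflowError::Transient(_) => 1,
        }
    }
}

/// Pick the single error to surface to the user, or `None` if errs is empty.
fn error_to_report(errs: &[DataflowError]) -> Option<&DataflowError> {
    errs.iter().min_by_key(|e| e.rank())
}
```

Ranking by variant keeps the choice within a class arbitrary, which matches the current semantics, while guaranteeing that a permanent error is never hidden behind a transient one.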
Re-exports§
- pub use context::CollectionBundle;
- pub use join::LinearJoinSpec;
Modules§
- context
- Management of dataflow-local state, like arrangements, while building a dataflow.
- continual_task 🔒
- A continual task presents as something like a TRIGGER: it watches some input and, whenever it changes at time T, executes a SQL transaction, writing to some output at the same time T. It can also read anything in Materialize as a reference, most notably including the output.
- errors 🔒
- Helpers for handling errors encountered by operators.
- flat_map 🔒
- join 🔒
- Rendering of MirRelationExpr::Join operators, and supporting types.
- reduce 🔒
- Reduction dataflow construction.
- sinks
- Logic related to the creation of dataflow sinks.
- threshold 🔒
- Threshold execution logic.
- top_k 🔒
- TopK execution logic.
Structs§
- Pairer 🔒
- Helper to merge pairs of datum iterators into a row or split a datum iterator into two rows, given the arity of the first component.
- PendingTimesDisplay 🔒
- A formatter for an iterator of timestamps that displays the first element, and subsequently the difference between timestamps.
- StartSignal 🔒
- A signal that can be awaited by operators to suspend them prior to startup.
Enums§
- BindingInfo 🔒
- Information about bindings, tracked in render_recursive_plan and render_plan, to be passed to render_letfree_plan.
Traits§
- LimitProgress 🔒
- Extension trait for StreamCore to selectively limit progress.
- RenderTimestamp 
- A timestamp type that can be used for operations within MZ’s dataflow layer.
- WithStartSignal 🔒
- Extension trait to attach a StartSignal to operator outputs.
Functions§
- build_compute_dataflow
- Assemble the “compute” side of a dataflow, i.e. all but the sources.
- render_shutdown_fuse 🔒
- Wraps the provided Collection with an operator that passes through all received inputs as long as the provided ShutdownProbe does not announce dataflow shutdown. Once the token dataflow is shutting down, all data flowing into the operator is dropped.
- suppress_early_progress 🔒
- Suppress progress messages for times before the given as_of.