Renders a plan into a timely/differential dataflow computation.

Error handling

Timely and differential have no idioms for computations that can error. The philosophy is, reasonably, to define the semantics of the computation such that errors are unnecessary: e.g., by using wrap-around semantics for integer overflow.
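Rust's integer types make the contrast concrete: the same overflow can be defined away with wrap-around semantics, or surfaced as a failure. A small illustration:

```rust
fn main() {
    // Wrap-around semantics: overflow is defined, so no error is needed.
    // This is the style timely/differential favors.
    assert_eq!(i32::MAX.wrapping_add(1), i32::MIN);

    // Checked semantics: overflow is surfaced as `None`, which is closer
    // to what SQL requires.
    assert_eq!(i32::MAX.checked_add(1), None);
}
```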

Unfortunately, SQL semantics are not nearly so elegant, and require errors in myriad cases. The classic example is division by zero, but invalid input for casts, overflowing integer operations, and dozens of other functions need the ability to produce errors at runtime.

At the moment, only scalar expression evaluation can fail, so only operators that evaluate scalar expressions can fail. At the time of writing, that includes map, filter, reduce, and join operators. Constants are a bit of a special case: they can be either a constant vector of rows or a constant, singular error.
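A minimal sketch of what fallible scalar evaluation looks like, using a hypothetical `div` function and `EvalError` type rather than the real expression evaluator:

```rust
// Hypothetical error type for scalar evaluation failures.
#[derive(Debug, PartialEq)]
enum EvalError {
    DivisionByZero,
    NumericOverflow,
}

// Scalar evaluation returns `Result`, so any operator that evaluates
// scalar expressions inherits the ability to fail.
fn div(a: i64, b: i64) -> Result<i64, EvalError> {
    if b == 0 {
        Err(EvalError::DivisionByZero)
    } else {
        // `checked_div` returns `None` on overflow (e.g. i64::MIN / -1).
        a.checked_div(b).ok_or(EvalError::NumericOverflow)
    }
}

fn main() {
    assert_eq!(div(10, 2), Ok(5));
    assert_eq!(div(1, 0), Err(EvalError::DivisionByZero));
    assert_eq!(div(i64::MIN, -1), Err(EvalError::NumericOverflow));
}
```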

The approach taken is to build two parallel trees of computation: one for the rows that have been successfully evaluated (the “oks tree”), and one for the errors that have been generated (the “errs tree”). For example:

   oks1  errs1       oks2  errs2
     |     |           |     |
     |     |           |     |
  project  |           |     |
     |     |           |     |
     |     |           |     |
    map    |           |     |
     |\    |           |     |
     | \   |           |     |
     |  \  |           |     |
     |   \ |           |     |
     |    \|           |     |
  project  +           +     +
     |     |          /     /
     |     |         /     /
   join ------------+     /
     |     |             /
     |     | +----------+
     |     |/
    oks   errs

The project operation cannot fail, so errors from errs1 are propagated directly. Map operators are fallible and so can inject additional errors into the stream. Join operators combine the errors from each of their inputs.
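The flow above can be sketched with plain `Vec`s standing in for differential collections; `map_fallible` and `join` here are illustrative stand-ins for the real operators, not their actual implementations:

```rust
// Simplified model: a collection is a pair of (oks, errs).
type Oks = Vec<i64>;
type Errs = Vec<String>;

// A fallible map: rows that evaluate successfully flow into `oks`;
// new evaluation errors are appended to the errors inherited from upstream.
fn map_fallible(
    input: (Oks, Errs),
    f: impl Fn(i64) -> Result<i64, String>,
) -> (Oks, Errs) {
    let (rows, mut errs) = input;
    let mut oks = Vec::new();
    for row in rows {
        match f(row) {
            Ok(v) => oks.push(v),
            Err(e) => errs.push(e),
        }
    }
    (oks, errs)
}

// A join combines the error streams of both inputs unconditionally.
// (The ok-side logic here is a toy cross-product sum, not a real join.)
fn join(left: (Oks, Errs), right: (Oks, Errs)) -> (Oks, Errs) {
    let oks: Oks = left
        .0
        .iter()
        .flat_map(|l| right.0.iter().map(move |r| l + r))
        .collect();
    let mut errs = left.1;
    errs.extend(right.1);
    (oks, errs)
}

fn main() {
    let input = (vec![10, 0, 5], Vec::new());
    let mapped = map_fallible(input, |x| {
        100i64.checked_div(x).ok_or_else(|| "division by zero".to_string())
    });
    assert_eq!(mapped.0, vec![10, 20]);
    assert_eq!(mapped.1, vec!["division by zero".to_string()]);

    let joined = join(mapped, (vec![1], vec!["upstream error".to_string()]));
    assert_eq!(joined.0, vec![11, 21]);
    assert_eq!(
        joined.1,
        vec!["division by zero".to_string(), "upstream error".to_string()]
    );
}
```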

The semantics of the error stream are minimal. From the perspective of SQL, a dataflow is considered to be in an error state if there is at least one element in the final errs collection. The error value returned to the user is selected arbitrarily; SQL only makes provisions to return one error to the user at a time. There are plans to make the err collection accessible to end users, so they can see all errors at once.
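Under these semantics, surfacing a result to the user amounts to the following check (the function name and types are hypothetical, chosen only to illustrate the rule):

```rust
// The dataflow is in an error state iff `errs` is non-empty.
// Which error is surfaced is arbitrary; SQL only returns one at a time.
fn peek_result(oks: Vec<String>, errs: Vec<String>) -> Result<Vec<String>, String> {
    match errs.into_iter().next() {
        Some(err) => Err(err),
        None => Ok(oks),
    }
}

fn main() {
    assert_eq!(
        peek_result(vec!["row".to_string()], vec![]),
        Ok(vec!["row".to_string()])
    );
    // With multiple errors present, one is picked arbitrarily (here, the first).
    assert_eq!(
        peek_result(
            vec!["row".to_string()],
            vec!["division by zero".to_string(), "overflow".to_string()]
        ),
        Err("division by zero".to_string())
    );
}
```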

To make errors transient, simply ensure that the operator can retract any produced errors when corrected data arrives. To make errors permanent, write the operator such that it never retracts the errors it produced. Future work will likely want to introduce an ordering for errors, so that permanent errors are returned to the user ahead of transient errors, probably by introducing a new error type à la:

```rust
enum DataflowError {
    // Hypothetical variants, shown only to illustrate the idea;
    // the original sketch ends at the opening brace.
    Permanent(String),
    Transient(String),
}
```

If the error stream is empty, the oks stream must be correct. If the error stream is non-empty, then there are no semantics for the oks stream. This is sufficient to support SQL in its current form, but is likely to be unsatisfactory long term. We suspect that we can continue to imbue the oks stream with semantics if we are very careful in describing what data should and should not be produced upon encountering an error. Roughly speaking, the oks stream could represent the correct result of the computation where all rows that caused an error have been pruned from the stream. There are strange and confusing questions here around foreign keys, though: what if the optimizer proves that a particular key must exist in a collection, but the key gets pruned away because its row participated in a scalar expression evaluation that errored?

In the meantime, it is probably wise for operators to keep the oks stream roughly “as correct as possible” even when errors are present in the errs stream. This reduces the amount of recomputation that must be performed if/when the errors are retracted.


Logic related to the creation of dataflow sources.


Assemble the “storage” side of a dataflow, i.e. the sources.