Renders ingestions and exports into timely dataflow


§Overall structure

Before describing any of the timely operators involved in ingesting a source it helps to understand the high level structure of the timely scopes involved. The reason for this structure is the fact that we ingest external sources with a source-specific, and source implementation defined, timestamp type which tracks progress in a way that the source implementation understands. Each source specific timestamp must be compatible with timely’s timely::progress::Timestamp trait and so it’s suitable to represent timely streams and by extension differential collections.

On the other hand, Materialize expects a specific timestamp type for all its collections (currently mz_repr::Timestamp) so at some point the dataflow’s timestamp must change. More generally, the ingestion dataflow starts with some timestamp type FromTime and ends with another timestamp type IntoTime.

Here we run into a problem though because we want to start with a timely stream of type Stream<G1: Scope<Timestamp=FromTime>, ..> and end up using it in a scope G2 whose timestamp type is IntoTime. Timely dataflows are organized in scopes where each scope has an associated timestamp type that must refine the timestamp type of its parent scope. What “refines” means is defined by the timely::progress::timestamp::Refines trait in timely. FromTime however does not refine IntoTime nor does IntoTime refine FromTime.

In order to acomplish this we split ingestion dataflows in two scopes, both of which are children of the root timely scope. The first scope is timestamped with FromTime and the second one with IntoTime. To move timely streams from the one scope to the other we must do so manually. Each stream that needs to be transferred between scopes is first captured using timely::dataflow::operators::capture::capture::Capture into a tokio unbounded mpsc channel. The data in the channel record in full detail the worker-local view of the original stream and whoever controls the receiver can read in the events, in the standard way of consuming the async channel, and work with it. How the receiver is turned back into a timely stream in the destination scope is described in the next section.

For now keep in mind the general structure of the dataflow:

|                                                         |
|  +---FromTime Scope---+         +---IntoTime Scope--+   |                                                   |
|  |                    |         |                   |   |
|  |                 *--+---------+-->                |   |
|  |                    |         |                   |   |
|  |                 <--+---------+--*                |   |
|  +--------------------+    ^    +-------------------+   |
|                            |                            |
|                            |                            |
|                  data exchanged between                 |
|                 scopes with capture/reclock             |

§Detailed dataflow

We are now ready to describe the detailed structure of the ingestion dataflow. The dataflow begins with the source reader dataflow fragment which is rendered in a FromTime timely scope. This scope’s timestamp is controlled by the crate::source::types::SourceRender::Time associated type and can be anything the source implementation desires.

Each source is free to render any arbitrary dataflow fragment in that scope as long as it produces the collections expected by the rest of the framework. The rendering is handled by the `crate::source::types::SourceRender::render method.

When rendering a source dataflow we expect three outputs. First, a health output, which is how the source communicates status updates about its health. Second, a data output, which is the main output of a source and contains the data that will eventually be recorded in the persist shard. Finally, an optional upper frontier output, which tracks the overall upstream upper frontier. When a source doesn’t provide a dedicated progress output the framework derives one by observing the progress of the data output. This output (derived or not) is what drives reclocking. When a source provides a dedicated upper output, it can manage it independently of the data output frontier. For example, it’s possible that a source implementation queries the upstream system to learn what are the latest offsets for and set the upper output based on that, even before having started the actual ingestion, which would be presented as data and progress trickling in via the data output.

                                                  resume upper
                                            /                     |
                           health     ,----+---.                  |
                           output     | source |                  |
                          ,-----------| reader |                  |
                         /            +--,---.-+                  |
                        /               /     \                   |
                 +-----/----+   data   /       \  upper           |
                 |  health  |   output/         \ output          |
                 | operator |         |          \                |
                 +----------+         |           |               |
 FromTime                             |           |               |
    scope                             |           |               |
 IntoTime                             |           |               |
    scope                             |      ,----+-----.         |
                                      |     |  remap   |          |
                                      |     | operator |          |
                                      |     +---,------+          |
                                      |        /                  |
                                      |       / bindings          |
                                      |      /                    |
                                    ,-+-----+--.                  |
                                    | reclock  |                  |
                                    | operator |                  |
                                    +-,--,---.-+                  |
                          ,----------´.-´     \                   |
                      _.-´         .-´         \                  |
                  _.-´          .-´             \                 |
               .-´            ,´                 \                |
              /              /                    \               |
       ,----------.   ,----------.           ,----------.         |
       |  decode  |   |  decode  |   ....    |  decode  |         |
       | output 0 |   | output 1 |           | output N |         |
       +-----+----+   +-----+----+           +-----+----+         |
             |              |                      |              |
             |              |                      |              |
       ,-----+----.   ,-----+----.           ,-----+----.         |
       | envelope |   | envelope |   ....    | envelope |         |
       | output 0 |   | output 1 |           | output N |         |
       +----------+   +-----+----+           +-----+----+         |
             |              |                      |              |
             |              |                      |              |
       ,-----+----.   ,-----+----.           ,-----+----.         |
       |  persist |   |  persist |   ....    |  persist |         |
       |  sink 0  |   |  sink 1  |           |  sink N  |         |
       +-----+----+   +-----+----+           +-----+----+         |
              \              \                    /               |
               `-.            `,                 /                |
                  `-._          `-.             /                 |
                      `-._         `-.         /                  |
                          `---------. `-.     /                   |
                                    +`---`---+---,                |
                                    |   resume   |                |
                                    | calculator |                |
                                    +------+-----+                |
                                            \                     |

Whenever a dataflow edge crosses the scope boundaries it must first be converted into a captured stream via the [mz_timely_util::capture::UnboundedTokioCapture] utility. This disassociates the stream and its progress information from the original timely scope and allows it to be read from a different place. The downside of this mechanism is that it’s invisible to timely’s progress tracking, but that seems like a necessary evil if we want to do reclocking.

The two main ways these tokio-fied streams are turned back into normal timely streams in the destination scope are by the reclock operator and the remap operator which process the data output and upper output of the source reader respectively.

The remap operator reads the upper output, which is composed only of frontiers, mints new bindings, and writes them into the remap shard. The final durable timestamp bindings are emitted as its output for consumption by the reclock operator.

The reclock operator reads the data output, which contains both data and progress statements, and uses the bindings it receives from the remap operator to reclock each piece of data and each frontier statement into the target scope’s timestamp and emit the reclocked stream in its output.


At this point we have a timely stream with correctly timestamped data in the mz time domain (mz_repr::Timestamp) which contains multiplexed messages for each of the potential subsources of this source. Each message selects the output it belongs to by setting the output field in crate::source::types::SourceMessage. By convention, the main source output is always output zero and subsources get the outputs from one onwards.

However, regardless of whether the output is the main source or a subsource it is treated identically by the pipeline. Each output is demultiplexed into its own timely stream using timely::dataflow::operators::partition::Partition and the rest of the ingestion pipeline is rendered independently.

§Resumption frontier

At the end of each per-output dataflow fragment is an instance of persist_sink, which is responsible for writing the final Row data into the corresponding output shard. The durable upper of each of the output shards is then recombined in a way that calculates the minimum upper frontier between them. This is what we refer to as the “resumption frontier” or “resume upper” and at this stage it is expressed in terms of IntoTime timestamps. As a final step, this resumption frontier is converted back into a FromTime timestamped frontier using ReclockFollower::source_upper_at_frontier and connected back to the source reader operator. This frontier is what drives the OffsetCommiter which informs the upstream system to release resources until the specified offsets.


