Module mz_compute::sink::materialized_view_v2


A dataflow sink that writes input records to a persist shard.

This implementation is both parallel and self-correcting.

  • parallel: Multiple workers can participate in writing updates for the same times, letting sink throughput scale with the number of workers allocated to the replica.
  • self-correcting: The sink continually compares the contents of the persist shard with the contents of the input collection and writes down the difference. If the persist shard ends up with undesired contents for any reason, this is corrected the next time the sink manages to append to the shard.
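At its core, the self-correcting behavior means computing desired minus persisted contents and writing out that difference. The following is a minimal sketch that assumes both sides are already consolidated into (record, count) pairs for a single time. The real sink works on timestamped differential updates and persist batches, and the functions `accumulate` and `correction` are hypothetical.

```rust
use std::collections::BTreeMap;

/// Add `sign * diff` for every `(record, diff)` pair into `acc`.
/// `&str` records and `i64` diffs are simplifications for this sketch.
fn accumulate<'a>(updates: &[(&'a str, i64)], sign: i64, acc: &mut BTreeMap<&'a str, i64>) {
    for &(record, diff) in updates {
        *acc.entry(record).or_insert(0) += sign * diff;
    }
}

/// Hypothetical helper: the updates that, appended to a shard containing
/// `persisted`, make its contents equal to `desired` (i.e. desired - persisted).
fn correction<'a>(desired: &[(&'a str, i64)], persisted: &[(&'a str, i64)]) -> Vec<(&'a str, i64)> {
    let mut acc = BTreeMap::new();
    accumulate(desired, 1, &mut acc);
    accumulate(persisted, -1, &mut acc);
    // Records whose diffs cancel out are already correct in the shard.
    acc.into_iter().filter(|&(_, diff)| diff != 0).collect()
}
```

If some other writer left a stray record in the shard, it appears in the correction with a negative diff and is retracted the next time the sink appends.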

§Operators

The persist sink consists of a graph of operators.

     desired                    persist <---------------.
        |                          |                    |
        |                          |                    |
        |---------------------.    |                    |
        |                     |    |                    |
        |                     |    |                    |
        v                     v    v                    |
    +--------+              +--------+              +--------+
    |  mint  | --descs-.--> | write  | --batches--> | append |
    +--------+          \   +--------+          .-> +--------+
                         \_____________________/

  • mint mints batch descriptions, i.e., (lower, upper) bounds of batches that should be written. The persist API requires that all workers write batches with the same bounds, so they can be appended as a single logical batch. To ensure this, the mint operator only runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are picked based on the frontiers of the desired stream and the output persist shard.
  • write stages batch data in persist, based on the batch descriptions received from the mint operator, but without appending it to the persist shard. This is a multi-worker operator, with each worker writing batches of the data that arrives at its local inputs. To do so, it reads from the desired and persist streams and computes the difference between them, which it writes back out so that the final contents of the persist shard match desired.
  • append appends the batches described by mint and written by write to the persist shard. This is again a single-worker operator. It waits for all workers to stage their batches for a given batch description, then appends all the batches together as a single logical batch.
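The append operator's wait-for-all-workers step can be sketched as below. This is an illustration under simplifying assumptions: `Desc`, `Appender`, and its methods are hypothetical, batches are plain `Vec<String>`s, and completion is detected by counting per-worker reports rather than through timely frontiers.

```rust
use std::collections::BTreeMap;

/// Hypothetical batch description: `[lower, upper)` bounds over `u64` times.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Desc {
    lower: u64,
    upper: u64,
}

/// Hypothetical append bookkeeping: only the most recent description is tracked.
struct Appender {
    num_workers: usize,
    current: Option<Desc>,
    staged: BTreeMap<usize, Vec<String>>, // worker id -> staged batch
}

impl Appender {
    fn new(num_workers: usize) -> Self {
        Appender { num_workers, current: None, staged: BTreeMap::new() }
    }

    /// A newly minted description replaces the old one; stale staged batches are dropped.
    fn new_desc(&mut self, desc: Desc) {
        self.current = Some(desc);
        self.staged.clear();
    }

    /// Record a batch staged by `worker`. Once every worker has staged a batch for the
    /// current description, return all batches combined as one logical batch.
    fn stage(&mut self, worker: usize, desc: Desc, batch: Vec<String>) -> Option<(Desc, Vec<String>)> {
        if self.current != Some(desc) {
            return None; // batch for a description that is no longer valid
        }
        self.staged.insert(worker, batch);
        if self.staged.len() < self.num_workers {
            return None; // still waiting for other workers
        }
        self.current = None;
        // BTreeMap iterates in worker-id order, so the combination is deterministic.
        let combined: Vec<String> = std::mem::take(&mut self.staged).into_values().flatten().collect();
        Some((desc, combined))
    }
}
```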

Note that while the above graph suggests that mint and write both receive copies of the desired stream, the actual implementation passes that stream through mint and lets write read the passed-through stream, to avoid cloning data.

The persist sink is written to be robust to the presence of other conflicting instances (e.g. from other replicas) writing to the same persist shard. Each of the three operators needs to be able to handle conflicting writes that unexpectedly change the contents of the output persist shard.
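One way to tolerate conflicting writers is a compare-and-append check: a batch is only appended if its lower matches the shard's current upper, and otherwise the writer learns the actual upper. The sketch below models this with a hypothetical in-memory `MockShard`; its type and method signature do not mirror persist's actual API.

```rust
/// Hypothetical in-memory shard: `upper` is the shard's upper frontier and
/// `batches` records appended `[lower, upper)` ranges with their data.
struct MockShard {
    upper: u64,
    batches: Vec<(u64, u64, Vec<String>)>,
}

impl MockShard {
    /// Append only if `lower` equals the current upper. On a mismatch (e.g. another
    /// replica appended first), return the actual upper so the caller can react.
    fn compare_and_append(&mut self, lower: u64, upper: u64, data: Vec<String>) -> Result<(), u64> {
        if lower != self.upper {
            return Err(self.upper);
        }
        self.batches.push((lower, upper, data));
        self.upper = upper;
        Ok(())
    }
}
```

On a mismatch, the losing writer can mint a fresh description starting at the reported upper instead of retrying the stale one.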

§Frontiers

The desired frontier tracks the progress of the upstream dataflow, but may be rounded up to the next refresh time for dataflows that follow a refresh schedule other than “on commit”.
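Rounding up to the next refresh time can be sketched for totally ordered `u64` timestamps. `round_up_to_refresh` is a hypothetical helper, and treating a frontier past the last refresh time as the empty frontier (`None`) is an assumption of this sketch, not documented behavior.

```rust
/// Round a desired frontier `time` up to the next refresh time at or after it.
/// `refresh_times` must be sorted in ascending order. Returns `None` (modeling the
/// empty frontier) if `time` lies beyond the last refresh; this is an assumption.
fn round_up_to_refresh(time: u64, refresh_times: &[u64]) -> Option<u64> {
    // Index of the first refresh time that is >= `time`.
    let idx = refresh_times.partition_point(|&r| r < time);
    refresh_times.get(idx).copied()
}
```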

The persist frontier tracks the upper frontier of the target persist shard, with one exception: When the persist_source that reads back the shard is rendered, it will start reading at its since frontier. So if the shard’s since is initially greater than its upper, the persist frontier too will be in advance of the shard upper, until the upper has caught up. To avoid getting confused by this edge case, the mint operator does not use the persist stream to observe the shard frontier but keeps its own WriteHandle instead.

The descs frontier communicates which lower bounds may still be emitted in batch descriptions. All future batch descriptions will have a lower that is greater than or equal to the current descs frontier.

The batches frontier communicates for which lower bounds batches may still be written. All batches for descriptions with lowers less than the current batches frontier have already been written.

§Invariants

The implementation upholds several invariants that it relies on to simplify its logic:

  1. lowers in minted batch descriptions are unique and strictly increasing. That is, the mint operator will never mint the same lower twice and a minted lower is always greater than any previously minted ones.
  2. uppers in minted batch descriptions are monotonically increasing.
  3. It follows from (1) that there is always at most one “valid” batch description in flight in the operator graph. “Valid” here means that the described batch can be appended to the persist shard.

The main simplification these invariants allow is that operators only need to keep track of the most recent batch description and/or lower. Previous batch descriptions are not valid anymore, so there is no reason to hold any state or perform any work in support of them.
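The invariants can be illustrated with a hypothetical minting routine over `u64` times. As a simplifying assumption, a description here spans from the shard's upper to the desired frontier, and both inputs only ever advance; `Desc` and `Minter` are illustrative names, not the module's types.

```rust
/// Hypothetical batch description: `[lower, upper)` bounds over `u64` times.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Desc {
    lower: u64,
    upper: u64,
}

/// Hypothetical mint state: only the most recently minted description is kept.
struct Minter {
    last: Option<Desc>,
}

impl Minter {
    /// Mint `[persist_upper, desired_upper)` if there is new desired data and
    /// that lower has not been minted before.
    fn maybe_mint(&mut self, persist_upper: u64, desired_upper: u64) -> Option<Desc> {
        if desired_upper <= persist_upper {
            return None; // nothing new to write
        }
        if let Some(last) = self.last {
            // Invariant 1: lowers are unique and strictly increasing.
            if persist_upper <= last.lower {
                return None;
            }
        }
        let desc = Desc { lower: persist_upper, upper: desired_upper };
        // Invariant 2 holds as long as the desired frontier only advances.
        debug_assert!(self.last.map_or(true, |l| desc.upper >= l.upper));
        self.last = Some(desc);
        Some(desc)
    }
}
```

In this sketch, a new lower is only minted once the shard upper has moved past the previous lower, so the previous description can no longer be appended and downstream operators only need to remember the latest one.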

§Read-only Mode

The persist sink can optionally be initialized in read-only mode. In this mode it is passive and avoids any writes to persist. Activating the read_only_rx transitions the sink into write mode, where it commences normal operation.

Read-only mode is implemented by the mint operator. To disable writes, the mint operator simply avoids minting any batch descriptions. Since both the write and the append operator require batch descriptions to write/append batches, this suppresses any persist communication. At the same time, the write operator still observes changes to the desired and persist collections, allowing it to keep its correction buffer up-to-date.
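Read-only gating in mint can be sketched as follows. A `std::sync::mpsc` channel stands in for `read_only_rx`, whose actual type may differ; `GatedMint` is hypothetical, and the deduplication of lowers from the invariants is omitted for brevity.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical mint state with a read-only gate. Any message on `read_only_rx`
/// stands in for activating write mode.
struct GatedMint {
    read_only: bool,
    read_only_rx: Receiver<()>,
}

impl GatedMint {
    /// Returns a `(lower, upper)` description, or `None` while in read-only mode.
    fn maybe_mint(&mut self, persist_upper: u64, desired_upper: u64) -> Option<(u64, u64)> {
        // Check without blocking whether write mode has been activated.
        // An empty or disconnected channel leaves the sink read-only.
        if self.read_only {
            if let Ok(()) = self.read_only_rx.try_recv() {
                self.read_only = false;
            }
        }
        // Minting nothing keeps write and append from ever touching persist.
        if self.read_only || desired_upper <= persist_upper {
            return None;
        }
        Some((persist_upper, desired_upper))
    }
}
```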

§Modules

  • append 🔒
    Implementation of the append operator.
  • mint 🔒
    Implementation of the mint operator.
  • write 🔒
    Implementation of the write operator.

§Structs

  • A description for a batch of updates to be written.
  • OkErr 🔒
    Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with such pairs.
  • PersistApi 🔒
    A persist API specialized to a single collection.
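A wrapper like OkErr might look roughly as follows. Only the type name comes from this module; the field names `ok`/`err` and the `new`, `map`, and `as_ref` helpers are hypothetical.

```rust
/// Hypothetical shape of an ok/err pair wrapper (e.g. a pair of streams or frontiers).
#[derive(Clone, Debug, PartialEq)]
struct OkErr<O, E> {
    ok: O,
    err: E,
}

impl<O, E> OkErr<O, E> {
    fn new(ok: O, err: E) -> Self {
        OkErr { ok, err }
    }

    /// Transform each half with its own function, keeping the pair together.
    fn map<O2, E2>(self, f_ok: impl FnOnce(O) -> O2, f_err: impl FnOnce(E) -> E2) -> OkErr<O2, E2> {
        OkErr { ok: f_ok(self.ok), err: f_err(self.err) }
    }

    /// Borrow both halves without consuming the pair.
    fn as_ref(&self) -> OkErr<&O, &E> {
        OkErr { ok: &self.ok, err: &self.err }
    }
}
```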

§Functions

  • advance 🔒
    Advance the given frontier to new, if the latter one is greater.
  • Construct a name for the given sub-operator.
  • Renders an MV sink writing the given desired collection into the target persist collection.
  • Instantiate a persist source reading back the target collection.
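The behavior described for advance can be illustrated for totally ordered `u64` times by modeling a frontier as `Option<u64>`, with `None` as the empty (complete) frontier. The `Frontier` alias and `less_than` helper are assumptions of this sketch; the module's function works on timely frontiers, and its exact signature is not shown here.

```rust
/// Hypothetical frontier for a totally ordered time: `Some(t)` means times >= t may
/// still appear; `None` is the empty frontier, which is greater than every `Some`.
type Frontier = Option<u64>;

/// Frontier comparison. Note that `Option`'s derived `Ord` puts `None` first, which is
/// the wrong order for frontiers, so the comparison is spelled out explicitly.
fn less_than(a: &Frontier, b: &Frontier) -> bool {
    match (a, b) {
        (Some(x), Some(y)) => x < y,
        (Some(_), None) => true,
        (None, _) => false,
    }
}

/// Advance `frontier` to `new` if `new` is greater; returns whether it changed.
fn advance(frontier: &mut Frontier, new: Frontier) -> bool {
    if less_than(frontier, &new) {
        *frontier = new;
        true
    } else {
        false
    }
}
```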

§Type Aliases