Module mz_compute::sink::materialized_view_v2
source · Expand description
A dataflow sink that writes input records to a persist shard.
This implementation is both parallel and self-correcting.
- parallel: Multiple workers can participate in writing updates for the same times, letting sink throughput scale with the number of workers allocated to the replica.
- self-correcting: The sink continually compares the contents of the persist shard with the contents of the input collection and writes down the difference. If the persist shard ends up with undesired contents for any reason, this is corrected the next time the sink manages to append to the shard.
§Operators
The persist sink consists of a graph of operators.
desired persist <—————. | | | | | | |———————. | | | | | | | | | | v v v | +––––+ +––––+ +––––+ | mint | –descs-.–> | write | –batches–> | append | +––––+ \ +––––+ .-> +––––+ _____________________/
mint
mints batch descriptions, i.e.,(lower, upper)
bounds of batches that should be written. The persist API requires that all workers write batches with the same bounds, so they can be appended as a single logical batch. To ensure this, themint
operator only runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are picked based on the frontiers of thedesired
stream and the output persist shard.write
stages batch data in persist, based on the batch descriptions received from themint
operator, but without appending it to the persist shard. This is a multi-worker operator, with each worker writing batches of the data that arrives at its local inputs. To do so it reads from thedesired
andpersist
streams and produces the difference between them to write back out, ensuring that the final contents of the persist shard matchdesired
.append
appends the batches minted bymint
and written bywrite
to the persist shard. This is again a single-worker operator. It waits for all workers to stage their batches for a given batch description, then appends all the batches together as a single logical batch.
Note that while the above graph suggests that mint
and write
both receive copies of the
desired
stream, the actual implementation passes that stream through mint
and lets write
read the passed-through stream, to avoid cloning data.
The persist sink is written to be robust to the presence of other conflicting instances (e.g. from other replicas) writing to the same persist shard. Each of the three operators needs to be able to handle conflicting writes that unexpectedly change the contents of the output persist shard.
§Frontiers
The desired
frontier tracks the progress of the upstream dataflow, but may be rounded up to
the next refresh time for dataflows that follow a refresh schedule other than “on commit”.
The persist
frontier tracks the upper
frontier of the target persist shard, with one
exception: When the persist_source
that reads back the shard is rendered, it will start
reading at its since
frontier. So if the shard’s since
is initially greater than its
upper
, the persist
frontier too will be in advance of the shard upper
, until the upper
has caught up. To avoid getting confused by this edge case, the mint
operator does not use
the persist
stream to observe the shard frontier but keeps its own WriteHandle
instead.
The descs
frontier communicates which lower
bounds may still be emitted in batch
descriptions. All future batch descriptions will have a lower
that is greater or equal to the
current descs
frontier.
The batches
frontier communicates for which lower
bounds batches may still be written. All
batches for descriptions with lower
s less than the current batches
frontier have already
been written.
§Invariants
The implementation upholds several invariants that can be relied upon to simplify the implementation:
lower
s in minted batch descriptions are unique and strictly increasing. That is, themint
operator will never mint the samelower
twice and a mintedlower
is always greater than any previously minted ones.upper
s in minted batch descriptions are monotonically increasing.- From (1) follows that there is always at most one “valid” batch description in flight in the operator graph. “Valid” here means that the described batch can be appended to the persist shard.
The main simplification these invariants allow is that operators only need to keep track of the
most recent batch description and/or lower
. Previous batch descriptions are not valid
anymore, so there is no reason to hold any state or perform any work in support of them.
§Read-only Mode
The persist sink can optionally be initialized in read-only mode. In this mode it is passive
and avoids any writes to persist. Activating the read_only_rx
transitions the sink into write
mode, where it commences normal operation.
Read-only mode is implemented by the mint
operator. To disable writes, the mint
operator
simply avoids minting any batch descriptions. Since both the write
and the append
operator
require batch descriptions to write/append batches, this suppresses any persist communication.
At the same time, the write
operator still observes changes to the desired
and persist
collections, allowing it to keep its correction buffer up-to-date.
Modules§
- append 🔒Implementation of the
append
operator. - mint 🔒Implementation of the
mint
operator. - write 🔒Implementation of the
write
operator.
Structs§
- A description for a batch of updates to be written.
- OkErr 🔒Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with such pairs.
- A persist API specialized to a single collection.
Functions§
- advance 🔒Advance the given
frontier
tonew
, if the latter one is greater. - Renders an MV sink writing the given desired collection into the
target
persist collection. - Instantiate a persist source reading back the
target
collection.
Type Aliases§
- Type of the
batches
stream. - Type of the
descs
stream. - Type of the
desired
stream, split intoOk
andErr
streams. - Type of the
persist
stream, split intoOk
andErr
streams.