A dataflow sink that writes input records to a persist shard.
This implementation is both parallel and self-correcting.
- parallel: Multiple workers can participate in writing updates for the same times, letting sink throughput scale with the number of workers allocated to the replica.
- self-correcting: The sink continually compares the contents of the persist shard with the contents of the input collection and writes down the difference. If the persist shard ends up with undesired contents for any reason, this is corrected the next time the sink manages to append to the shard.
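The self-correcting behavior can be illustrated with a minimal sketch. It assumes collections are modeled as maps from a record to its multiplicity; the `Collection` alias and the `correction` function are hypothetical names for illustration and are not part of the sink's API. The real sink works on timestamped update streams rather than in-memory maps.

```rust
use std::collections::HashMap;

/// Hypothetical model of a collection: each record maps to its multiplicity.
type Collection = HashMap<String, i64>;

/// Compute the updates that, once appended to `persist`, make its contents
/// equal to `desired`. This is `desired - persist`, with zero entries dropped.
fn correction(desired: &Collection, persist: &Collection) -> Collection {
    let mut out = Collection::new();
    // Start from what the shard should contain...
    for (record, diff) in desired {
        *out.entry(record.clone()).or_insert(0) += *diff;
    }
    // ...and subtract what the shard already contains.
    for (record, diff) in persist {
        *out.entry(record.clone()).or_insert(0) -= *diff;
    }
    // Records whose multiplicities already agree need no update.
    out.retain(|_, diff| *diff != 0);
    out
}
```

If the shard holds a record that `desired` lacks, the correction contains a retraction for it. If the two already agree, the correction is empty and there is nothing to write.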
§Operators
The persist sink consists of a graph of operators.
     desired                    persist <---------------.
        |                          |                    |
        |                          |                    |
        |---------------------.    |                    |
        |                     |    |                    |
        |                     |    |                    |
        v                     v    v                    |
    +--------+              +--------+              +--------+
    |  mint  | --descs-.--> | write  | --batches--> | append |
    +--------+          \   +--------+          .-> +--------+
                         \_____________________/
- mint: mints batch descriptions, i.e., (lower, upper) bounds of batches that should be written. The persist API requires that all workers write batches with the same bounds, so they can be appended as a single logical batch. To ensure this, the mint operator only runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are picked based on the frontiers of the desired stream and the output persist shard.
- write: stages batch data in persist, based on the batch descriptions received from the mint operator, but without appending it to the persist shard. This is a multi-worker operator, with each worker writing batches of the data that arrives at its local inputs. To do so it reads from the desired and persist streams and produces the difference between them to write back out, ensuring that the final contents of the persist shard match desired.
- append: appends the batches minted by mint and written by write to the persist shard. This is a multi-worker operator, where workers are responsible for different subsets of batch descriptions. If a worker is responsible for a given batch description, it waits for all workers to stage their batches for that batch description, then appends all the batches together as a single logical batch.
Note that while the above graph suggests that mint and write both receive copies of the
desired stream, the actual implementation passes that stream through mint and lets write
read the passed-through stream, to avoid cloning data.
Also note that the append operator’s implementation would perhaps be more natural as a
single-worker implementation. The purpose of sharing the work between all workers is to avoid a
work imbalance where one worker is overloaded (doing both appends and the consequent persist
maintenance work) while others are comparatively idle.
The persist sink is written to be robust to the presence of other conflicting instances (e.g. from other replicas) writing to the same persist shard. Each of the three operators needs to be able to handle conflicting writes that unexpectedly change the contents of the output persist shard.
§Frontiers
The desired frontier tracks the progress of the upstream dataflow, but may be rounded up to
the next refresh time for dataflows that follow a refresh schedule other than “on commit”.
The persist frontier tracks the upper frontier of the target persist shard, with one
exception: When the persist_source that reads back the shard is rendered, it will start
reading at its since frontier. So if the shard’s since is initially greater than its
upper, the persist frontier too will be in advance of the shard upper, until the upper
has caught up. To avoid getting confused by this edge case, the mint operator does not use
the persist stream to observe the shard frontier but keeps its own WriteHandle instead.
The descs frontier communicates which lower bounds may still be emitted in batch
descriptions. All future batch descriptions will have a lower that is greater than or equal to
the current descs frontier.
The batches frontier communicates for which lower bounds batches may still be written. All
batches for descriptions with lowers less than the current batches frontier have already
been written.
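The way the batches frontier tells an append worker that all batches for a description have been staged can be sketched as follows. This is a simplified model, assuming totally ordered `u64` timestamps in place of real frontiers. `Desc`, `PendingAppend`, and the string batch identifiers are hypothetical stand-ins, not the sink's actual types.

```rust
/// Hypothetical stand-in for a batch description over `u64` times.
#[derive(Debug, Clone, PartialEq)]
struct Desc {
    lower: u64,
    upper: u64,
}

/// Simplified per-worker state for one batch description awaiting append.
struct PendingAppend {
    desc: Desc,
    /// Identifiers of staged batches received so far (placeholders for
    /// real batch handles).
    staged: Vec<String>,
}

impl PendingAppend {
    fn new(desc: Desc) -> Self {
        PendingAppend { desc, staged: Vec::new() }
    }

    /// Record a staged batch received on the batches stream.
    fn stage(&mut self, batch: &str) {
        self.staged.push(batch.to_string());
    }

    /// All batches for descriptions with `lower < batches_frontier` have
    /// been written, so no further batches can arrive for this description.
    fn is_complete(&self, batches_frontier: u64) -> bool {
        self.desc.lower < batches_frontier
    }

    /// Return the batches to append as one logical batch, or `None` if
    /// some workers may still be writing.
    fn take_if_complete(&mut self, batches_frontier: u64) -> Option<Vec<String>> {
        if self.is_complete(batches_frontier) {
            Some(std::mem::take(&mut self.staged))
        } else {
            None
        }
    }
}
```

The sketch takes the stated frontier guarantee literally: once the batches frontier has moved past a description's lower, no worker can still contribute a batch for it, so appending is safe.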
§Invariants
The implementation upholds several invariants that can be relied upon to simplify the implementation:
1. lowers in minted batch descriptions are unique and strictly increasing. That is, the mint operator will never mint the same lower twice and a minted lower is always greater than any previously minted ones.
2. uppers in minted batch descriptions are monotonically increasing.
3. From (1) it follows that there is always at most one "valid" batch description in flight in the operator graph. "Valid" here means that the described batch can be appended to the persist shard.
The main simplification these invariants allow is that operators only need to keep track of the
most recent batch description and/or lower. Previous batch descriptions are not valid
anymore, so there is no reason to hold any state or perform any work in support of them.
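A minimal sketch of the state an operator might keep under these invariants is shown below. It assumes totally ordered `u64` timestamps, and it assumes that a description is appendable exactly when its lower equals the shard's current upper; that validity rule is an assumption for illustration, not something stated here. `Desc` and `LatestDesc` are hypothetical names.

```rust
/// Hypothetical stand-in for a batch description over `u64` times.
#[derive(Debug, Clone, PartialEq)]
struct Desc {
    lower: u64,
    upper: u64,
}

/// Operator state: only the most recent batch description is retained.
#[derive(Default)]
struct LatestDesc {
    current: Option<Desc>,
}

impl LatestDesc {
    /// Record a newly received description and return the one it replaces.
    /// The replaced description is no longer valid and can be dropped.
    fn observe(&mut self, desc: Desc) -> Option<Desc> {
        if let Some(prev) = &self.current {
            // Invariant (1): lowers are unique and strictly increasing.
            assert!(desc.lower > prev.lower, "lowers must strictly increase");
            // Invariant (2): uppers are monotonically increasing.
            assert!(desc.upper >= prev.upper, "uppers must not regress");
        }
        self.current.replace(desc)
    }

    /// Assumed validity check: the description can be appended only if its
    /// lower matches the shard's current upper.
    fn valid_for(&self, shard_upper: u64) -> Option<&Desc> {
        self.current.as_ref().filter(|d| d.lower == shard_upper)
    }
}
```

Because each new description supersedes the previous one, a single `Option` is enough state; no queue of outstanding descriptions is needed.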
§Read-only Mode
The persist sink can optionally be initialized in read-only mode. In this mode it is passive
and avoids any writes to persist. Activating the read_only_rx transitions the sink into write
mode, where it commences normal operation.
Read-only mode is implemented by the mint operator. To disable writes, the mint operator
simply avoids minting any batch descriptions. Since both the write and the append operator
require batch descriptions to write/append batches, this suppresses any persist communication.
At the same time, the write operator still observes changes to the desired and persist
collections, allowing it to keep its correction buffer up-to-date.
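The gating described above can be sketched as follows. This is an illustration only: it models the read_only_rx signal as a plain boolean, uses `u64` timestamps, and assumes that a minted description spans from the shard upper to the desired frontier. The text only says that bounds are based on those two frontiers, so the exact rule, along with the `Mint` type and its methods, is hypothetical.

```rust
/// Simplified model of the mint operator's write gating.
struct Mint {
    /// While true, no batch descriptions are minted.
    read_only: bool,
    /// The most recently minted lower, used to keep lowers strictly increasing.
    last_lower: Option<u64>,
}

impl Mint {
    fn new(read_only: bool) -> Self {
        Mint { read_only, last_lower: None }
    }

    /// Stand-in for the read-only signal firing: switch to write mode.
    fn allow_writes(&mut self) {
        self.read_only = false;
    }

    /// Mint a `(lower, upper)` description if writes are allowed and there
    /// is new data to write; otherwise return `None`.
    fn maybe_mint(&mut self, desired_frontier: u64, shard_upper: u64) -> Option<(u64, u64)> {
        if self.read_only {
            // Read-only mode: downstream operators receive no descriptions,
            // so they never write to or append to the shard.
            return None;
        }
        if desired_frontier <= shard_upper {
            // Nothing new to write yet.
            return None;
        }
        if self.last_lower.map_or(false, |last| shard_upper <= last) {
            // Never mint the same (or an earlier) lower twice.
            return None;
        }
        self.last_lower = Some(shard_upper);
        Some((shard_upper, desired_frontier))
    }
}
```

Gating at the mint step keeps the other operators unchanged: they behave the same in both modes and simply have no descriptions to act on while the sink is read-only.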
Modules§
- append 🔒: Implementation of the append operator.
- mint 🔒: Implementation of the mint operator.
- write 🔒: Implementation of the write operator.
Structs§
- BatchDescription 🔒: A description for a batch of updates to be written.
- OkErr 🔒: Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with such pairs.
- PersistApi 🔒: A persist API specialized to a single collection.
Functions§
- advance 🔒: Advance the given frontier to new, if the latter one is greater.
- operator_name 🔒: Construct a name for the given sub-operator.
- persist_sink 🔒: Renders an MV sink writing the given desired collection into the target persist collection.
- persist_source 🔒: Instantiate a persist source reading back the target collection.
Type Aliases§
- BatchesStream 🔒: Type of the batches stream.
- DescsStream 🔒: Type of the descs stream.
- DesiredStreams 🔒: Type of the desired stream, split into Ok and Err streams.
- PersistStreams 🔒: Type of the persist stream, split into Ok and Err streams.
- SharedSinkFrontier 🔒: Type of the shared sink write frontier.