Function mz_compute::sink::materialized_view::install_desired_into_persist
fn install_desired_into_persist<G>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    desired_oks: Stream<G, (Row, Timestamp, Diff)>,
    desired_errs: Stream<G, (DataflowError, Timestamp, Diff)>,
    persist_oks: Stream<G, (Row, Timestamp, Diff)>,
    persist_errs: Stream<G, (DataflowError, Timestamp, Diff)>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
) -> Option<Rc<dyn Any>>
Continuously writes the difference between `persist_stream` and `desired_stream` into persist, such that the persist shard is made to contain the same updates as `desired_stream`. This is done via a multi-stage operator graph:
- `mint_batch_descriptions` emits new batch descriptions whenever the frontier of `persist_stream` advances and `persist_frontier` is less than `desired_frontier`. A batch description is a pair of `(lower, upper)` that tells write operators which updates to write and, in the end, tells the append operator which frontiers to use when calling `append`/`compare_and_append`. This is a single-worker operator.
- `write_batches` writes the difference between `desired_stream` and `persist_stream` to persist as batches and sends those batches along. This does not yet append the batches to the persist shard; the updates are only uploaded/prepared to be appended to a shard. Also: we only write updates for batch descriptions that we learned about from `mint_batch_descriptions`.
- `append_batches` takes as input the minted batch descriptions and written batches. Whenever the frontiers sufficiently advance, we take a batch description and all the batches that belong to it and append them to the persist shard.
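The minting rule above can be sketched in isolation. This is a hypothetical simplification, not the real operator code: frontiers are modeled as plain `u64` timestamps instead of `Antichain<Timestamp>`, and the function name `mint_batch_description` is an illustrative stand-in.

```rust
/// Decide whether to mint a batch description (a simplified sketch).
/// A description is minted only when the persist shard lags behind the
/// desired collection, i.e. `persist_frontier < desired_frontier`.
fn mint_batch_description(persist_frontier: u64, desired_frontier: u64) -> Option<(u64, u64)> {
    if persist_frontier < desired_frontier {
        // `lower` is where the persist shard currently ends; `upper` is how
        // far the desired collection has advanced. Write operators produce
        // the updates in [lower, upper), and the append operator uses these
        // bounds for its append/compare_and_append call.
        Some((persist_frontier, desired_frontier))
    } else {
        None
    }
}

fn main() {
    // Persist lags behind desired: mint a description covering the gap.
    assert_eq!(mint_batch_description(5, 9), Some((5, 9)));
    // Caught up: nothing to mint.
    assert_eq!(mint_batch_description(9, 9), None);
}
```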
Note that `mint_batch_descriptions` inspects the frontier of `desired_collection` and passes the data through to `write_batches`. This is done to avoid a clone of the underlying data, so that both operators can have the collection as input.
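The compare-and-append semantics that `append_batches` relies on can be sketched with a toy shard. This is a hypothetical model for illustration only: the `Shard` type and its method are stand-ins, not persist's actual API, but they show why the `(lower, upper)` bounds from the batch description matter — an append only succeeds when `lower` matches the shard's current upper, which prevents double-writes and gaps.

```rust
/// A toy persist shard: an upper frontier plus the updates appended so far.
struct Shard {
    upper: u64,
    updates: Vec<(&'static str, u64, i64)>,
}

impl Shard {
    /// Append `batch` only if the shard's current upper equals `lower`;
    /// otherwise report the actual upper so the caller can re-evaluate.
    fn compare_and_append(
        &mut self,
        batch: Vec<(&'static str, u64, i64)>,
        lower: u64,
        upper: u64,
    ) -> Result<(), u64> {
        if self.upper != lower {
            return Err(self.upper);
        }
        self.updates.extend(batch);
        self.upper = upper;
        Ok(())
    }
}

fn main() {
    let mut shard = Shard { upper: 0, updates: Vec::new() };
    // Batch description (0, 3): `lower` matches the shard's upper, so the
    // batch is appended and the upper advances to 3.
    assert!(shard.compare_and_append(vec![("row", 1, 1)], 0, 3).is_ok());
    assert_eq!(shard.upper, 3);
    // A stale description starting at 0 no longer matches; the shard
    // rejects it and reports its current upper.
    assert_eq!(shard.compare_and_append(vec![("row", 4, 1)], 0, 5), Err(3));
}
```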