mz_compute::sink::materialized_view

Function install_desired_into_persist

fn install_desired_into_persist<G>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    desired_oks: Stream<G, (Row, Timestamp, Diff)>,
    desired_errs: Stream<G, (DataflowError, Timestamp, Diff)>,
    persist_oks: Stream<G, (Row, Timestamp, Diff)>,
    persist_errs: Stream<G, (DataflowError, Timestamp, Diff)>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
) -> Option<Rc<dyn Any>>
where G: Scope<Timestamp = Timestamp>,

Continuously writes the difference between persist_stream and desired_stream into persist, so that the persist shard ends up containing the same updates as desired_stream. This is done with a multi-stage operator graph:

  1. mint_batch_descriptions emits a new batch description whenever the frontier of persist_stream advances and persist_frontier is less than desired_frontier. A batch description is a (lower, upper) pair that tells the write operators which updates to write and, ultimately, tells the append operator which frontiers to use when calling append/compare_and_append. This is a single-worker operator.
  2. write_batches writes the difference between desired_stream and persist_stream to persist as batches and sends those batches along. This does not yet append the batches to the persist shard; the updates are only uploaded, that is, prepared to be appended to a shard. We also only write updates for batch descriptions that we learned about from mint_batch_descriptions.
  3. append_batches takes as input the minted batch descriptions and the written batches. Whenever the frontiers have advanced sufficiently, we take a batch description and all the batches that belong to it and append them to the persist shard.

Note that mint_batch_descriptions inspects the frontier of desired_collection and passes the data through to write_batches. Passing the data through lets both operators take the collection as input without cloning the underlying data.
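
The three stages above can be illustrated with a small, self-contained model. This is only a sketch under strong assumptions, not the actual implementation: frontiers are reduced to a single totally ordered u64 instead of an Antichain<Timestamp>, the persist shard is a hypothetical in-memory Shard struct, there are no Timely operators or multiple workers, and the names BatchDescription, mint_batch_description, write_batch, and Shard::compare_and_append are invented for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a frontier: a single totally ordered time.
type Frontier = u64;

/// Simplified update triple: (data, time, diff).
type Update = (String, u64, i64);

/// A batch description covers the updates with `lower <= time < upper`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BatchDescription {
    lower: Frontier,
    upper: Frontier,
}

/// Stage 1: mint a description only while persist lags behind desired.
fn mint_batch_description(
    persist_frontier: Frontier,
    desired_frontier: Frontier,
) -> Option<BatchDescription> {
    if persist_frontier < desired_frontier {
        Some(BatchDescription { lower: persist_frontier, upper: desired_frontier })
    } else {
        None
    }
}

/// Stage 2: compute `desired - persist` for the times covered by `desc`.
/// The result is prepared but not yet appended to the shard.
fn write_batch(desc: BatchDescription, desired: &[Update], persist: &[Update]) -> Vec<Update> {
    let in_range = |time: u64| desc.lower <= time && time < desc.upper;
    let mut acc: BTreeMap<(String, u64), i64> = BTreeMap::new();
    for (data, time, diff) in desired {
        if in_range(*time) {
            *acc.entry((data.clone(), *time)).or_insert(0) += *diff;
        }
    }
    for (data, time, diff) in persist {
        if in_range(*time) {
            *acc.entry((data.clone(), *time)).or_insert(0) -= *diff;
        }
    }
    // Drop updates that cancel out, keeping only real corrections.
    acc.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}

/// Hypothetical in-memory shard used in place of a real persist shard.
struct Shard {
    upper: Frontier,
    updates: Vec<Update>,
}

impl Shard {
    /// Stage 3: append all batches of a description, but only if the
    /// shard's current upper matches the description's lower.
    /// On a mismatch, the current upper is returned as the error.
    fn compare_and_append(
        &mut self,
        desc: BatchDescription,
        batches: Vec<Vec<Update>>,
    ) -> Result<(), Frontier> {
        if self.upper != desc.lower {
            return Err(self.upper);
        }
        self.updates.extend(batches.into_iter().flatten());
        self.upper = desc.upper;
        Ok(())
    }
}
```

In this model, minting is separated from writing so that every writer works against the same (lower, upper) bounds, and the compare-and-append check rejects a description whose lower no longer matches the shard's upper, for example because another writer got there first.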