Function mz_storage::render::persist_sink::render
pub(crate) fn render<G>(
scope: &G,
collection_id: GlobalId,
target: CollectionMetadata,
desired_collection: Collection<G, Result<Row, DataflowError>, Diff>,
storage_state: &StorageState,
metrics: SourcePersistSinkMetrics,
output_index: usize,
busy_signal: Arc<Semaphore>,
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, Vec<PressOnDropButton>)
Continuously writes the desired_collection into persist.
This is done via a multi-stage operator graph:
1. mint_batch_descriptions emits new batch descriptions whenever the frontier of desired_collection advances. A batch description is a pair of (lower, upper) that tells the write operators which updates to write and, in the end, tells the append operator which frontiers to use when calling append/compare_and_append. This is a single-worker operator.
2. write_batches writes the desired_collection to persist as batches and sends those batches along. This does not yet append the batches to the persist shard; the updates are only uploaded and prepared to be appended to a shard. Also, we only write updates for batch descriptions that we learned about from mint_batch_descriptions.
3. append_batches takes as input the minted batch descriptions and the written batches. Whenever the frontiers sufficiently advance, we take a batch description and all the batches that belong to it and append them to the persist shard.
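The three stages above can be modelled in a few lines of plain Rust. This is only a sketch under simplifying assumptions: timestamps are totally ordered u64 values, a frontier is a single u64, and the names Update, BatchDescription, Minter, write_batch and Shard are hypothetical stand-ins invented for illustration. None of them are Materialize, timely or persist APIs, and the real operators run asynchronously across workers.

```rust
/// An update: (data, timestamp, diff). Hypothetical simplification.
type Update = (String, u64, i64);

/// A batch description: the half-open time interval [lower, upper).
#[derive(Debug, Clone, PartialEq, Eq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Stage 1 (hypothetical): mints a description each time the frontier advances.
struct Minter {
    /// Upper bound of the most recently minted description.
    current_upper: u64,
}

impl Minter {
    fn observe_frontier(&mut self, frontier: u64) -> Option<BatchDescription> {
        if frontier <= self.current_upper {
            return None; // The frontier did not advance, so nothing is minted.
        }
        let desc = BatchDescription { lower: self.current_upper, upper: frontier };
        self.current_upper = frontier;
        Some(desc)
    }
}

/// Stage 2 (hypothetical): pulls the updates covered by `desc` out of `pending`.
/// This models preparing a batch; nothing is appended to the shard here.
fn write_batch(pending: &mut Vec<Update>, desc: &BatchDescription) -> Vec<Update> {
    let (batch, rest): (Vec<Update>, Vec<Update>) = std::mem::take(pending)
        .into_iter()
        .partition(|(_, ts, _)| desc.lower <= *ts && *ts < desc.upper);
    *pending = rest;
    batch
}

/// Stage 3 (hypothetical): a shard that accepts a batch only if the batch's
/// lower bound equals the shard's current upper, in the spirit of a
/// compare-and-append.
struct Shard {
    upper: u64,
    updates: Vec<Update>,
}

impl Shard {
    fn compare_and_append(
        &mut self,
        batch: Vec<Update>,
        desc: &BatchDescription,
    ) -> Result<(), u64> {
        if self.upper != desc.lower {
            return Err(self.upper); // Report the actual upper on a mismatch.
        }
        self.updates.extend(batch);
        self.upper = desc.upper;
        Ok(())
    }
}
```

In this model, a frontier advance from 0 to 3 mints the description (0, 3), write_batch selects the updates with timestamps in that interval, and compare_and_append succeeds only while the shard's upper is still 0. Retrying the same description afterwards fails, because the shard's upper has moved to 3.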
This operator assumes that the desired_collection comes pre-sharded.
Note that mint_batch_descriptions inspects the frontier of desired_collection and passes the data through to write_batches. This is done to avoid a clone of the underlying data so that both operators can have the collection as input.
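The pass-through idea can be illustrated with ordinary Rust ownership. The sketch below is an assumption-laden model, not the operator's implementation: FrontierInspector and pass_through are hypothetical names, and a frontier is reduced to a single u64. It shows a stage that looks at progress information and then hands the owned data on by move, so the downstream stage receives the same allocation rather than a copy.

```rust
/// An update: (data, timestamp, diff). Hypothetical simplification.
type Update = (String, u64, i64);

/// Hypothetical pass-through stage: remembers the largest frontier it has
/// seen and forwards the data unchanged.
struct FrontierInspector {
    last_frontier: u64,
}

impl FrontierInspector {
    /// Takes ownership of `updates` and returns the very same vector, so the
    /// downstream stage gets the data without it being cloned.
    fn pass_through(&mut self, updates: Vec<Update>, frontier: u64) -> Vec<Update> {
        self.last_frontier = self.last_frontier.max(frontier);
        updates
    }
}
```

Because the vector is moved rather than cloned, the heap buffer that reaches the downstream stage is the one the upstream stage produced. The alternative, feeding the same collection to two independent consumers, would require each consumer to hold its own copy.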