pub(crate) fn render<G>(
scope: &G,
collection_id: GlobalId,
target: CollectionMetadata,
desired_collection: Collection<G, Result<Row, DataflowError>, Diff>,
storage_state: &StorageState,
metrics: SourcePersistSinkMetrics,
busy_signal: Arc<Semaphore>,
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, Vec<PressOnDropButton>)
Continuously writes the `desired_collection` into persist.

This is done via a multi-stage operator graph:
* `mint_batch_descriptions` emits new batch descriptions whenever the frontier of `desired_collection` advances. A batch description is a `(lower, upper)` pair that tells the write operators which updates to write and, in the end, tells the append operator which frontiers to use when calling `append`/`compare_and_append`. This is a single-worker operator.
* `write_batches` writes the `desired_collection` to persist as batches and sends those batches along. This does not yet append the batches to the persist shard; the updates are only uploaded/prepared to be appended to a shard. Also: we only write updates for batch descriptions that we learned about from `mint_batch_descriptions`.
* `append_batches` takes as input the minted batch descriptions and written batches. Whenever the frontiers sufficiently advance, we take a batch description and all the batches that belong to it and append them to the persist shard.
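The three stages above can be sketched as a simplified, single-threaded simulation. The `Shard`, `BatchDescription`, and `compare_and_append` below are hypothetical stand-ins that mimic persist's semantics (append succeeds only when the shard's upper matches the description's lower), not the real persist API:

```rust
/// Hypothetical batch description minted when the frontier advances.
#[derive(Debug, PartialEq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// A written-but-not-yet-appended batch of updates.
struct Batch {
    rows: Vec<(String, i64)>,
}

/// A stand-in for a persist shard with a single `upper` frontier.
struct Shard {
    upper: u64,
    rows: Vec<(String, i64)>,
}

impl Shard {
    /// Append `batch` only if the shard's upper still equals the
    /// description's lower; otherwise report the actual upper, as a
    /// concurrent writer must have appended first.
    fn compare_and_append(&mut self, desc: &BatchDescription, batch: Batch) -> Result<(), u64> {
        if self.upper != desc.lower {
            return Err(self.upper);
        }
        self.rows.extend(batch.rows);
        self.upper = desc.upper;
        Ok(())
    }
}

/// Stage 1: mint a `(lower, upper)` description when the frontier advances.
fn mint_batch_description(prev_frontier: u64, new_frontier: u64) -> Option<BatchDescription> {
    (new_frontier > prev_frontier).then(|| BatchDescription {
        lower: prev_frontier,
        upper: new_frontier,
    })
}

fn main() {
    let mut shard = Shard { upper: 0, rows: vec![] };

    // Stage 1: the frontier advances from 0 to 5, so a description is minted.
    let desc = mint_batch_description(0, 5).expect("frontier advanced");

    // Stage 2: write the updates covered by that description as a batch.
    let batch = Batch { rows: vec![("a".into(), 1), ("b".into(), 1)] };

    // Stage 3: append the batch under the minted description.
    assert!(shard.compare_and_append(&desc, batch).is_ok());
    assert_eq!(shard.upper, 5);

    // A stale description (lower != shard upper) is rejected.
    let stale = BatchDescription { lower: 0, upper: 7 };
    let res = shard.compare_and_append(&stale, Batch { rows: vec![] });
    assert_eq!(res, Err(5));
    println!("ok");
}
```

The `compare_and_append` check is what makes the final stage safe to retry: a failed append tells the caller the shard's actual upper, rather than silently double-appending.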
This operator assumes that the `desired_collection` comes pre-sharded.

Note that `mint_batch_descriptions` inspects the frontier of
`desired_collection`, and passes the data through to `write_batches`.
This is done to avoid a clone of the underlying data, so that both
operators can have the collection as input.
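The pass-through pattern can be illustrated outside of timely dataflow with a hypothetical stage that observes a frontier while forwarding its input by value, so the downstream writer consumes the very same allocation and no clone is needed (the function name and types here are illustrative, not part of the real operator):

```rust
/// Hypothetical pass-through stage: observe a frontier (here: one past the
/// max timestamp seen) while moving the data through untouched.
fn inspect_frontier_and_pass_through(
    input: Vec<(String, u64)>, // (row, timestamp) pairs
) -> (u64, Vec<(String, u64)>) {
    let frontier = input.iter().map(|(_, t)| *t).max().map_or(0, |t| t + 1);
    (frontier, input) // moved, not cloned
}

fn main() {
    let input = vec![("a".to_string(), 1), ("b".to_string(), 3)];
    // Remember where the first row's string buffer lives on the heap.
    let ptr_before = input[0].0.as_ptr();

    let (frontier, passed) = inspect_frontier_and_pass_through(input);
    assert_eq!(frontier, 4);

    // The String buffer was moved, not cloned: same heap pointer.
    assert_eq!(passed[0].0.as_ptr(), ptr_before);
    println!("ok");
}
```

The pointer comparison demonstrates the point of the design: because the minting stage only reads the frontier, the collection itself can flow onward by move, and both stages effectively "see" it without duplicating the underlying rows.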