pub(crate) fn render<G>(
    scope: &G,
    collection_id: GlobalId,
    target: CollectionMetadata,
    desired_collection: Collection<G, Result<Row, DataflowError>, Diff>,
    storage_state: &StorageState,
    metrics: SourcePersistSinkMetrics,
    output_index: usize
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, Vec<PressOnDropButton>)where
    G: Scope<Timestamp = Timestamp>,
Expand description

Continuously writes the desired_stream into persist This is done via a multi-stage operator graph:

  1. mint_batch_descriptions emits new batch descriptions whenever the frontier of desired_collection advances. A batch description is a pair of (lower, upper) that tells write operators which updates to write and in the end tells the append operator what frontiers to use when calling append/compare_and_append. This is a single-worker operator.
  2. write_batches writes the desired_collection to persist as batches and sends those batches along. This does not yet append the batches to the persist shard, the update are only uploaded/prepared to be appended to a shard. Also: we only write updates for batch descriptions that we learned about from mint_batch_descriptions.
  3. append_batches takes as input the minted batch descriptions and written batches. Whenever the frontiers sufficiently advance, we take a batch description and all the batches that belong to it and append it to the persist shard.

This operator assumes that the desired_collection comes pre-sharded.

Note that mint_batch_descriptions inspects the frontier of desired_collection, and passes the data through to write_batches. This is done to avoid a clone of the underlying data so that both operators can have the collection as input.