fn write_batches<G>(
    scope: &G,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    desired_collection: &Collection<G, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
) -> (
    Stream<G, HollowBatchAndMetadata<SourceData, (), Timestamp, Diff>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,

Writes desired_collection to persist, but only for updates that fall into a batch description that we receive via batch_descriptions. This forwards a HollowBatch (with additional metadata) for every batch of updates that was written.
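
For intuition, a batch description is a pair of frontiers (lower, upper), and an update at time t falls into it when it is at or beyond lower and strictly before upper. Below is a minimal sketch of that containment check using timely's Antichain API; the helper update_in_description is hypothetical and not part of this module.

```rust
use timely::progress::Antichain;
use timely::PartialOrder;

// Hypothetical helper, not part of this module: an update at time `t`
// falls into a batch description `(lower, upper)` when it lies in the
// half-open frontier interval [lower, upper).
fn update_in_description<T: PartialOrder>(
    t: &T,
    (lower, upper): &(Antichain<T>, Antichain<T>),
) -> bool {
    // At or beyond the lower frontier, and strictly before the upper one.
    lower.less_equal(t) && !upper.less_equal(t)
}
```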

This operator assumes that the desired_collection comes pre-sharded.
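
Pre-sharded means that each update already arrives at a fixed worker, so this operator does not exchange data itself. The following is a rough sketch of how a caller might shard a collection upstream by hashing the payload; shard_by_hash is illustrative only, i64 stands in for Diff, and the real distribution scheme may differ.

```rust
use differential_dataflow::{AsCollection, Collection};
use std::hash::{Hash, Hasher};
use timely::dataflow::operators::Exchange;
use timely::dataflow::Scope;
use timely::ExchangeData;

// Illustrative only: route every update to a worker chosen by a hash
// of its payload, producing the kind of pre-sharded collection this
// operator expects.
fn shard_by_hash<G, D>(input: &Collection<G, D, i64>) -> Collection<G, D, i64>
where
    G: Scope,
    G::Timestamp: ExchangeData,
    D: ExchangeData + Hash,
{
    input
        .inner
        .exchange(|(data, _time, _diff)| {
            // Hash only the payload so identical rows land on the
            // same worker regardless of time or diff.
            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            data.hash(&mut hasher);
            hasher.finish()
        })
        .as_collection()
}
```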

This also updates various metrics.