fn append_batches<G>(
    scope: &G,
    collection_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    batches: &Stream<G, HollowBatchAndMetadata<SourceData, (), Timestamp, Diff>>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    output_index: usize,
    metrics: SourcePersistSinkMetrics,
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,

Fuses written batches together and appends them to persist using a single compare_and_append call. A batch description is written only once we know that no further batches can arrive for it, that is, only once the description is no longer beyond the frontiers of both the batch_descriptions and batches inputs.
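A minimal sketch of that gating rule, under simplifying assumptions: real code compares timely Antichains, but here each frontier is collapsed to a single u64, and `ready_to_append` is a hypothetical helper, not part of this crate.

```rust
/// Simplified sketch of the frontier-gating rule described above.
/// A description with upper `desc_upper` is complete once both input
/// frontiers have advanced to at least `desc_upper`: no update at a
/// time before `desc_upper` can still arrive on either input.
fn ready_to_append(
    desc_upper: u64,
    descriptions_frontier: u64,
    batches_frontier: u64,
) -> bool {
    descriptions_frontier >= desc_upper && batches_frontier >= desc_upper
}

fn main() {
    // Both inputs have passed the description's upper: safe to append.
    assert!(ready_to_append(5, 5, 6));
    // The batches input may still deliver data for times < 5: must wait.
    assert!(!ready_to_append(5, 6, 4));
}
```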

This also keeps the shared frontier that is stored in storage_state in sync with the upper of the persist shard, and updates various metrics and statistics objects.
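To illustrate why the shared frontier stays in sync, here is a hedged, self-contained sketch of the compare-and-append contract: the append only succeeds when the caller's expected lower matches the shard's current upper, so the upper (and hence the frontier tracking it) advances atomically. The `Shard` type and `compare_and_append` function below are illustrative stand-ins, not the actual persist API.

```rust
/// Toy stand-in for a persist shard: only its upper is modeled.
struct Shard {
    upper: u64,
}

#[derive(Debug, PartialEq)]
enum AppendResult {
    /// The append succeeded; the shard's upper advanced.
    Ok,
    /// The expected lower did not match; reports the actual upper.
    UpperMismatch { current: u64 },
}

/// Compare-and-append semantics: succeed only if `expected_lower`
/// equals the shard's current upper, then advance it to `new_upper`.
fn compare_and_append(shard: &mut Shard, expected_lower: u64, new_upper: u64) -> AppendResult {
    if shard.upper != expected_lower {
        return AppendResult::UpperMismatch { current: shard.upper };
    }
    shard.upper = new_upper;
    AppendResult::Ok
}

fn main() {
    let mut shard = Shard { upper: 0 };
    assert_eq!(compare_and_append(&mut shard, 0, 5), AppendResult::Ok);
    assert_eq!(shard.upper, 5);
    // A stale writer whose expected lower lags the shard is rejected.
    assert_eq!(
        compare_and_append(&mut shard, 0, 7),
        AppendResult::UpperMismatch { current: 5 }
    );
}
```

Because every successful append moves the upper exactly from the expected lower, a caller that mirrors each successful result into a shared frontier can never drift from the shard's true upper.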