fn append_batches<G>(
scope: &G,
collection_id: GlobalId,
operator_name: String,
target: &CollectionMetadata,
batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
batches: &Stream<G, HollowBatchAndMetadata<Timestamp>>,
persist_clients: Arc<PersistClientCache>,
storage_state: &StorageState,
metrics: SourcePersistSinkMetrics,
busy_signal: Arc<Semaphore>,
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, PressOnDropButton)Expand description
Fuses written batches together and appends them to persist using one
compare_and_append call. Writing only happens for batch descriptions where
we know that no future batches will arrive, that is, for those batch
descriptions that are not beyond the frontier of both the
batch_descriptions and batches inputs.
This also keeps the shared frontier that is stored in compute_state in
sync with the upper of the persist shard, and updates various metrics
and statistics objects.