fn append_batches<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    batches: &Stream<G, BatchOrData>,
    persist_clients: Arc<PersistClientCache>,
) -> (Stream<G, ()>, Rc<dyn Any>)
where
    G: Scope<Timestamp = Timestamp>,

Fuses written batches together and appends them to persist using a single compare_and_append call. A batch description is written only once we know that no further batches can arrive for it, that is, once the description is no longer beyond the frontiers of both the batch_descriptions and batches inputs.
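
A minimal sketch of that readiness check, under the stated assumptions (the helper and its name are hypothetical; the real operator tracks this state inside its event loop):

use timely::progress::{Antichain, Timestamp};

// Hypothetical helper: a batch description carried at `desc_time` is safe to
// append once neither input can still produce data at that time.
fn description_is_ready<T: Timestamp>(
    desc_time: &T,
    batch_descriptions_frontier: &Antichain<T>,
    batches_frontier: &Antichain<T>,
) -> bool {
    // `frontier.less_equal(t)` means the input may still emit data at `t`;
    // once both return false, the description is closed and can be appended.
    !batch_descriptions_frontier.less_equal(desc_time)
        && !batches_frontier.less_equal(desc_time)
}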

To avoid contention over the persist shard, we route all batches to a single worker. This worker may also batch up individual records sent by the upstream operator, as a way to coalesce what would otherwise be many tiny batches into fewer, larger ones.
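
For illustration, routing everything to one worker can be expressed with timely's Exchange operator and a constant routing key (a sketch, not the actual operator; sink_id_hash stands in for however the real code picks its active worker):

use timely::dataflow::operators::Exchange;
use timely::dataflow::{Scope, Stream};
use timely::ExchangeData;

// Sketch: exchange on a constant key so every element lands on the same
// worker (sink_id_hash % peers), which can then fuse and append the batches.
fn route_to_single_worker<G, D>(batches: &Stream<G, D>, sink_id_hash: u64) -> Stream<G, D>
where
    G: Scope,
    D: ExchangeData,
{
    batches.exchange(move |_| sink_id_hash)
}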