fn append_batches<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    batches: &Stream<G, ProtoBatch>,
    persist_clients: Arc<PersistClientCache>,
    read_only: Receiver<bool>,
) -> Rc<dyn Any>
Fuses written batches together and appends them to persist using one `compare_and_append` call. Writing only happens for batch descriptions where we know that no future batches will arrive, that is, for those batch descriptions that are beyond neither the frontier of the `batch_descriptions` input nor the frontier of the `batches` input.
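The readiness rule can be sketched in isolation. This is a minimal, hypothetical model, not this crate's code: it assumes totally ordered `u64` timestamps, so each frontier has at most one element (with `None` standing in for the empty frontier of a closed input), and it assumes readiness is judged against the description's `upper` bound. The names `Frontier`, `BatchDescription`, `is_beyond`, and `is_ready` are illustrative only; the real operator works with timely's `Antichain`, where a time is "beyond" a frontier when some frontier element is less than or equal to it.

```rust
/// Hypothetical simplification: timestamps are totally ordered `u64`s, so a
/// frontier has at most one element. `None` models the empty frontier, i.e.
/// an input that is closed and will never produce more data.
type Frontier = Option<u64>;

/// A batch description covering the half-open interval `[lower, upper)`.
/// Assumption: `upper` is finite (the real type uses `Antichain`s).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// True if `t` is beyond `frontier`, i.e. the input may still produce data
/// at time `t`. Nothing is beyond the empty frontier.
fn is_beyond(t: u64, frontier: Frontier) -> bool {
    match frontier {
        Some(f) => f <= t,
        None => false,
    }
}

/// A description is ready to be appended only once it is beyond neither
/// input frontier, so no further descriptions or batches can arrive for it.
fn is_ready(desc: &BatchDescription, descriptions_frontier: Frontier, batches_frontier: Frontier) -> bool {
    !is_beyond(desc.upper, descriptions_frontier) && !is_beyond(desc.upper, batches_frontier)
}

fn main() {
    let desc = BatchDescription { lower: 0, upper: 10 };
    // Both inputs have advanced past `upper`: safe to append.
    println!("{}", is_ready(&desc, Some(11), Some(11)));
    // The `batches` input could still deliver data for this description.
    println!("{}", is_ready(&desc, Some(11), Some(5)));
    println!("covers [{}, {})", desc.lower, desc.upper);
}
```

Requiring both inputs to have advanced matters because either one can lag: a description may be known before its batches have been written, and batches may arrive before the description that covers them.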
To avoid contention over the persist shard, we route all batches to a single worker. This worker may also batch up individual records sent by the upstream operator, as a way to coalesce what would otherwise be many tiny batches into fewer, larger ones.
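The single-worker fusing step can be sketched with plain standard-library types. Everything here is hypothetical: `WrittenBatch`, `ToyShard`, and `Appender` are illustrative names, and `ToyShard::compare_and_append` is a toy stand-in that mimics the compare-and-append idea (append only if the shard's upper matches the expected one), not the persist client API. The sketch also assumes `u64` timestamps and a single combined frontier for both inputs. In a timely dataflow, routing everything to one worker could be done, for example, with an `Exchange` pact whose key function returns a constant; whether this operator does exactly that is not stated here.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a batch that was written but not yet appended.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WrittenBatch {
    updates: Vec<(String, i64)>,
}

/// Toy shard with a compare-and-append style operation (NOT the persist API).
#[derive(Debug, Default)]
struct ToyShard {
    upper: u64,
    contents: Vec<(String, i64)>,
    append_calls: usize,
}

impl ToyShard {
    /// Appends `updates` only if the current upper equals `expected_upper`,
    /// then advances the upper to `new_upper`. On mismatch, returns the actual upper.
    fn compare_and_append(&mut self, expected_upper: u64, new_upper: u64, updates: Vec<(String, i64)>) -> Result<(), u64> {
        self.append_calls += 1;
        if self.upper != expected_upper {
            return Err(self.upper);
        }
        self.contents.extend(updates);
        self.upper = new_upper;
        Ok(())
    }
}

/// State held by the single worker that every batch is routed to.
#[derive(Debug, Default)]
struct Appender {
    /// Pending batches keyed by their `(lower, upper)` description. A BTreeMap
    /// iterates in key order, so descriptions are appended in time order.
    in_flight: BTreeMap<(u64, u64), Vec<WrittenBatch>>,
}

impl Appender {
    fn receive(&mut self, desc: (u64, u64), batch: WrittenBatch) {
        self.in_flight.entry(desc).or_default().push(batch);
    }

    /// Appends every description whose `upper` is strictly below `frontier`,
    /// fusing all of its batches into one compare-and-append call. On a
    /// mismatch this sketch just returns the error; a real implementation
    /// would have to decide how to recover.
    fn append_ready(&mut self, frontier: u64, shard: &mut ToyShard) -> Result<(), u64> {
        let ready: Vec<(u64, u64)> = self
            .in_flight
            .keys()
            .copied()
            .filter(|&(_, upper)| upper < frontier)
            .collect();
        for (lower, upper) in ready {
            let batches = self.in_flight.remove(&(lower, upper)).expect("key came from the map");
            let fused: Vec<(String, i64)> = batches.into_iter().flat_map(|b| b.updates).collect();
            shard.compare_and_append(lower, upper, fused)?;
        }
        Ok(())
    }
}

fn main() {
    let mut shard = ToyShard::default();
    let mut appender = Appender::default();
    appender.receive((0, 5), WrittenBatch { updates: vec![("a".to_string(), 1)] });
    appender.receive((0, 5), WrittenBatch { updates: vec![("b".to_string(), 1)] });
    appender.receive((5, 10), WrittenBatch { updates: vec![("c".to_string(), 1)] });
    // Only `[0, 5)` is ready; its two batches go out in a single call.
    appender.append_ready(6, &mut shard).unwrap();
    println!("upper={} calls={}", shard.upper, shard.append_calls);
}
```

Funnelling everything through one worker trades write parallelism for a single, ordered writer: each description produces exactly one append whose expected upper is the previous description's upper, so concurrent writers never race on the shard.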