Function mz_storage::render::persist_sink::append_batches
fn append_batches<G>(
    scope: &G,
    collection_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    batches: &Stream<G, HollowBatchAndMetadata<Timestamp>>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    output_index: usize,
    metrics: SourcePersistSinkMetrics,
    busy_signal: Arc<Semaphore>,
) -> (Stream<G, ()>, Stream<G, Rc<Error>>, PressOnDropButton)
Fuses written batches together and appends them to persist using one compare_and_append call. Writing only happens for batch descriptions where we know that no future batches will arrive, that is, for those batch descriptions that are not beyond the frontier of both the batch_descriptions and batches inputs.

This also keeps the shared frontier that is stored in compute_state in sync with the upper of the persist shard, and updates various metrics and statistics objects.
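
The readiness rule described above can be illustrated with a minimal, self-contained sketch. It is not the actual implementation: it assumes totally ordered u64 timestamps, so each frontier collapses to a single optional value instead of an Antichain<Timestamp>, and the names BatchDescription, Frontier, frontier_passed and ready_descriptions are hypothetical and exist only for this example.

```rust
/// Hypothetical stand-in for a batch description: the half-open
/// time interval [lower, upper) that a fused batch would cover.
#[derive(Debug, Clone, PartialEq, Eq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Simplified frontier for totally ordered times (an assumption of this
/// sketch). `Some(t)` means data at times >= t may still arrive;
/// `None` means the input is closed and nothing more will arrive.
type Frontier = Option<u64>;

/// Returns true if the input can no longer produce data at times < `upper`.
fn frontier_passed(frontier: Frontier, upper: u64) -> bool {
    match frontier {
        None => true,
        Some(t) => upper <= t,
    }
}

/// Removes from `pending` every description that is not beyond the frontier
/// of BOTH inputs and returns those descriptions ordered by lower bound.
/// Descriptions that are still beyond either frontier stay in `pending`.
fn ready_descriptions(
    pending: &mut Vec<BatchDescription>,
    descriptions_frontier: Frontier,
    batches_frontier: Frontier,
) -> Vec<BatchDescription> {
    let (mut ready, waiting): (Vec<BatchDescription>, Vec<BatchDescription>) =
        pending.drain(..).partition(|d| {
            frontier_passed(descriptions_frontier, d.upper)
                && frontier_passed(batches_frontier, d.upper)
        });
    *pending = waiting;
    ready.sort_by_key(|d| d.lower);
    ready
}
```

In this sketch, with pending descriptions [0, 5), [5, 10) and [10, 15), a batch_descriptions frontier of 12 and a batches frontier of 10, only [0, 5) and [5, 10) are returned; [10, 15) stays pending because batches for it could still arrive. The real operator would then fuse the batches belonging to each ready description and append them with a single compare_and_append call.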