fn mint_batch_descriptions<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    desired_oks: &Stream<G, (Row, Timestamp, Diff)>,
    desired_errs: &Stream<G, (DataflowError, Timestamp, Diff)>,
    as_of: Antichain<Timestamp>,
    persist_clients: Arc<PersistClientCache>,
    compute_state: &mut ComputeState,
) -> (
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Rc<dyn Any>,
)
Whenever the frontier advances, this mints a new batch description (lower and upper) that writers should use for writing the next set of batches to persist.
Only one of the workers does this, so there will be only one description in the stream, even with multiple timely workers. Use broadcast() to, ahem, broadcast the one description to all downstream write operators/workers.
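The minting and broadcasting behavior described above can be modeled with a small, standard-library-only sketch. This is not the actual operator: timestamps are plain u64 values instead of Antichain<Timestamp>, and the names BatchDescription, Minter, and this free-standing broadcast function are hypothetical. The choice of worker 0 as the minting worker is also an assumption made for illustration.

```rust
/// Simplified stand-in for a batch description: the time range from
/// `lower` (inclusive) to `upper` (exclusive). Hypothetical type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchDescription {
    pub lower: u64,
    pub upper: u64,
}

/// Hypothetical per-worker state for minting descriptions.
pub struct Minter {
    worker_index: usize,
    /// Upper of the last minted description; becomes the next lower.
    current_upper: u64,
}

impl Minter {
    pub fn new(worker_index: usize, initial_upper: u64) -> Self {
        Minter { worker_index, current_upper: initial_upper }
    }

    /// Called whenever the input frontier is observed. Mints a new
    /// description only on the minting worker, and only if the frontier
    /// has advanced past the last minted upper.
    pub fn on_frontier(&mut self, frontier: u64) -> Option<BatchDescription> {
        // Assumption: worker 0 is the single minting worker.
        if self.worker_index != 0 || frontier <= self.current_upper {
            return None;
        }
        let desc = BatchDescription { lower: self.current_upper, upper: frontier };
        self.current_upper = frontier;
        Some(desc)
    }
}

/// Models a broadcast: every worker receives every minted description,
/// regardless of which worker minted it.
pub fn broadcast(minted: &[Option<BatchDescription>]) -> Vec<Vec<BatchDescription>> {
    let all: Vec<BatchDescription> = minted.iter().flatten().cloned().collect();
    vec![all; minted.len()]
}
```

With three such minters observing the same frontier, only one yields a description, and after the modeled broadcast each of the three workers holds that same description.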
This also keeps the shared frontier that is stored in compute_state in sync with the upper of the persist shard.
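As a rough illustration of keeping a shared frontier in sync, the sketch below uses a reference-counted cell that one side updates and another side reads. The SharedFrontier alias and sync_shared_frontier function are hypothetical, the frontier is a plain u64 instead of an Antichain<Timestamp>, and the rule that the frontier never moves backwards is an assumption, not something this page states.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical handle to a frontier shared between an operator and
/// whatever state object holds the other clone of the `Rc`.
pub type SharedFrontier = Rc<RefCell<u64>>;

/// Updates the shared frontier to match the shard's upper.
/// Assumption: the frontier only ever advances, so smaller values are ignored.
pub fn sync_shared_frontier(shared: &SharedFrontier, shard_upper: u64) {
    let mut frontier = shared.borrow_mut();
    if shard_upper > *frontier {
        *frontier = shard_upper;
    }
}
```

Any holder of a clone of the handle observes the update, which is the property the shared frontier relies on in this model.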