fn mint_batch_descriptions<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    desired_stream: &Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
    persist_feedback_stream: &Stream<G, ()>,
    as_of: Antichain<Timestamp>,
    persist_clients: Arc<PersistClientCache>,
    compute_state: &mut ComputeState,
) -> (
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
    Rc<dyn Any>,
)
where
    G: Scope<Timestamp = Timestamp>,

Whenever the frontier advances, this operator mints a new batch description (a lower and an upper) that writers should use when writing the next set of batches to persist.

Only one of the workers does this, meaning there will only be one description in the stream, even when there are multiple timely workers. Use broadcast() to, ahem, broadcast the one description to all downstream write operators/workers.

This operator also keeps the shared frontier stored in compute_state in sync with the upper of the persist shard.
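The behavior described above can be sketched outside of timely dataflow as a small state machine: only one worker mints, a description is produced exactly when the frontier advances past the current upper, and the shared frontier is updated as a side effect. This is a minimal, hypothetical sketch; BatchDescription, Minter, and the use of u64 timestamps are illustrative stand-ins, not Materialize's actual types.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Illustrative stand-in for the (lower, upper) antichain pair.
#[derive(Debug, Clone, PartialEq)]
struct BatchDescription {
    lower: u64, // the previous upper
    upper: u64, // the newly observed frontier
}

struct Minter {
    worker_index: usize,
    current_upper: u64,
    // Stand-in for the frontier shared via compute_state.
    shared_frontier: Rc<RefCell<u64>>,
}

impl Minter {
    /// Called when the input frontier advances. Only worker 0 mints a
    /// description; the others would receive it via broadcast(). The
    /// shared frontier is kept in sync regardless of which worker runs.
    fn observe_frontier(&mut self, frontier: u64) -> Option<BatchDescription> {
        *self.shared_frontier.borrow_mut() = frontier;
        if self.worker_index != 0 || frontier <= self.current_upper {
            return None;
        }
        let desc = BatchDescription {
            lower: self.current_upper,
            upper: frontier,
        };
        self.current_upper = frontier;
        Some(desc)
    }
}

fn main() {
    let shared = Rc::new(RefCell::new(0));
    let mut w0 = Minter {
        worker_index: 0,
        current_upper: 0,
        shared_frontier: Rc::clone(&shared),
    };

    // The frontier advances: worker 0 mints [0, 5).
    let d = w0.observe_frontier(5).unwrap();
    assert_eq!((d.lower, d.upper), (0, 5));
    assert_eq!(*shared.borrow(), 5);

    // A non-advancing frontier mints nothing.
    assert!(w0.observe_frontier(5).is_none());

    // A non-zero worker never mints, but still syncs the shared frontier.
    let mut w1 = Minter {
        worker_index: 1,
        current_upper: 0,
        shared_frontier: Rc::clone(&shared),
    };
    assert!(w1.observe_frontier(9).is_none());
    assert_eq!(*shared.borrow(), 9);

    println!("ok");
}
```

Note that each minted description's lower equals the previous description's upper, so consecutive batches tile the timeline without gaps or overlap.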