fn write_batches<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    desired_stream: &Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
    persist_stream: &Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
    persist_clients: Arc<PersistClientCache>
) -> (Stream<G, BatchOrData>, Rc<dyn Any>)
where
    G: Scope<Timestamp = Timestamp>,

Writes desired_stream - persist_stream to persist, but only for updates that fall into a batch description that we get via batch_descriptions. This forwards a HollowBatch for any batch of updates that was written.
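
The difference described above can be sketched in isolation. The following is a minimal illustration and not the operator's actual implementation. It makes several assumptions: the helper `diff_for_batch` is hypothetical, rows are modeled as `String`, timestamps as `u64`, and a batch description as a half-open interval `[lower, upper)` of single timestamps rather than a pair of `Antichain<Timestamp>`. Error rows, the write to persist, and the forwarded `HollowBatch` are not modeled.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins (assumptions): the real operator works with
// `Result<Row, DataflowError>`, `Timestamp`, and `Diff`.
type Data = String;
type Time = u64;
type Diff = i64;

/// Hypothetical helper: returns the consolidated `desired - persist`
/// for updates whose time lies in the half-open interval `[lower, upper)`.
fn diff_for_batch(
    desired: &[(Data, Time, Diff)],
    persist: &[(Data, Time, Diff)],
    lower: Time,
    upper: Time,
) -> Vec<(Data, Time, Diff)> {
    // Accumulate net diffs per (data, time) pair.
    let mut acc: BTreeMap<(Data, Time), Diff> = BTreeMap::new();

    // Updates from `desired` count positively.
    for (d, t, r) in desired {
        if lower <= *t && *t < upper {
            *acc.entry((d.clone(), *t)).or_insert(0) += *r;
        }
    }

    // Updates already in `persist` count negatively.
    for (d, t, r) in persist {
        if lower <= *t && *t < upper {
            *acc.entry((d.clone(), *t)).or_insert(0) -= *r;
        }
    }

    // Drop entries that cancel out; what remains is what would be written.
    acc.into_iter()
        .filter(|(_, r)| *r != 0)
        .map(|((d, t), r)| (d, t, r))
        .collect()
}
```

With this sketch, an update present in both inputs at the same time cancels out, an update only in `desired` comes out as an addition, an update only in `persist` comes out as a retraction, and anything outside `[lower, upper)` is ignored for that batch.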