fn write_batches<G>(
    sink_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    desired_oks: &Stream<G, (Row, Timestamp, Diff)>,
    desired_errs: &Stream<G, (DataflowError, Timestamp, Diff)>,
    persist_oks: &Stream<G, (Row, Timestamp, Diff)>,
    persist_errs: &Stream<G, (DataflowError, Timestamp, Diff)>,
    persist_clients: Arc<PersistClientCache>,
    read_only: Receiver<bool>,
) -> (Stream<G, ProtoBatch>, Rc<dyn Any>)
Writes desired_stream - persist_stream to persist, but only for updates that fall into a batch description that we get via batch_descriptions.
This forwards a HollowBatch for any batch of updates that was written.
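
The "desired minus persist, restricted to a batch description" idea can be illustrated with a minimal sketch. It is a simplified model and not the operator's actual implementation: rows are plain strings, timestamps are totally ordered u64 values, and the batch description is a half-open interval [lower, upper) rather than a pair of Antichain<Timestamp> frontiers. The name correction_for_batch and the Update alias are hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `(Row, Timestamp, Diff)`: a row is a `String`,
/// a timestamp is a `u64`, and a diff is an `i64`.
type Update = (String, u64, i64);

/// Returns the consolidated updates of `desired - persist` whose timestamps
/// fall into the half-open interval `[lower, upper)`.
///
/// Assumption: the interval models a batch description whose lower and upper
/// frontiers each contain a single, totally ordered timestamp.
fn correction_for_batch(
    desired: &[Update],
    persist: &[Update],
    lower: u64,
    upper: u64,
) -> Vec<Update> {
    let mut acc: BTreeMap<(String, u64), i64> = BTreeMap::new();

    // Add everything we want the shard to contain.
    for (row, ts, diff) in desired {
        if (lower..upper).contains(ts) {
            *acc.entry((row.clone(), *ts)).or_insert(0) += diff;
        }
    }

    // Subtract everything the shard already contains.
    for (row, ts, diff) in persist {
        if (lower..upper).contains(ts) {
            *acc.entry((row.clone(), *ts)).or_insert(0) -= diff;
        }
    }

    // Drop updates that cancelled out; what remains still has to be written.
    acc.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((row, ts), diff)| (row, ts, diff))
        .collect()
}
```

In this model, an update that appears in both inputs cancels out and is not written again, and an update whose timestamp lies outside the interval is left for a later batch description.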