async fn drain_sealed_input<T, O>(
sealed: Vec<Column<(UpsertKey, T, UpsertDiff<O>)>>,
ineligible: &mut Vec<(UpsertKey, T, UpsertDiff<O>)>,
output_handle: &AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(Result<Row, Box<UpsertError>>, T, Diff)>>>>,
output_cap: &Capability<T>,
persist_upper: &Antichain<T>,
trace: &mut TraceAgent<ValRowSpine<UpsertKey, T, Diff>>,
worker_id: &usize,
source_id: &GlobalId,
) -> DrainStats
where
T: TotalOrder + Lattice + ExchangeData + Timestamp + Clone + Debug + Ord + Sync + Columnation + Columnar,
O: Columnar,

Process sealed chunks from the batcher, classifying each entry by its
timestamp relative to persist_upper: entries at the frontier are eligible
for processing now (cursor lookup + output), entries above it are returned
in the ineligible vector for re-stashing, and entries below it are already
persisted and are dropped (see the body for why).
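
The three-way split described above can be sketched in isolation. This is only an illustration, not the function's actual body: it assumes a plain `u64` timestamp and a single-element frontier (which a totally ordered timestamp permits), and `classify` and `Classified` are hypothetical names that do not exist in the crate.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for the result of one drain pass.
#[derive(Debug)]
struct Classified<K> {
    /// Entries exactly at the frontier, ready to process now.
    eligible: Vec<(K, u64)>,
    /// Count of entries below the frontier, which are discarded.
    dropped: usize,
}

/// Split `sealed` by comparing each timestamp with `upper` (assumed here to be
/// the single element of the frontier). Entries above `upper` are pushed onto
/// `ineligible` so the caller can re-stash them.
fn classify<K>(
    sealed: Vec<(K, u64)>,
    upper: u64,
    ineligible: &mut Vec<(K, u64)>,
) -> Classified<K> {
    let mut out = Classified { eligible: Vec::new(), dropped: 0 };
    for (key, ts) in sealed {
        match ts.cmp(&upper) {
            Ordering::Equal => out.eligible.push((key, ts)),
            Ordering::Greater => ineligible.push((key, ts)),
            Ordering::Less => out.dropped += 1,
        }
    }
    out
}
```

With `upper = 5`, an entry at time 4 is counted as dropped, an entry at time 5 lands in `eligible`, and an entry at time 6 is appended to the caller's `ineligible` vector.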
The sealed chunks are already sorted and consolidated by the MergeBatcher, so the trace cursor only ever walks forward through keys in order, and the cost of its seeks is amortized over the whole pass.
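
To see why sorted input makes the lookups cheap, consider a simplified model in which both the trace and the query keys are sorted slices. This is a sketch under that assumption only: `lookup_sorted` is a hypothetical helper, and the linear advance below stands in for the real cursor's seek rather than reproducing its API.

```rust
/// Look up each key of `queries` (sorted ascending) in `trace` (sorted by key)
/// using one index that only ever moves forward.
fn lookup_sorted<'a>(trace: &'a [(u32, &'a str)], queries: &[u32]) -> Vec<Option<&'a str>> {
    let mut pos = 0; // shared across all queries; never reset
    let mut found = Vec::with_capacity(queries.len());
    for &q in queries {
        // Skip trace keys smaller than the query. Because queries are sorted,
        // the keys skipped here can never match a later query either.
        while pos < trace.len() && trace[pos].0 < q {
            pos += 1;
        }
        let hit = trace.get(pos).filter(|(k, _)| *k == q).map(|(_, v)| *v);
        found.push(hit);
    }
    found
}
```

Since `pos` never moves backwards, the total work across all queries is bounded by the length of the trace plus the number of queries, instead of a fresh search per key.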