pub fn upsert_inner<'scope, T, FromTime>(
input: VecCollection<'scope, T, (UpsertKey, Option<Result<Row, Box<UpsertError>>>, FromTime), Diff>,
key_indices: Vec<usize>,
resume_upper: Antichain<T>,
persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
persist_token: Option<Vec<PressOnDropButton>>,
upsert_metrics: UpsertMetrics,
source_config: SourceExportCreationConfig,
) -> (VecCollection<'scope, T, Result<Row, DataflowError>, Diff>, StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>, StreamVec<'scope, T, Infallible>, PressOnDropButton)Expand description
Transforms a stream of upserts (key-value updates) into a differential collection.
Persist feedback is arranged into a differential trace (DD manages the
spine lifecycle). Source input is stashed with a custom UpsertDiff
Semigroup that deduplicates by keeping the highest FromTime per (key, time).
Has two inputs:
- Source input — upsert commands from the external source.
- Persist input — feedback of the operator’s own output, read back from persist. Arranged into a trace for cursor-based lookups.