

Function upsert_inner 

pub fn upsert_inner<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<Result<Row, Box<UpsertError>>>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: SourceExportCreationConfig,
) -> (VecCollection<'scope, T, Result<Row, DataflowError>, Diff>, StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>, StreamVec<'scope, T, Infallible>, PressOnDropButton)

Transforms a stream of upserts (key-value updates) into a differential collection.

The persist feedback is arranged into a differential trace; differential dataflow (DD) manages the spine lifecycle. The source input is stashed using a custom UpsertDiff Semigroup that deduplicates updates by keeping only the one with the highest FromTime for each (key, time) pair.

The operator has two inputs:

  1. Source input — upsert commands from the external source.
  2. Persist input — feedback of the operator’s own output, read back from persist. Arranged into a trace for cursor-based lookups.
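
How the two inputs combine can be sketched in a simplified, single-threaded form. The code below is an illustration under stated assumptions, not the operator's implementation: `apply_upsert` is a hypothetical helper, keys and values are plain strings, and an in-memory map stands in for the trace built from the persist input. It shows the general upsert-to-differential translation: look up the key's previous value, retract it if present, and add the new value unless the command is a delete.

```rust
use std::collections::BTreeMap;

/// Differential-style multiplicity: +1 is an insertion, -1 a retraction.
type Diff = i64;

/// Hypothetical helper: apply one upsert command and return the resulting
/// differential updates. `state` stands in for the previous output that the
/// real operator reads back from persist.
pub fn apply_upsert(
    state: &mut BTreeMap<String, String>,
    key: &str,
    new_value: Option<String>,
) -> Vec<((String, String), Diff)> {
    let mut out = Vec::new();

    // Update the state and fetch the previous value for this key, if any.
    let old_value = match &new_value {
        Some(v) => state.insert(key.to_string(), v.clone()),
        None => state.remove(key),
    };

    // Retract the old row so the collection holds at most one row per key.
    if let Some(old) = old_value {
        out.push(((key.to_string(), old), -1));
    }
    // Add the new row; `None` is a delete and adds nothing.
    if let Some(new) = new_value {
        out.push(((key.to_string(), new), 1));
    }
    out
}
```

Re-upserting an unchanged value yields a retraction and an insertion of the same row, which cancel when the updates are consolidated.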