Function mz_storage::upsert_continual_feedback::upsert_inner

pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (usize, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + ExchangeData + Ord + Sync,

An operator that transforms an input stream of upserts (updates to key-value pairs), which represents an imaginary key-value state, into a differential collection. It maintains an internal map-like state holding the latest value for each key, so that it can emit the retractions and additions implied by a new update for a given key.
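As a sketch of this translation, consider applying a single upsert to a map-like state and emitting the implied differential updates. This is a hypothetical, simplified model (the names `apply_upsert` and the `String`/`i64` types are ours, not the operator's real internals, which work over `UpsertKey`, `Row`, and `Diff`):

```rust
use std::collections::HashMap;

/// Apply one upsert to a map-like state and return the differential
/// updates `(value, diff)` it implies. `None` means the key is deleted.
fn apply_upsert(
    state: &mut HashMap<String, String>,
    key: String,
    value: Option<String>,
) -> Vec<(String, i64)> {
    let mut output = Vec::new();
    // Retract the previous value for this key, if any.
    let old = match &value {
        Some(v) => state.insert(key.clone(), v.clone()),
        None => state.remove(&key),
    };
    if let Some(old) = old {
        output.push((old, -1));
    }
    // Add the new value, if this upsert is not a deletion.
    if let Some(new) = value {
        output.push((new, 1));
    }
    output
}

fn main() {
    let mut state = HashMap::new();
    // First value for a key: only an addition.
    assert_eq!(
        apply_upsert(&mut state, "k".into(), Some("a".into())),
        vec![("a".into(), 1)]
    );
    // A new value for the same key retracts the old one first.
    assert_eq!(
        apply_upsert(&mut state, "k".into(), Some("b".into())),
        vec![("a".into(), -1), ("b".into(), 1)]
    );
    // A `None` value deletes the key, emitting only a retraction.
    assert_eq!(
        apply_upsert(&mut state, "k".into(), None),
        vec![("b".into(), -1)]
    );
}
```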

This operator is intended to be used in an ingestion pipeline that reads from an external source, and the output of this operator is eventually written to persist.

The operator has two inputs: a) the source input, which carries upserts, and b) a persist input that feeds the upsert state back to the operator. Below, there is a section for each input describing how and why we process its updates.

An important property of this operator is that it does not update the map-like state that it keeps for translating the stream of upserts into a differential collection when it processes source input. It only updates the map-like state based on updates from the persist (feedback) input. We do this because the operator is expected to be used in cases where there are multiple concurrent instances of the same ingestion pipeline, and the different instances might see different input because of concurrency and non-determinism. All instances of the upsert operator must produce output that is consistent with the current state of the output (that all instances produce “collaboratively”). This global state is what the operator continually learns about via updates from the persist input.

§Processing the Source Input

Updates on the source input are stashed/staged until they can be processed. Whether or not an update can be processed depends both on the upper frontier of the source input and on the upper frontier of the persist input:

  • Input updates are only processed once their timestamp is “done”, that is, the input upper is no longer less_equal their timestamp.

  • Input updates are only processed once they are at the persist upper, that is, we have emitted and written down updates for all previous times, and we have updated our map-like state to the latest global state of the ingestion pipeline’s output. We know this is the case when the persist upper is no longer less_than their timestamp.
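The two readiness conditions above can be sketched with a minimal single-element stand-in for a frontier over a totally ordered time (the real operator uses timely's `Antichain`; the `Frontier` type and `ready_to_process` function here are hypothetical):

```rust
/// Minimal stand-in for a one-element frontier over `u64` times.
#[derive(Clone, Copy)]
struct Frontier(u64);

impl Frontier {
    /// Mirrors `Antichain::less_equal` for a single-element frontier.
    fn less_equal(&self, time: u64) -> bool {
        self.0 <= time
    }
    /// Mirrors `Antichain::less_than` for a single-element frontier.
    fn less_than(&self, time: u64) -> bool {
        self.0 < time
    }
}

/// An update at `time` is ready for (final) processing when its time is
/// closed on the source input AND the persist input has caught up to it.
fn ready_to_process(input_upper: Frontier, persist_upper: Frontier, time: u64) -> bool {
    !input_upper.less_equal(time) && !persist_upper.less_than(time)
}

fn main() {
    // Input upper 5, persist upper 5: time 4 is closed and written down.
    assert!(ready_to_process(Frontier(5), Frontier(5), 4));
    // Time 5 is not yet closed on the source input.
    assert!(!ready_to_process(Frontier(5), Frontier(5), 5));
    // The persist upper is still below time 4, so state is not caught up.
    assert!(!ready_to_process(Frontier(5), Frontier(3), 4));
}
```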

As an optimization, we allow processing input updates when they are right at the input frontier. This is called partial emission because we are emitting updates that might be retracted when processing more updates from the same timestamp. In order to be able to process these updates, we keep provisional values in our upsert state. These will be overwritten when we get the final upsert values on the persist input.
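One way to picture the provisional values is as state entries tagged with where they came from. This is a hypothetical sketch (the `StateValue` enum and function names are ours), showing only that persist feedback overwrites whatever partial emission staged:

```rust
use std::collections::HashMap;

/// Whether a state entry came from partial emission (provisional) or
/// from the persist feedback input (finalized).
#[derive(Debug, PartialEq)]
enum StateValue {
    Provisional(String),
    Finalized(String),
}

/// Partial emission at the input frontier stages a provisional value.
fn stage_provisional(state: &mut HashMap<String, StateValue>, key: String, value: String) {
    state.insert(key, StateValue::Provisional(value));
}

/// Persist feedback overwrites the entry with the final value,
/// regardless of what was staged before.
fn finalize(state: &mut HashMap<String, StateValue>, key: String, value: String) {
    state.insert(key, StateValue::Finalized(value));
}

fn main() {
    let mut state = HashMap::new();
    stage_provisional(&mut state, "k".into(), "tentative".into());
    assert_eq!(state["k"], StateValue::Provisional("tentative".into()));
    // The persist input later confirms (or replaces) the value.
    finalize(&mut state, "k".into(), "tentative".into());
    assert_eq!(state["k"], StateValue::Finalized("tentative".into()));
}
```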

§Processing the Persist Input

We continually ingest updates from the persist input into our state using UpsertState::consolidate_chunk. We might be ingesting updates from the initial snapshot (when starting the operator) that are not consolidated or we might be ingesting updates from a partial emission (see above). In either case, our input might not be consolidated and consolidate_chunk is able to handle that.
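To make "not consolidated" concrete: consolidation sums the diffs of identical (key, value) pairs and drops anything that nets out to zero. The sketch below shows only that arithmetic; the real `UpsertState::consolidate_chunk` additionally folds the result into the operator's map-like state, and `consolidate` here is a hypothetical helper name:

```rust
use std::collections::BTreeMap;

/// Sum the diffs of identical (key, value) pairs and drop pairs whose
/// diffs net out to zero.
fn consolidate(updates: Vec<((String, String), i64)>) -> Vec<((String, String), i64)> {
    let mut acc: BTreeMap<(String, String), i64> = BTreeMap::new();
    for (kv, diff) in updates {
        *acc.entry(kv).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

fn main() {
    // A provisional addition and its later retraction cancel out;
    // only the final value survives consolidation.
    let updates = vec![
        (("k".into(), "provisional".into()), 1),
        (("k".into(), "provisional".into()), -1),
        (("k".into(), "final".into()), 1),
    ];
    assert_eq!(consolidate(updates), vec![(("k".into(), "final".into()), 1)]);
}
```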