
Module upsert_continual_feedback_v2

Implementation of the feedback UPSERT operator.

§Architecture

The operator converts a stream of upsert commands (key, Option<value>) into a differential collection of (key, value) pairs, using a feedback loop through persist to maintain the “previous value” state needed for computing retractions.
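A minimal sketch of that conversion follows. It assumes an in-memory HashMap stands in for the persist-backed "previous value" state, and it ignores timestamps and the feedback loop entirely. apply_upserts and DiffUpdate are hypothetical names, not items of this module.

```rust
use std::collections::HashMap;

/// Hypothetical differential update: (key, value, diff), with diff = +1 or -1.
type DiffUpdate = (String, String, i64);

/// Apply upsert commands in arrival order against `state`, which stands in
/// for the "previous value" state the real operator reads back from persist.
fn apply_upserts(
    state: &mut HashMap<String, String>,
    commands: Vec<(String, Option<String>)>,
) -> Vec<DiffUpdate> {
    let mut out = Vec::new();
    for (key, new_value) in commands {
        // Install (or delete) the new value and recover the old one, if any.
        let old = match &new_value {
            Some(v) => state.insert(key.clone(), v.clone()),
            None => state.remove(&key),
        };
        // Retract the prior value before asserting the new one.
        if let Some(old) = old {
            out.push((key.clone(), old, -1));
        }
        if let Some(v) = new_value {
            out.push((key, v, 1));
        }
    }
    out
}
```

A delete (None) only emits a retraction, and an insert for a fresh key only emits an addition; the prior state is what makes the retraction possible.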

§Dataflow topology

  Source input ──► ┌──────────┐ ──► Output ──► Persist
                   │  Upsert  │
  Persist read ──► └──────────┘
      ▲                                           │
      └───────────── feedback ────────────────────┘

§Operator loop (each iteration)

  1. Ingest source data. Read upsert commands from the source input, wrap each in an UpsertDiff, and push it into the source-stash batcher. The UpsertDiff carries a columnar order key, projected from FromTime via UpsertSourceTime, that is used for deduplication. The batcher is a paged columnar merge batcher: as data is pushed in, it consolidates entries for the same (key, time) through amortized geometric merging, using the UpsertDiff Semigroup to keep the update with the highest order key (the latest source offset), and it pages cold chains out of RSS through the pager. This bounds resident memory to O(unique key-time pairs) even during large source snapshots.

  2. Read persist frontier. Check the probe on the persist arrangement to learn which times have been committed. When the persist frontier reaches the resume upper, rehydration is complete.

  3. Seal & drain. Call batcher.seal(input_upper) to extract all source-finalized entries as sorted, consolidated Column chunks. Each entry is classified:

    • Eligible (at the persist frontier): the persist trace has the correct “before” state for this time. Look up the old value via a cursor, emit a retraction if present, and emit the new value.
    • Ineligible (between persist and input frontiers): persist hasn’t caught up yet. Push back into the batcher for the next iteration.
    • Already persisted (below the persist frontier): some writer has already advanced the shard past this time, so it is dropped. See drain_sealed_input for why re-stashing it would strand the data and pin the output frontier below the shard upper.

  4. Capability management. Downgrade the output capability to the minimum time of any remaining buffered data (in the batcher or pushed back as ineligible). Drop the capability entirely when the batcher is empty.
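The deduplication rule from step 1 can be sketched as below. DiffSketch is a hypothetical stand-in for UpsertDiff (an order key such as a source offset, plus an optional value), and consolidate uses a single sort and fold rather than the paged, amortized geometric merging the real batcher performs. Only the Semigroup rule, keeping the highest order key per (key, time), is meant to match the description above.

```rust
/// Hypothetical stand-in for UpsertDiff: source order key plus payload
/// (None = delete).
#[derive(Clone, Debug, PartialEq)]
struct DiffSketch {
    order: u64,
    value: Option<String>,
}

impl DiffSketch {
    /// Semigroup combine: keep whichever update has the higher order key.
    fn plus_equals(&mut self, other: &DiffSketch) {
        if other.order > self.order {
            *self = other.clone();
        }
    }
}

/// Sort by (key, time) and fold each run with the same (key, time) into one entry.
fn consolidate(mut updates: Vec<(String, u64, DiffSketch)>) -> Vec<(String, u64, DiffSketch)> {
    updates.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));
    let mut out: Vec<(String, u64, DiffSketch)> = Vec::new();
    for (key, time, diff) in updates {
        let same_run = matches!(out.last(), Some((k, t, _)) if *k == key && *t == time);
        if same_run {
            // Same (key, time): merge via the semigroup instead of storing twice.
            out.last_mut().unwrap().2.plus_equals(&diff);
        } else {
            out.push((key, time, diff));
        }
    }
    out
}
```

Because the combine only compares order keys, the result for a (key, time) does not depend on the order in which updates arrive, which is what lets the batcher merge chains in any order.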

§Eligibility condition (total order)

For a total-order timestamp with input_upper = {i} and persist_upper = {p}, an entry at time ts is eligible when ts == p < i: the source has finalized it and persist is exactly at that time, so the trace cursor returns the correct prior state. An entry with p < ts < i is ineligible (persist hasn't caught up), and one with ts < p is already persisted and dropped. Entries with ts >= i are not yet sealed and stay in the batcher.
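The eligibility rule and the step-4 capability choice, sketched for u64 timestamps. classify, drain, and Class are hypothetical names. drain only sees sealed entries, so it assumes no unsealed data remains in the batcher when it computes the capability.

```rust
use std::cmp::Ordering;

/// How a sealed entry at time `ts` relates to persist_upper = {p}.
#[derive(Debug, PartialEq)]
enum Class {
    Eligible,         // ts == p: the trace holds the correct prior state
    Ineligible,       // p < ts: persist has not caught up; re-stash
    AlreadyPersisted, // ts < p: the shard is already past ts; drop
}

/// Classify one sealed entry. Only entries below the input upper `i` are
/// sealed, so `ts < i` is a precondition.
fn classify(ts: u64, p: u64, i: u64) -> Class {
    debug_assert!(ts < i, "sealed entries are always below the input upper");
    match ts.cmp(&p) {
        Ordering::Equal => Class::Eligible,
        Ordering::Greater => Class::Ineligible,
        Ordering::Less => Class::AlreadyPersisted,
    }
}

/// Partition sealed entry times and derive the next output capability from
/// what must be re-stashed; `None` means the capability can be dropped.
fn drain(sealed: &[u64], p: u64, i: u64) -> (Vec<u64>, Vec<u64>, Option<u64>) {
    let mut eligible = Vec::new();
    let mut ineligible = Vec::new();
    for &ts in sealed {
        match classify(ts, p, i) {
            Class::Eligible => eligible.push(ts),
            Class::Ineligible => ineligible.push(ts),
            Class::AlreadyPersisted => {} // dropped, never re-stashed
        }
    }
    let capability = ineligible.iter().copied().min();
    (eligible, ineligible, capability)
}
```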

Structs§

DrainStats 🔒
Counts from a single call to drain_sealed_input, used to update metrics.
UpsertDiff 🔒
UpsertDiffContainer 🔒
Derived columnar container for a struct.
UpsertDiffReference 🔒
Derived columnar reference for a struct.
UpsertFeedbackBatcher 🔒
The persist-feedback arrangement’s batcher, wrapping Col2ValPagedBatcher only to capture the storage upsert-stash pager at construction.

Functions§

decode_upsert_value 🔒
Decode an UpsertValue produced by upsert_value_to_row from any datum iterator — a ValRowSpine cursor’s DatumSeq or a stashed Row’s iter.
drain_sealed_input 🔒
Process sealed chunks from the batcher, classifying each entry by its timestamp relative to persist_upper: entries at the frontier are eligible for processing now (cursor lookup + output), entries above it are returned in ineligible for re-stashing, and entries below it are already persisted and dropped (see the body for why).
flush_to_batcher 🔒
Consolidate updates through chunker into Column chunks and push them into batcher, emptying updates (keeping its capacity). The chunker readies a fully-consolidated chunk per push_into, so the extract loop drains everything it produced.
upsert_inner
Transforms a stream of upserts (key-value updates) into a differential collection.
upsert_value_byte_len 🔒
Heap-size estimate for an emitted UpsertValue, used to drive give_fueled yielding on the output edge.
upsert_value_to_row 🔒
Encode an UpsertValue as a Row with a leading tag column so both Ok and Err payloads round-trip through Row byte storage.
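The leading-tag idea behind upsert_value_to_row and decode_upsert_value can be illustrated with a plain byte buffer standing in for Row. This is a sketch under that assumption: ValueSketch, encode, decode, and the tag values are hypothetical, and the real encoding uses a leading tag column in a Row rather than a raw tag byte.

```rust
/// Hypothetical stand-in for UpsertValue: Ok carries payload bytes, Err a message.
type ValueSketch = Result<Vec<u8>, String>;

const TAG_OK: u8 = 0;
const TAG_ERR: u8 = 1;

/// Write a tag first so the decoder knows which variant the payload belongs to.
fn encode(value: &ValueSketch) -> Vec<u8> {
    let mut buf = Vec::new();
    match value {
        Ok(bytes) => {
            buf.push(TAG_OK);
            buf.extend_from_slice(bytes);
        }
        Err(msg) => {
            buf.push(TAG_ERR);
            buf.extend_from_slice(msg.as_bytes());
        }
    }
    buf
}

/// Read the tag, then interpret the remaining bytes; None on malformed input.
fn decode(buf: &[u8]) -> Option<ValueSketch> {
    let (tag, payload) = buf.split_first()?;
    match *tag {
        TAG_OK => Some(Ok(payload.to_vec())),
        TAG_ERR => String::from_utf8(payload.to_vec()).ok().map(Err),
        _ => None,
    }
}
```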

Type Aliases§

UpsertBatcher 🔒
UpsertChunker 🔒
The chunker that sorts and consolidates raw input into the Column chunks UpsertBatcher consumes.
UpsertOutputHandle 🔒
The operator’s data-output handle. A fueled Vec builder so the drain can give_fueled each emitted update and yield to timely under large snapshot drains instead of monopolizing the worker.
UpsertUpdate 🔒
One source-stash update: a key, its dataflow time, and the payload diff. O is the columnar order key projected from the source FromTime (see UpsertSourceTime).
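The fueled output described for UpsertOutputHandle and upsert_value_byte_len amounts to charging each emitted update against a budget and yielding once it runs out. A minimal sketch, assuming a byte-length cost and a VecDeque of pending updates; FueledOutput, give_fueled, drain_with_fuel, and OutUpdate here are hypothetical and do not reflect the real handle's API.

```rust
use std::collections::VecDeque;

/// Hypothetical update shape: (key, encoded value, diff).
type OutUpdate = (String, Vec<u8>, i64);

/// Hypothetical fueled output buffer; each update is charged its estimated
/// heap size, the role the doc gives upsert_value_byte_len.
struct FueledOutput {
    fuel: usize,
    emitted: Vec<OutUpdate>,
}

impl FueledOutput {
    /// Emit one update and charge its cost; returns false once fuel is spent,
    /// telling the caller to stop draining and yield to the scheduler.
    fn give_fueled(&mut self, update: OutUpdate) -> bool {
        let cost = update.0.len() + update.1.len();
        self.emitted.push(update);
        self.fuel = self.fuel.saturating_sub(cost);
        self.fuel > 0
    }
}

/// Drain `pending` until the fuel budget is spent; anything left stays
/// queued for the next operator activation.
fn drain_with_fuel(pending: &mut VecDeque<OutUpdate>, fuel: usize) -> Vec<OutUpdate> {
    let mut out = FueledOutput { fuel, emitted: Vec::new() };
    while let Some(update) = pending.pop_front() {
        if !out.give_fueled(update) {
            break;
        }
    }
    out.emitted
}
```

Because the budget is checked after each push, every activation emits at least one update, so a large drain always makes progress while still yielding regularly.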