Implementation of the feedback UPSERT operator.
§Architecture
The operator converts a stream of upsert commands (key, Option<value>) into
a differential collection of (key, value) pairs, using a feedback loop
through persist to maintain the “previous value” state needed for computing
retractions.
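As a rough illustration of the conversion described above, the sketch below applies upsert commands to an in-memory map and emits differential-style updates. The `apply_upsert` helper, the `HashMap` state, and the `(key, value, diff)` tuple shape are assumptions made for this example only. The operator itself obtains the previous value from the persist feedback rather than from a local map.

```rust
use std::collections::HashMap;

/// A differential-style update: (key, value, diff), where diff is +1 or -1.
type Update = (String, String, i64);

/// Hypothetical helper: apply one upsert command against `state` and return
/// the updates it implies. A `None` value means "delete this key".
fn apply_upsert(
    state: &mut HashMap<String, String>,
    key: &str,
    value: Option<&str>,
) -> Vec<Update> {
    let mut out = Vec::new();
    // Update the state and capture the previous value, if any.
    let previous = match value {
        Some(v) => state.insert(key.to_string(), v.to_string()),
        None => state.remove(key),
    };
    // Retract the previous value when the key was already present.
    if let Some(old) = previous {
        out.push((key.to_string(), old, -1));
    }
    // Emit the new value unless the command was a delete.
    if let Some(v) = value {
        out.push((key.to_string(), v.to_string(), 1));
    }
    out
}
```

Under these assumptions, writing `"a"` and then `"b"` to the same key yields an insertion of `"a"`, followed by a retraction of `"a"` paired with an insertion of `"b"`. A subsequent delete yields only the retraction of `"b"`.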
§Dataflow topology
Source input ──► ┌──────────┐ ──► Output ──► Persist
                 │  Upsert  │
Persist read ──► └──────────┘
    ▲                                           │
    └───────────── feedback ────────────────────┘
§Operator loop (each iteration)
- Ingest source data. Read upsert commands from the source input, wrap each in an UpsertDiff (carrying a columnar order key projected from FromTime via UpsertSourceTime for dedup), and push it into the source-stash batcher. The batcher is a paged columnar merge batcher: it consolidates entries for the same (key, time) via the UpsertDiff Semigroup, keeping the update with the highest order key (latest source offset), through amortized geometric merging as data is pushed in, and pages cold chains out of RSS through the pager. This bounds resident memory to O(unique key-time pairs) even during large source snapshots.
- Read persist frontier. Check the probe on the persist arrangement to learn which times have been committed. When the persist frontier reaches the resume upper, rehydration is complete.
- Seal & drain. Call batcher.seal(input_upper) to extract all source-finalized entries as sorted, consolidated Column chunks. Each entry is classified:
  - Eligible (at the persist frontier): the persist trace has the correct “before” state for this time. Look up the old value via a cursor, emit a retraction if present, and emit the new value.
  - Ineligible (between persist and input frontiers): persist hasn’t caught up yet. Push back into the batcher for the next iteration.
  - Already persisted (below the persist frontier): some writer has already advanced the shard past this time, so it is dropped. See drain_sealed_input for why re-stashing it would strand the data and pin the output frontier below the shard upper.
- Capability management. Downgrade the output capability to the minimum time of any remaining buffered data (in the batcher or pushed back as ineligible). Drop the capability entirely when the batcher is empty.
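The consolidation rule in the ingest step and the capability rule in the last step can be sketched in a few lines. This is a simplified model under stated assumptions, not the batcher's implementation: `Stashed`, `consolidate`, and `min_buffered_time` are hypothetical names, times and order keys are plain `u64` values, and a `BTreeMap` stands in for the paged columnar merge batcher.

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

/// Hypothetical stand-in for a stashed update. `order` models the order key
/// (source offset); a `None` value models a delete.
#[derive(Debug, Clone, PartialEq)]
struct Stashed {
    order: u64,
    value: Option<String>,
}

/// Keep, for each (key, time), only the entry with the highest order key.
/// The result is sorted by (key, time).
fn consolidate(updates: Vec<(String, u64, Stashed)>) -> Vec<(String, u64, Stashed)> {
    let mut merged: BTreeMap<(String, u64), Stashed> = BTreeMap::new();
    for (key, time, update) in updates {
        match merged.entry((key, time)) {
            Entry::Occupied(mut slot) => {
                // A later source offset wins over an earlier one.
                if slot.get().order < update.order {
                    slot.insert(update);
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(update);
            }
        }
    }
    merged.into_iter().map(|((k, t), u)| (k, t, u)).collect()
}

/// Time to which the output capability could be downgraded: the minimum time
/// of any buffered entry, or `None` (drop the capability) when nothing remains.
fn min_buffered_time(buffered: &[(String, u64, Stashed)]) -> Option<u64> {
    buffered.iter().map(|(_, time, _)| *time).min()
}
```

In this model, three updates to the same key at the same time collapse to the one with the largest `order`, which is what bounds the buffered state to one entry per unique key-time pair.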
§Eligibility condition (total order)
For a total-order timestamp with input_upper = {i} and
persist_upper = {p}, an entry at time ts is eligible when
ts == p < i — the source has finalized it and persist is exactly at
that time, so the trace cursor returns the correct prior state. An entry
with p < ts is ineligible (persist hasn’t caught up), and one with
ts < p is already persisted and dropped.
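For a total order, the three-way classification reduces to a single comparison against the persist upper. The sketch below assumes `u64` timestamps and that the entry has already been sealed (so `ts < i` holds). The `Class` enum and `classify` function are hypothetical names for this example and are not taken from the module.

```rust
use std::cmp::Ordering;

/// Outcome of comparing a sealed entry's time with the persist upper.
#[derive(Debug, PartialEq, Eq)]
enum Class {
    /// ts == p: the trace holds the correct prior state; process now.
    Eligible,
    /// p < ts: persist has not caught up; re-stash for a later iteration.
    Ineligible,
    /// ts < p: the shard has already advanced past this time; drop.
    AlreadyPersisted,
}

/// Classify a sealed entry at time `ts` against `persist_upper`.
/// Assumes the caller only passes entries below the input upper.
fn classify(ts: u64, persist_upper: u64) -> Class {
    match ts.cmp(&persist_upper) {
        Ordering::Equal => Class::Eligible,
        Ordering::Greater => Class::Ineligible,
        Ordering::Less => Class::AlreadyPersisted,
    }
}
```

With `persist_upper = 5`, an entry at time 5 is eligible, one at time 6 is ineligible, and one at time 4 is already persisted.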
Structs§
- DrainStats 🔒: Counts from a single call to drain_sealed_input, used to update metrics.
- UpsertDiff 🔒
- UpsertDiffContainer 🔒: Derived columnar container for a struct.
- UpsertDiffReference 🔒: Derived columnar reference for a struct.
- UpsertFeedbackBatcher 🔒: The persist-feedback arrangement’s batcher, wrapping Col2ValPagedBatcher only to capture the storage upsert-stash pager at construction.
Functions§
- decode_upsert_value 🔒: Decode an UpsertValue produced by upsert_value_to_row from any datum iterator: a ValRowSpine cursor’s DatumSeq or a stashed Row’s iter.
- drain_sealed_input 🔒: Process sealed chunks from the batcher, classifying each entry by its timestamp relative to persist_upper: entries at the frontier are eligible for processing now (cursor lookup + output), entries above it are returned in ineligible for re-stashing, and entries below it are already persisted and dropped (see the body for why).
- flush_to_batcher 🔒: Consolidate updates through chunker into Column chunks and push them into batcher, emptying updates (keeping its capacity). The chunker readies a fully-consolidated chunk per push_into, so the extract loop drains everything it produced.
- upsert_inner: Transforms a stream of upserts (key-value updates) into a differential collection.
- upsert_value_byte_len 🔒: Heap-size estimate for an emitted UpsertValue, used to drive give_fueled yielding on the output edge.
- upsert_value_to_row 🔒: Encode an UpsertValue as a Row with a leading tag column so both Ok and Err payloads round-trip through Row byte storage.
Type Aliases§
- UpsertBatcher 🔒
- UpsertChunker 🔒: The chunker that sorts and consolidates raw input into the Column chunks UpsertBatcher consumes.
- UpsertOutputHandle 🔒: The operator’s data-output handle. A fueled Vec builder so the drain can give_fueled each emitted update and yield to timely under large snapshot drains instead of monopolizing the worker.
- UpsertUpdate 🔒: One source-stash update: a key, its dataflow time, and the payload diff. O is the columnar order key projected from the source FromTime (see UpsertSourceTime).