Implementation of the feedback UPSERT operator.
§Architecture
The operator converts a stream of upsert commands (key, Option<value>) into
a differential collection of (key, value) pairs, using a feedback loop
through persist to maintain the “previous value” state needed for computing
retractions.
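The retraction logic can be made concrete with a small sketch of how upsert commands become differential updates. It rests on a stated assumption: an in-memory HashMap, passed to the hypothetical helper apply_upserts, stands in for the "previous value" state that the real operator reads back from persist through the feedback loop. Timestamps and frontiers are ignored entirely.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// A differential update: (key, value, diff), with diff = +1 or -1.
type Update<K, V> = (K, V, i64);

/// Hypothetical helper: converts upsert commands into differential updates.
/// `state` is an in-memory stand-in for the "previous value" that the real
/// operator reads back from persist (an assumption of this sketch).
fn apply_upserts<K, V>(state: &mut HashMap<K, V>, commands: Vec<(K, Option<V>)>) -> Vec<Update<K, V>>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    let mut out = Vec::new();
    for (key, new_value) in commands {
        // Replace (or delete) the stored value, capturing the old one.
        let old = match &new_value {
            Some(v) => state.insert(key.clone(), v.clone()),
            None => state.remove(&key),
        };
        // Retract the previous value, if the key had one.
        if let Some(old) = old {
            out.push((key.clone(), old, -1));
        }
        // Insert the new value, unless the command was a delete.
        if let Some(v) = new_value {
            out.push((key, v, 1));
        }
    }
    out
}

fn main() {
    let mut state = HashMap::new();
    let commands = vec![("a", Some(1)), ("a", Some(2)), ("b", Some(7)), ("a", None)];
    for update in apply_upserts(&mut state, commands) {
        println!("{:?}", update);
    }
}
```

Replacing a key emits a retraction of its old value followed by the new value, and a delete (None) emits only the retraction. The real operator differs in where the old value comes from: it reads it from the persist trace, which is why the eligibility rules below matter.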
§Dataflow topology
```text
Source input ──► ┌──────────┐ ──► Output ──► Persist
                 │  Upsert  │
Persist read ──► └──────────┘
     ▲                                          │
     └──────────────── feedback ────────────────┘
```

§Operator loop (each iteration)
1. Ingest source data. Read upsert commands from the source input, wrap each in an UpsertDiff (carrying from_time for deduplication), and push it into the MergeBatcher. The batcher consolidates entries for the same (key, time) using the UpsertDiff Semigroup, which keeps the update with the highest FromTime (the latest Kafka offset). Consolidation happens through amortized geometric merging as data is pushed in, which bounds memory to O(unique key-time pairs) even during large Kafka snapshots.
2. Read persist frontier. Check the probe on the persist arrangement to learn which times have been committed. When the persist frontier reaches the resume upper, rehydration is complete.
3. Seal & drain. Call batcher.seal(input_upper) to extract all source-finalized entries as sorted, consolidated chunks. Each entry is classified as:
   - Eligible (at the persist frontier): the persist trace holds the correct “before” state for this time. Look up the old value via a cursor, emit a retraction if one is present, and emit the new value.
   - Ineligible (between the persist and input frontiers): persist hasn’t caught up yet. Push the entry back into the batcher for the next iteration.
4. Capability management. Downgrade the output capability to the minimum time of any remaining buffered data (in the batcher or pushed back as ineligible). Drop the capability entirely when the batcher is empty.
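The consolidation rule in step 1 can be sketched as follows. This is a hypothetical model, not the operator's actual types: UpsertDiff here holds only a u64 from_time and an Option value, and a BTreeMap is swapped in for the MergeBatcher's amortized geometric merging. What it does show is the semigroup behavior: combining two updates for the same (key, time) keeps the one with the higher from_time.

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

/// Hypothetical stand-in for the operator's UpsertDiff: the command's value
/// (None = delete) plus the source position (e.g. Kafka offset) it came from.
#[derive(Clone, Debug, PartialEq)]
struct UpsertDiff<V> {
    from_time: u64,
    value: Option<V>,
}

impl<V> UpsertDiff<V> {
    /// Semigroup-style combine: keep whichever update has the higher
    /// from_time. On a tie, the update already held is kept.
    fn plus_equals(&mut self, other: Self) {
        if other.from_time > self.from_time {
            *self = other;
        }
    }
}

/// Consolidates entries so each (key, time) keeps only its latest update.
/// A BTreeMap replaces the MergeBatcher's geometric merging here; like the
/// batcher's sealed output, the result is sorted by (key, time).
fn consolidate<K: Ord, V>(
    entries: Vec<(K, u64, UpsertDiff<V>)>,
) -> Vec<((K, u64), UpsertDiff<V>)> {
    let mut merged: BTreeMap<(K, u64), UpsertDiff<V>> = BTreeMap::new();
    for (key, time, diff) in entries {
        match merged.entry((key, time)) {
            Entry::Vacant(slot) => {
                slot.insert(diff);
            }
            Entry::Occupied(mut slot) => slot.get_mut().plus_equals(diff),
        }
    }
    merged.into_iter().collect()
}

fn main() {
    let out = consolidate(vec![
        ("k", 5, UpsertDiff { from_time: 10, value: Some("old") }),
        ("k", 5, UpsertDiff { from_time: 12, value: Some("new") }),
        ("k", 5, UpsertDiff { from_time: 11, value: None }),
    ]);
    println!("{:?}", out);
}
```

Because "keep the higher from_time" is associative, entries can be merged in any grouping as they arrive, which is what lets a batcher consolidate incrementally instead of buffering every raw command.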
§Eligibility condition (total order)
For a total-order timestamp with input_upper = {i} and
persist_upper = {p}, an entry at time ts is eligible when
ts == p < i — the source has finalized it and persist is exactly at
that time, so the trace cursor returns the correct prior state.
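The eligibility check, together with the capability rule from the operator loop, can be sketched for u64 timestamps. The names drain, Drained, and unsealed_min are hypothetical. The sketch assumes every sealed entry has ts < i, and it treats every non-eligible entry as ineligible, matching the description of drain_sealed_input.

```rust
/// Hypothetical result of one drain pass over sealed entries.
#[derive(Debug)]
struct Drained<D> {
    /// Entries at the persist frontier: ready for cursor lookup and output.
    eligible: Vec<(u64, D)>,
    /// Everything else: to be pushed back into the batcher.
    ineligible: Vec<(u64, D)>,
    /// Time to downgrade the output capability to; None means drop it.
    capability: Option<u64>,
}

/// Total-order eligibility: ts == p < i.
fn is_eligible(ts: u64, persist_upper: u64, input_upper: u64) -> bool {
    ts == persist_upper && persist_upper < input_upper
}

/// `unsealed_min` is the minimum time still held in the batcher at or beyond
/// input_upper (None if the batcher holds nothing else).
fn drain<D>(
    sealed: Vec<(u64, D)>,
    persist_upper: u64,
    input_upper: u64,
    unsealed_min: Option<u64>,
) -> Drained<D> {
    let (eligible, ineligible): (Vec<_>, Vec<_>) = sealed
        .into_iter()
        .partition(|(ts, _)| is_eligible(*ts, persist_upper, input_upper));
    // Capability: minimum over pushed-back (ineligible) and still-unsealed data.
    let capability = ineligible
        .iter()
        .map(|(ts, _)| *ts)
        .chain(unsealed_min)
        .min();
    Drained { eligible, ineligible, capability }
}

fn main() {
    // p = 5, i = 9: entries at 5 are eligible; the entry at 7 waits for persist.
    let d = drain(vec![(5, "a"), (7, "b"), (5, "c")], 5, 9, Some(9));
    println!("{:?}", d);
}
```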
§Structs
- CapturingBuilder 🔒 - A minimal Builder that captures sealed chains without copying.
- DrainStats 🔒 - Counts from a single call to drain_sealed_input, used to update metrics.
- UpsertDiff 🔒
§Functions
- drain_sealed_input 🔒 - Process sealed chunks from the batcher. Entries at the persist frontier are eligible for processing (cursor lookup + output); all others are returned in ineligible for re-stashing.
- upsert_inner - Transforms a stream of upserts (key-value updates) into a differential collection.