Module upsert_continual_feedback_v2

Implementation of the feedback UPSERT operator.

§Architecture

The operator converts a stream of upsert commands (key, Option<value>) into a differential collection of (key, value) pairs, using a feedback loop through persist to maintain the “previous value” state needed for computing retractions.
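The conversion from upsert commands to (key, value) updates can be illustrated with a minimal sketch. It assumes an in-memory HashMap as a stand-in for the persist-backed "previous value" state, String keys and values, and a hypothetical helper named apply_upsert; none of these come from the module itself, and the sketch ignores timestamps entirely.

```rust
use std::collections::HashMap;

/// One output update: ((key, value), diff), where diff is +1 or -1.
type Update = ((String, String), i64);

/// Hypothetical helper: applies one upsert command against `state`
/// (an in-memory stand-in for the persist-backed previous-value state)
/// and returns the updates to emit.
fn apply_upsert(
    state: &mut HashMap<String, String>,
    key: &str,
    value: Option<&str>,
) -> Vec<Update> {
    let mut out = Vec::new();
    // Retract the previous value for this key, if there was one.
    if let Some(old) = state.remove(key) {
        out.push(((key.to_string(), old), -1));
    }
    // Emit the new value unless the command is a delete (None).
    if let Some(new) = value {
        state.insert(key.to_string(), new.to_string());
        out.push(((key.to_string(), new.to_string()), 1));
    }
    out
}
```

Upserting "a" and then "b" for the same key yields an insertion of "a", followed by a retraction of "a" together with an insertion of "b". A None command yields only the retraction.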

§Dataflow topology

  Source input ──► ┌──────────┐ ──► Output ──► Persist
                   │  Upsert  │
  Persist read ──► └──────────┘
      ▲                                           │
      └───────────── feedback ────────────────────┘

§Operator loop (each iteration)

  1. Ingest source data. Read upsert commands from the source input, wrap each in an UpsertDiff (carrying from_time for dedup), and push into the MergeBatcher. The batcher consolidates entries for the same (key, time) using the UpsertDiff Semigroup, which keeps the update with the highest FromTime (latest Kafka offset). Consolidation happens via amortized geometric merging as data is pushed in, which bounds memory to O(unique key-time pairs) even during large Kafka snapshots.

  2. Read persist frontier. Check the probe on the persist arrangement to learn which times have been committed. When the persist frontier reaches the resume upper, rehydration is complete.

  3. Seal & drain. Call batcher.seal(input_upper) to extract all source-finalized entries as sorted, consolidated chunks. Each entry is classified:

    • Eligible (at the persist frontier): the persist trace has the correct “before” state for this time. Look up the old value via a cursor, emit a retraction if present, and emit the new value.
    • Ineligible (between persist and input frontiers): persist hasn’t caught up yet. Push back into the batcher for the next iteration.
  4. Capability management. Downgrade the output capability to the minimum time of any remaining buffered data (in the batcher or pushed back as ineligible). Drop the capability entirely when the batcher is empty.

§Eligibility condition (total order)

For a total-order timestamp with input_upper = {i} and persist_upper = {p}, an entry at time ts is eligible when ts == p < i. The source has finalized it (ts < i) and persist is exactly at that time (ts == p), so the trace cursor returns the correct prior state.
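As a rough illustration, the condition and the resulting eligible/ineligible split can be written for u64 timestamps. The function names is_eligible, classify, and capability_time are hypothetical, and the frontiers are reduced to single integers, which only holds for totally ordered times.

```rust
/// Total-order eligibility check: ts == p < i.
fn is_eligible(ts: u64, persist_upper: u64, input_upper: u64) -> bool {
    ts == persist_upper && ts < input_upper
}

/// Splits sealed entry times into (eligible, ineligible), preserving order.
/// Everything that is not exactly at the persist frontier is ineligible.
fn classify(sealed: &[u64], persist_upper: u64, input_upper: u64) -> (Vec<u64>, Vec<u64>) {
    sealed
        .iter()
        .copied()
        .partition(|&ts| is_eligible(ts, persist_upper, input_upper))
}

/// Time to downgrade the output capability to: the minimum time of any
/// remaining buffered entry. `None` means nothing is buffered, so the
/// capability can be dropped.
fn capability_time(remaining: &[u64]) -> Option<u64> {
    remaining.iter().copied().min()
}
```

With persist_upper = 5 and input_upper = 7, sealed entries at times 5, 6, 5 split into eligible [5, 5] and ineligible [6], and the capability would be held at time 6 until persist advances.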

Structs§

CapturingBuilder 🔒
A minimal Builder that captures sealed chains without copying.
DrainStats 🔒
Counts from a single call to drain_sealed_input, used to update metrics.
UpsertDiff 🔒

Functions§

drain_sealed_input 🔒
Process sealed chunks from the batcher. Entries at the persist frontier are eligible for processing (cursor lookup + output); all others are returned in ineligible for re-stashing.
upsert_inner
Transforms a stream of upserts (key-value updates) into a differential collection.

Type Aliases§

UpsertBatcher 🔒