Skip to main content

Module correction_v2

Module correction_v2 

Source
Expand description

An implementation of the Correction data structure used by the MV sink’s write_batches operator to stash updates before they are written.

The Correction data structure provides methods to:

  • insert new updates
  • advance the compaction frontier (called since)
  • obtain an iterator over consolidated updates before some upper
  • force consolidation of updates before some upper

The goal is to provide good performance for each of these operations, even in the presence of future updates. MVs downstream of temporal filters might have to deal with large amounts of retractions for future times and we want those to be handled efficiently as well.

Note that Correction does not provide a method to directly remove updates. Instead updates are removed by inserting their retractions so that they consolidate away to nothing.

§Storage of Updates

Stored updates are of the form (data, time, diff), where time and diff are fixed to mz_repr::Timestamp and mz_repr::Diff, respectively.

CorrectionV2 holds onto a list of Chains containing Chunks of stashed updates. Each Chunk is a columnation region containing a fixed maximum number of updates. All updates in a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.

Chains live in three places:

  • A BucketChain partitions times at or beyond the boundary (the largest read upper seen so far) into buckets of exponentially growing time ranges, each holding a list of chains. Reads only touch the buckets below their upper, so the bulk of the buffered updates — in particular far-future retractions produced by temporal filters — is left alone.
  • pending_low holds chains at times below the boundary, mostly insertions arriving through the persist feedback.
  • emitted is a single chain holding the updates returned by the last read. Updates must stay in the buffer until their feedback retractions arrive, and keeping them separate from the bucket chain means reads never have to re-merge future updates.
      chain[0]   |   chain[1]   |   chain[2]
                 |              |
    chunk[0]     | chunk[0]     | chunk[0]
      (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
      (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
    chunk[1]     | chunk[1]     |
      (c, 1, +1) |   (c, 2, -2) |
      (a, 2, -1) |   (c, 4, -1) |
    chunk[2]     |              |
      (b, 2, +1) |              |
      (c, 2, +1) |              |
    chunk[3]     |              |
      (b, 3, -1) |              |
      (c, 3, +1) |              |

The “chain invariant” states that each chain in a bucket has at least chain_proportionality times as many updates as the next one. This means that chain sizes will often be powers of chain_proportionality, but they don’t have to be. For example, for a proportionality of 2, the chain sizes [11, 5, 2, 1] would satisfy the chain invariant.

Note that the invariant is maintained on update counts, not chunk counts. Chunks are byte-bounded (see ChunkBuilder), so chunk count is not proportional to update count and would be a poor proxy: any chain below the chunk byte boundary is a single chunk regardless of how many updates it holds, which would let the geometric invariant collapse and break the O(log N) amortization of inserts.

Choosing the chain_proportionality value allows tuning the trade-off between memory and CPU resources required to maintain corrections. A higher proportionality forces more frequent chain merges, and therefore consolidation, reducing memory usage but increasing CPU usage.

§Inserting Updates

A batch of updates is routed by time: updates below the boundary become a pending_low chain, the rest is appended as new chains to their respective buckets. Appending to a bucket merges chains until the chain invariant is restored.

Inserting an update into the correction buffer can be expensive: It involves allocating a new chunk, copying the update in, and then likely merging with an existing chain to restore the chain invariant. If updates trickle in in small batches, this can cause a considerable overhead. To amortize this overhead, new updates aren’t immediately inserted into the sorted chains but instead stored in a Stage buffer. Once enough updates have been staged to fill a Chunk, they are sorted and routed.

The insert operation has an amortized complexity of O(log N), with N being the current number of updates stored.

§Retrieving Consolidated Updates

Retrieving consolidated updates before a given upper works by peeling all buckets below the upper off the bucket chain, splitting their chains, the pending low chains, and the previous emitted chain at the upper, merging the parts below the upper into the new emitted chain, and returning an iterator over that chain.

Because each chain contains updates ordered by time first, splitting a chain at the upper reuses whole chunks and copies at most one chunk straddling the split point. Updates at times at or beyond the upper are never touched, no matter how many the buffer holds. The complexity of a read is O(U log K), with U being the number of updates before upper and K the number of chains containing them.

§Merging Chains

Merging multiple chains into a single chain is done using a k-way merge. As the input chains are sorted by (time, data) and consolidated, the same properties hold for the output chain. The complexity of a merge of K chains containing N updates is O(N log K).

There is a twist though: Merging also has to respect the since frontier, which determines how far the times of updates should be advanced. Advancing times in a sorted chain of updates can make them become unsorted, so we cannot just merge the chains from top to bottom.

For example, consider these two chains, assuming since = [2]: chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)] chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)] After time advancement, the chains look like this: chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)] chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)] Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that’s neither sorted nor consolidated.

Times below the since can only exist in chains read by consolidate_before, and only if the since advanced past buffered times since the previous read. For few distinct stale times — the steady state, where the previously emitted chain was written just before the since advanced past it — we merge sub-chains, one for each distinct time that’s before or at the since. Each of these sub-chains retains the (time, data) ordering after the time advancement to since, so merging those yields the expected result.

For the above example, the chains we would merge are: chain 1.a: [(c, 2, +1)] chain 1.b: [(b, 2, -1), (a, 3, -1)] chain 2.a: [(b, 2, +1)], chain 2.b: [(a, 2, +1), (c, 2, -1)]

For many distinct stale times — e.g. a since jump across many buffered timestamps when a sink restarts with an old as-of — the number of sub-chains grows with the number of distinct times, so we instead materialize the affected updates, advance their times, and sort and consolidate them in one O(U log U) pass.

Structs§

CorrectionV2
A data structure used to store corrections in the MV sink implementation.

Traits§

Data
Convenient alias for use in data trait bounds.