Module mz_compute::sink::correction_v2

An implementation of the Correction data structure used by the MV sink’s write_batches operator to stash updates before they are written.

The Correction data structure provides methods to:

  • insert new updates
  • advance the compaction frontier (called since)
  • obtain an iterator over consolidated updates before some upper
  • force consolidation of updates before some upper

The goal is to provide good performance for each of these operations, even in the presence of future updates. MVs downstream of temporal filters might have to deal with large numbers of retractions at future times, and we want those to be handled efficiently as well.

Note that Correction does not provide a method to directly remove updates. Instead, updates are removed by inserting their retractions, so that they consolidate away to nothing.
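The following minimal reference model pins down the intended semantics of these operations. It is a hypothetical sketch and not the module's implementation. `NaiveCorrection` and its methods are illustrative names. `Time` and `Diff` stand in for `mz_repr::Timestamp` and `mz_repr::Diff`. A `BTreeMap` keyed by `(time, data)` replaces the chains of chunks, so every stored update is always consolidated. Removing an update is modeled by inserting its retraction.

```rust
use std::collections::BTreeMap;

// Assumed stand-ins for mz_repr::Timestamp and mz_repr::Diff.
type Time = u64;
type Diff = i64;

/// Hypothetical reference model: same operations as a correction buffer,
/// but backed by a BTreeMap keyed by (time, data) instead of chains.
struct NaiveCorrection<D> {
    updates: BTreeMap<(Time, D), Diff>,
    since: Time,
}

impl<D: Ord + Clone> NaiveCorrection<D> {
    fn new() -> Self {
        Self { updates: BTreeMap::new(), since: 0 }
    }

    /// Insert updates; times are advanced to `since` and diffs consolidated.
    /// Removing an update means inserting its retraction (negated diff).
    fn insert(&mut self, updates: Vec<(D, Time, Diff)>) {
        for (data, time, diff) in updates {
            let key = (time.max(self.since), data);
            let sum = self.updates.get(&key).copied().unwrap_or(0) + diff;
            if sum == 0 {
                // The update consolidated away to nothing.
                self.updates.remove(&key);
            } else {
                self.updates.insert(key, sum);
            }
        }
    }

    /// Advance the compaction frontier and re-consolidate stored updates.
    fn advance_since(&mut self, since: Time) {
        self.since = self.since.max(since);
        let old = std::mem::take(&mut self.updates);
        self.insert(old.into_iter().map(|((t, d), r)| (d, t, r)).collect());
    }

    /// Consolidated updates at times before `upper`, ordered by (time, data).
    fn updates_before(&self, upper: Time) -> Vec<(D, Time, Diff)> {
        self.updates
            .iter()
            .take_while(|((t, _), _)| *t < upper)
            .map(|((t, d), r)| (d.clone(), *t, *r))
            .collect()
    }
}
```

Because the map is ordered by time first, the updates before an upper form a prefix, which is the same property the chain-based design relies on.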

§Storage of Updates

Stored updates are of the form (data, time, diff), where time and diff are fixed to mz_repr::Timestamp and mz_repr::Diff, respectively.

CorrectionV2 holds onto a list of Chains containing Chunks of stashed updates. Each Chunk is a columnation region containing a fixed maximum number of updates. All updates in a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.

      chain[0]   |   chain[1]   |   chain[2]
                 |              |
    chunk[0]     | chunk[0]     | chunk[0]
      (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
      (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
    chunk[1]     | chunk[1]     |
      (c, 1, +1) |   (c, 2, -2) |
      (a, 2, -1) |   (c, 4, -1) |
    chunk[2]     |              |
      (b, 2, +1) |              |
      (c, 2, +1) |              |
    chunk[3]     |              |
      (b, 3, -1) |              |
      (c, 3, +1) |              |
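The "ordered by (time, data) and consolidated" property can be illustrated with a small sketch. `consolidate` is a hypothetical helper that uses a plain `Vec` instead of a columnation region, with `u64`/`i64` standing in for the timestamp and diff types. It sorts by time first and then by data, sums the diffs of identical pairs, and drops those that cancel. The test input is a shuffled, partly cancelling version of chain[1] from the diagram.

```rust
// Assumed stand-ins for mz_repr::Timestamp and mz_repr::Diff.
type Time = u64;
type Diff = i64;

/// Hypothetical helper: sort updates by (time, data), sum the diffs of
/// identical (time, data) pairs, and drop pairs whose diffs cancel to zero.
fn consolidate<D: Ord>(updates: &mut Vec<(D, Time, Diff)>) {
    // Order by time first, then data, matching the chain ordering.
    updates.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
    let mut out: Vec<(D, Time, Diff)> = Vec::with_capacity(updates.len());
    for (data, time, diff) in updates.drain(..) {
        if let Some(last) = out.last_mut() {
            // Same (time, data) as the previous update: accumulate the diff.
            if last.1 == time && last.0 == data {
                last.2 += diff;
                continue;
            }
        }
        out.push((data, time, diff));
    }
    // Updates that consolidated away to nothing are removed.
    out.retain(|(_, _, r)| *r != 0);
    *updates = out;
}
```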

The “chain invariant” states that each chain has at least CHAIN_PROPORTIONALITY times as many chunks as the next one. This means that chain sizes will often be powers of CHAIN_PROPORTIONALITY, but they don’t have to be. For example, for a proportionality of 2, the chain sizes [11, 5, 2, 1] would satisfy the chain invariant.
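The following hypothetical check expresses the chain invariant over chain sizes measured in chunks, ordered from the first chain to the last. The function name and the slice-of-sizes representation are assumptions for illustration.

```rust
/// Hypothetical helper: does a list of chain sizes (in chunks, first chain
/// first) satisfy the chain invariant for the given proportionality?
fn satisfies_chain_invariant(chunk_counts: &[usize], proportionality: usize) -> bool {
    // Every chain needs at least `proportionality` times the chunks of the next.
    chunk_counts
        .windows(2)
        .all(|pair| pair[0] >= proportionality * pair[1])
}
```

With a proportionality of 2, [11, 5, 2, 1] passes. [11, 6, 2, 1] fails because 11 is less than 2 * 6.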

Choosing the CHAIN_PROPORTIONALITY value allows tuning the trade-off between memory and CPU resources required to maintain corrections. A higher proportionality forces more frequent chain merges, and therefore consolidation, reducing memory usage but increasing CPU usage.

§Inserting Updates

A batch of updates is appended as a new chain. Then chains are merged at the end of the chain list until the chain invariant is restored.
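Below is a simplified sketch of this insertion strategy, which rests on several assumptions. Chains are plain `Vec`s, and a chain's chunk count is derived from an assumed `CHUNK_CAPACITY`. `CHAIN_PROPORTIONALITY` is given an arbitrary value of 3. The two-chain merge is approximated by concatenating, sorting, and consolidating, rather than by a streaming merge. A merged chain whose updates all cancel is dropped entirely.

```rust
type Time = u64;
type Diff = i64;
type Chain<D> = Vec<(D, Time, Diff)>;

// Assumed values for illustration only; the module defines its own.
const CHUNK_CAPACITY: usize = 2;
const CHAIN_PROPORTIONALITY: usize = 3;

/// Number of fixed-capacity chunks needed to hold a chain.
fn chunk_count<D>(chain: &Chain<D>) -> usize {
    (chain.len() + CHUNK_CAPACITY - 1) / CHUNK_CAPACITY
}

/// Simplified merge: concatenate, sort by (time, data), and consolidate.
fn merge_two<D: Ord>(mut a: Chain<D>, b: Chain<D>) -> Chain<D> {
    a.extend(b);
    a.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
    let mut out: Chain<D> = Vec::with_capacity(a.len());
    for (data, time, diff) in a {
        if let Some(last) = out.last_mut() {
            if last.1 == time && last.0 == data {
                last.2 += diff;
                continue;
            }
        }
        out.push((data, time, diff));
    }
    out.retain(|(_, _, r)| *r != 0);
    out
}

/// Append a sorted, consolidated batch as a new chain, then merge chains at
/// the end of the list until the chain invariant holds again.
fn insert_chain<D: Ord>(chains: &mut Vec<Chain<D>>, batch: Chain<D>) {
    if batch.is_empty() {
        return;
    }
    chains.push(batch);
    while chains.len() >= 2 {
        let n = chains.len();
        if chunk_count(&chains[n - 2]) >= CHAIN_PROPORTIONALITY * chunk_count(&chains[n - 1]) {
            break; // invariant restored
        }
        let last = chains.pop().unwrap();
        let prev = chains.pop().unwrap();
        let merged = merge_two(prev, last);
        // A merge whose updates all cancel leaves no chain behind.
        if !merged.is_empty() {
            chains.push(merged);
        }
    }
}
```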

Inserting an update into the correction buffer can be expensive: it involves allocating a new chunk, copying the update in, and then likely merging with an existing chain to restore the chain invariant. If updates trickle in as small batches, this can cause considerable overhead. To amortize this overhead, new updates aren’t immediately inserted into the sorted chains but are instead stored in a Stage buffer. Once enough updates have been staged to fill a Chunk, they are sorted and inserted into the chains.
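Here is a minimal sketch of the staging idea. `StagingBuffer` is a hypothetical stand-in for the module's `Stage`, and its `capacity` parameter is an assumed placeholder for the real chunk size. It buffers updates and, once full, returns them as a sorted, consolidated batch that could then be appended as a new chain.

```rust
type Time = u64;
type Diff = i64;

/// Hypothetical staging buffer (not the module's `Stage`): collects updates
/// and, once `capacity` are buffered, returns them sorted and consolidated.
struct StagingBuffer<D> {
    buffer: Vec<(D, Time, Diff)>,
    capacity: usize,
}

impl<D: Ord> StagingBuffer<D> {
    fn new(capacity: usize) -> Self {
        Self { buffer: Vec::with_capacity(capacity), capacity }
    }

    /// Stage one update. Returns a batch ready for insertion once full.
    fn insert(&mut self, update: (D, Time, Diff)) -> Option<Vec<(D, Time, Diff)>> {
        self.buffer.push(update);
        if self.buffer.len() < self.capacity {
            return None;
        }
        Some(self.flush())
    }

    /// Drain all staged updates, sorted by (time, data) and consolidated.
    fn flush(&mut self) -> Vec<(D, Time, Diff)> {
        let mut batch = std::mem::take(&mut self.buffer);
        batch.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
        let mut out: Vec<(D, Time, Diff)> = Vec::with_capacity(batch.len());
        for (data, time, diff) in batch {
            if let Some(last) = out.last_mut() {
                if last.1 == time && last.0 == data {
                    last.2 += diff;
                    continue;
                }
            }
            out.push((data, time, diff));
        }
        out.retain(|(_, _, r)| *r != 0);
        out
    }
}
```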

The insert operation has an amortized complexity of O(log N), with N being the current number of updates stored.

§Retrieving Consolidated Updates

Retrieving consolidated updates before a given upper works by first consolidating all updates at times before the upper, merging them all into one chain, then returning an iterator over that chain.

Because each chain contains updates ordered by time first, consolidation of all updates before an upper is possible without touching updates at future times. It works by merging the chains only up to the upper, producing a merged chain containing the consolidated updates at times before the upper and leaving behind the chain parts containing later times. The complexity of this operation is O(U log K), with U being the number of updates before upper and K the number of chains.
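The split-then-merge step can be sketched as follows. `merge_before_upper` is a hypothetical name. It uses `partition_point` to find where each time-ordered chain crosses the upper, merges the prefixes, and returns the untouched suffixes. For brevity, the prefixes are combined by sorting and consolidating. That costs O(U log U) rather than the O(U log K) of a proper k-way merge. The test reuses the three chains from the diagram with upper = 3, so chain[2] is left untouched entirely.

```rust
type Time = u64;
type Diff = i64;
type Chain<D> = Vec<(D, Time, Diff)>;

/// Hypothetical sketch: merge the parts of all chains before `upper` into one
/// consolidated chain, and return the parts at or after `upper` unchanged.
fn merge_before_upper<D: Ord>(chains: Vec<Chain<D>>, upper: Time) -> (Chain<D>, Vec<Chain<D>>) {
    let mut prefix: Chain<D> = Vec::new();
    let mut suffixes = Vec::new();
    for mut chain in chains {
        // Chains are ordered by time first, so a binary search finds the split.
        let split = chain.partition_point(|(_, t, _)| *t < upper);
        let rest = chain.split_off(split);
        prefix.extend(chain);
        if !rest.is_empty() {
            suffixes.push(rest);
        }
    }
    // Simplification: sort and consolidate instead of a k-way merge.
    prefix.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
    let mut merged: Chain<D> = Vec::with_capacity(prefix.len());
    for (data, time, diff) in prefix {
        if let Some(last) = merged.last_mut() {
            if last.1 == time && last.0 == data {
                last.2 += diff;
                continue;
            }
        }
        merged.push((data, time, diff));
    }
    merged.retain(|(_, _, r)| *r != 0);
    (merged, suffixes)
}
```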

Unfortunately, performing consolidation as described above can break the chain invariant, and we might need to restore it by merging chains, including ones containing future updates. This is something that would be great to fix! In the meantime, the hope is that in steady state it doesn’t matter too much, because either there are no future retractions and U is approximately equal to N, or the number of future retractions is much larger than the number of current changes, in which case removing the current changes has a good chance of leaving the chain invariant intact.

§Merging Chains

Merging multiple chains into a single chain is done using a k-way merge. As the input chains are sorted by (time, data) and consolidated, the same properties hold for the output chain. The complexity of a merge of K chains containing N updates is O(N log K).
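A heap-based k-way merge of sorted, consolidated chains might look like the following sketch. It is not the module's MergeHeap or cursor machinery. It uses `std::collections::BinaryHeap` with `std::cmp::Reverse` as a min-heap over `(time, data, chain index)`, and it clones the data of each chain head, which the real implementation may avoid. `kway_merge` is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

type Time = u64;
type Diff = i64;
type Chain<D> = Vec<(D, Time, Diff)>;

/// Hypothetical k-way merge of chains that are each sorted by (time, data)
/// and consolidated. A min-heap holds the head of every chain, so each step
/// costs O(log K); equal (time, data) heads are summed and zeros dropped.
fn kway_merge<D: Ord + Clone>(chains: &[Chain<D>]) -> Chain<D> {
    // `Reverse` turns std's max-heap into a min-heap on (time, data, chain).
    let mut heap = BinaryHeap::new();
    let mut pos = vec![0usize; chains.len()];
    for (i, chain) in chains.iter().enumerate() {
        if let Some((data, time, _)) = chain.first() {
            heap.push(Reverse((*time, data.clone(), i)));
        }
    }
    let mut out: Chain<D> = Vec::new();
    while let Some(Reverse((time, data, i))) = heap.pop() {
        let diff = chains[i][pos[i]].2;
        pos[i] += 1;
        // Refill the heap with the next update from the same chain.
        if let Some((next_data, next_time, _)) = chains[i].get(pos[i]) {
            heap.push(Reverse((*next_time, next_data.clone(), i)));
        }
        if let Some(last) = out.last_mut() {
            if last.1 == time && last.0 == data {
                last.2 += diff;
                continue;
            }
        }
        out.push((data, time, diff));
    }
    out.retain(|(_, _, r)| *r != 0);
    out
}
```

Each of the N updates is pushed and popped once, and the heap never holds more than K entries. This gives the O(N log K) bound stated above.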

There is a twist though: merging also has to respect the since frontier, which determines how far the times of updates should be advanced. Advancing times in a sorted chain of updates can leave it unsorted, so we cannot just merge the chains from top to bottom.

For example, consider these two chains, assuming since = [2]:

    chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
    chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]

After time advancement, the chains look like this:

    chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
    chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]

Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that’s neither sorted nor consolidated.
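This failure mode can be reproduced with a small sketch. It advances both chains to since = 2 and then runs a naive 2-way merge that still assumes (time, data) order. `advance` and `naive_merge_2` are hypothetical helpers. The since frontier is simplified to a single timestamp.

```rust
use std::cmp::Ordering;

type Time = u64;
type Diff = i64;
type Chain<D> = Vec<(D, Time, Diff)>;

/// Advance every update's time to at least `since` (single-timestamp frontier).
fn advance<D>(chain: Chain<D>, since: Time) -> Chain<D> {
    chain.into_iter().map(|(d, t, r)| (d, t.max(since), r)).collect()
}

/// Naive 2-way merge that assumes both inputs are sorted by (time, data).
/// Equal heads are summed and dropped if they cancel.
fn naive_merge_2<D: Ord>(a: Chain<D>, b: Chain<D>) -> Chain<D> {
    let mut a = a.into_iter().peekable();
    let mut b = b.into_iter().peekable();
    let mut out = Vec::new();
    loop {
        let order = match (a.peek(), b.peek()) {
            (Some((da, ta, _)), Some((db, tb, _))) => (ta, da).cmp(&(tb, db)),
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => break,
        };
        match order {
            Ordering::Less => out.push(a.next().unwrap()),
            Ordering::Greater => out.push(b.next().unwrap()),
            Ordering::Equal => {
                let (data, time, r1) = a.next().unwrap();
                let (_, _, r2) = b.next().unwrap();
                if r1 + r2 != 0 {
                    out.push((data, time, r1 + r2));
                }
            }
        }
    }
    out
}
```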

Instead we need to merge sub-chains, one for each distinct time that’s before or at the since. The sub-chain that starts at the since also holds all updates at later times, because those times are not advanced. Each of these sub-chains retains the (time, data) ordering after the time advancement to since, so merging those yields the expected result.

For the above example, the chains we would merge are:

    chain 1.a: [(c, 2, +1)]
    chain 1.b: [(b, 2, -1), (a, 3, -1)]
    chain 2.a: [(b, 2, +1)]
    chain 2.b: [(a, 2, +1), (c, 2, -1)]
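Splitting into sub-chains and merging them can be sketched as follows. This again assumes a single-timestamp since and uses hypothetical helper names. `split_at_since` starts a new sub-chain whenever the time changes while still before the since. It puts everything at or beyond the since into one final sub-chain and advances times along the way. A heap-based k-way merge then produces a sorted, consolidated result. For the example above, that result is [(a, 2, +1), (a, 3, -1)].

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

type Time = u64;
type Diff = i64;
type Chain<D> = Vec<(D, Time, Diff)>;

/// Split a sorted chain into sub-chains for a single-timestamp `since`: one
/// per distinct time before `since`, plus one for all updates at or beyond
/// it. Times are advanced to `since`; each sub-chain stays (time, data) sorted.
fn split_at_since<D>(chain: Chain<D>, since: Time) -> Vec<Chain<D>> {
    let mut subs: Vec<Chain<D>> = Vec::new();
    let mut current: Option<Time> = None;
    for (data, time, diff) in chain {
        // Times before `since` keep distinct keys; all later times share `since`.
        let key = time.min(since);
        if current != Some(key) {
            subs.push(Vec::new());
            current = Some(key);
        }
        subs.last_mut().unwrap().push((data, time.max(since), diff));
    }
    subs
}

/// Hypothetical heap-based k-way merge of sorted, consolidated chains.
fn kway_merge<D: Ord + Clone>(chains: &[Chain<D>]) -> Chain<D> {
    let mut heap = BinaryHeap::new();
    let mut pos = vec![0usize; chains.len()];
    for (i, chain) in chains.iter().enumerate() {
        if let Some((data, time, _)) = chain.first() {
            heap.push(Reverse((*time, data.clone(), i)));
        }
    }
    let mut out: Chain<D> = Vec::new();
    while let Some(Reverse((time, data, i))) = heap.pop() {
        let diff = chains[i][pos[i]].2;
        pos[i] += 1;
        if let Some((next_data, next_time, _)) = chains[i].get(pos[i]) {
            heap.push(Reverse((*next_time, next_data.clone(), i)));
        }
        if let Some(last) = out.last_mut() {
            if last.1 == time && last.0 == data {
                last.2 += diff;
                continue;
            }
        }
        out.push((data, time, diff));
    }
    out.retain(|(_, _, r)| *r != 0);
    out
}
```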

Structs§

  • Chain 🔒
    A chain of Chunks containing updates.
  • Chunk 🔒
    A non-empty chunk of updates, backed by a columnation region.
  • A data structure used to store corrections in the MV sink implementation.
  • Cursor 🔒
    A cursor over updates in a chain.
  • A wrapper for Cursors on a MergeHeap.
  • MergeHeap 🔒
    A binary heap specialized for merging Cursors.
  • Stage 🔒
    A buffer for staging updates before they are inserted into the sorted chains.

Constants§

  • Determines the size factor of subsequent chains required by the chain invariant.

Traits§

  • Convenient alias for use in data trait bounds.

Functions§

  • Sort and consolidate the given list of updates.
  • merge_2 🔒
    Merge the given two cursors using a 2-way merge.
  • Merge the given chains, advancing times by the given since in the process.
  • Merge the given chains, advancing times by the given since in the process, but only up to the given upper.
  • Merge the given cursors into one chain.
  • merge_many 🔒
    Merge the given cursors using a k-way merge with a binary heap.