An implementation of the Correction data structure used by the MV sink’s write_batches operator to stash updates before they are written.
The Correction data structure provides methods to:
- insert new updates
- advance the compaction frontier (called since)
- obtain an iterator over consolidated updates before some upper
- force consolidation of updates before some upper
The goal is to provide good performance for each of these operations, even in the presence of future updates. MVs downstream of temporal filters might have to deal with large amounts of retractions for future times and we want those to be handled efficiently as well.
Note that Correction does not provide a method to remove updates directly. Instead, updates are removed by inserting their retractions, so that they consolidate away to nothing.
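To make the retraction mechanism concrete, here is a minimal sketch of sorting and consolidating a list of updates. It assumes plain u64 times and i64 diffs as stand-ins for mz_repr::Timestamp and mz_repr::Diff; the Update alias and the consolidate function are illustrative names, not this module's API.

```rust
/// An update of the form (data, time, diff). Plain u64/i64 stand in for
/// mz_repr::Timestamp and mz_repr::Diff (an assumption of this sketch).
type Update<D> = (D, u64, i64);

/// Sorts updates by (time, data), sums the diffs of identical (time, data)
/// pairs, and drops every update whose summed diff is zero.
fn consolidate<D: Ord>(updates: &mut Vec<Update<D>>) {
    updates.sort_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)));
    let mut out: Vec<Update<D>> = Vec::with_capacity(updates.len());
    for (d, t, r) in updates.drain(..) {
        // After sorting, equal (time, data) pairs are adjacent.
        let same = matches!(out.last(), Some((ld, lt, _)) if *lt == t && *ld == d);
        if same {
            out.last_mut().unwrap().2 += r;
        } else {
            out.push((d, t, r));
        }
    }
    // A retraction and its matching insertion sum to zero and disappear here.
    out.retain(|u| u.2 != 0);
    *updates = out;
}
```

For example, consolidating [(a, 1, +1), (b, 1, +1), (a, 1, -1)] leaves only (b, 1, +1): the retraction cancels the earlier insertion of a.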
§Storage of Updates
Stored updates are of the form (data, time, diff), where time and diff are fixed to mz_repr::Timestamp and mz_repr::Diff, respectively.
CorrectionV2 holds onto a list of Chains containing Chunks of stashed updates. Each Chunk is a columnation region containing a fixed maximum number of updates. All updates in a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
chain[0]     | chain[1]     | chain[2]
             |              |
chunk[0]     | chunk[0]     | chunk[0]
  (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
  (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
chunk[1]     | chunk[1]     |
  (c, 1, +1) |   (c, 2, -2) |
  (a, 2, -1) |   (c, 4, -1) |
chunk[2]     |              |
  (b, 2, +1) |              |
  (c, 2, +1) |              |
chunk[3]     |              |
  (b, 3, -1) |              |
  (c, 3, +1) |              |
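A minimal sketch of this layout follows. It assumes Vec-backed chunks instead of columnation regions and a hypothetical CHUNK_CAPACITY of 4, chosen only to keep the example small; Chain, Chunk, from_sorted, and is_well_formed here are illustrative stand-ins rather than this module's definitions.

```rust
/// Hypothetical maximum number of updates per chunk (illustrative value only).
const CHUNK_CAPACITY: usize = 4;

/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// A non-empty chunk of updates; a Vec stands in for the columnation region.
struct Chunk<D> {
    data: Vec<Update<D>>,
}

/// A chain of chunks whose updates are ordered by (time, data) and consolidated.
struct Chain<D> {
    chunks: Vec<Chunk<D>>,
}

impl<D: Ord + Clone> Chain<D> {
    /// Packs updates that are already sorted and consolidated into chunks of
    /// at most CHUNK_CAPACITY updates each.
    fn from_sorted(updates: Vec<Update<D>>) -> Self {
        let chunks = updates
            .chunks(CHUNK_CAPACITY)
            .map(|c| Chunk { data: c.to_vec() })
            .collect();
        Chain { chunks }
    }

    /// Iterates over all updates in the chain, chunk by chunk.
    fn iter(&self) -> impl Iterator<Item = &Update<D>> {
        self.chunks.iter().flat_map(|c| c.data.iter())
    }

    /// Checks that chunks are non-empty and bounded, that updates strictly
    /// increase in (time, data) across chunk boundaries, and that no diff is zero.
    fn is_well_formed(&self) -> bool {
        let chunks_ok = self
            .chunks
            .iter()
            .all(|c| !c.data.is_empty() && c.data.len() <= CHUNK_CAPACITY);
        let updates: Vec<_> = self.iter().collect();
        let sorted = updates
            .windows(2)
            .all(|w| (w[0].1, &w[0].0) < (w[1].1, &w[1].0));
        let consolidated = updates.iter().all(|u| u.2 != 0);
        chunks_ok && sorted && consolidated
    }
}
```

With CHUNK_CAPACITY = 4, a chain built from five sorted updates consists of two chunks holding four updates and one update.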
The “chain invariant” states that each chain has at least CHAIN_PROPORTIONALITY times as many chunks as the next one. This means that chain sizes will often be powers of CHAIN_PROPORTIONALITY, but they don’t have to be. For example, for a proportionality of 2, the chain sizes [11, 5, 2, 1] would satisfy the chain invariant.
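The chain invariant can be checked over a list of chain sizes measured in chunks. This is a minimal sketch: satisfies_chain_invariant is a hypothetical helper, and the proportionality is passed in as a parameter rather than read from CHAIN_PROPORTIONALITY.

```rust
/// Returns true if each chain has at least `proportionality` times as many
/// chunks as the chain after it. `chain_sizes` lists chunk counts with the
/// largest chain first, as in the diagram above.
fn satisfies_chain_invariant(chain_sizes: &[usize], proportionality: usize) -> bool {
    chain_sizes
        .windows(2)
        .all(|pair| pair[0] >= proportionality * pair[1])
}
```

With a proportionality of 2, [11, 5, 2, 1] passes, while [11, 6, 2, 1] fails because 11 < 2 * 6.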
Choosing the CHAIN_PROPORTIONALITY value allows tuning the trade-off between memory and CPU resources required to maintain corrections. A higher proportionality forces more frequent chain merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
§Inserting Updates
A batch of updates is appended as a new chain. Then chains are merged at the end of the chain list until the chain invariant is restored.
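The following sketch models this insertion step using only chain sizes. It assumes, for simplicity, that merging two chains yields a chain whose size is the sum of the inputs (nothing consolidates away) and that only the last two chains are merged at a time; push_chain is a hypothetical name, and the real implementation may merge differently.

```rust
/// Appends a new chain, represented only by its chunk count, and then merges
/// the last two chains until the chain invariant holds again. Merged sizes are
/// simply added, i.e. this assumes nothing consolidates away during a merge.
fn push_chain(chain_sizes: &mut Vec<usize>, new_size: usize, proportionality: usize) {
    chain_sizes.push(new_size);
    while chain_sizes.len() >= 2 {
        let n = chain_sizes.len();
        // Only the last pair can violate the invariant; earlier pairs were valid.
        if chain_sizes[n - 2] >= proportionality * chain_sizes[n - 1] {
            break;
        }
        let last = chain_sizes.pop().unwrap();
        let prev = chain_sizes.pop().unwrap();
        chain_sizes.push(prev + last);
    }
}
```

Pushing single-chunk chains with a proportionality of 2 produces [1], [2], [2, 1], [4], and so on, much like a binary counter. Each chunk is merged into a larger chain only a logarithmic number of times, which is one way to see where a logarithmic amortized insert cost comes from.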
Inserting an update into the correction buffer can be expensive: it involves allocating a new chunk, copying the update in, and then likely merging with an existing chain to restore the chain invariant. If updates trickle in in small batches, this can cause considerable overhead. To amortize this overhead, new updates aren’t immediately inserted into the sorted chains but are instead stored in a Stage buffer. Once enough updates have been staged to fill a Chunk, they are sorted and inserted into the chains.
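A minimal sketch of such a staging buffer, assuming a hypothetical CHUNK_CAPACITY of 4 and Vec storage. The Stage type here is illustrative; the real one may differ, for example in when it consolidates.

```rust
/// Hypothetical number of updates that fill one chunk.
const CHUNK_CAPACITY: usize = 4;

/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// Buffers incoming updates until a full chunk's worth has accumulated.
struct Stage<D> {
    pending: Vec<Update<D>>,
}

impl<D: Ord> Stage<D> {
    fn new() -> Self {
        Stage { pending: Vec::with_capacity(CHUNK_CAPACITY) }
    }

    /// Stages one update. Once CHUNK_CAPACITY updates are pending, the buffer is
    /// drained, sorted by (time, data), consolidated, and returned so the caller
    /// can insert it into the chains as a new chain.
    fn push(&mut self, update: Update<D>) -> Option<Vec<Update<D>>> {
        self.pending.push(update);
        if self.pending.len() < CHUNK_CAPACITY {
            return None;
        }
        let mut batch = std::mem::take(&mut self.pending);
        batch.sort_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)));
        let mut out: Vec<Update<D>> = Vec::with_capacity(batch.len());
        for (d, t, r) in batch {
            // Equal (time, data) pairs are adjacent after sorting; sum their diffs.
            let same = matches!(out.last(), Some((ld, lt, _)) if *lt == t && *ld == d);
            if same {
                out.last_mut().unwrap().2 += r;
            } else {
                out.push((d, t, r));
            }
        }
        out.retain(|u| u.2 != 0);
        Some(out)
    }
}
```

Because the batch is consolidated before it is returned, it may hold fewer than CHUNK_CAPACITY updates: staging (b, 1, +1), (a, 1, +1), (b, 1, -1), and (c, 0, +1) returns just [(c, 0, +1), (a, 1, +1)].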
The insert operation has an amortized complexity of O(log N), with N being the current number of updates stored.
§Retrieving Consolidated Updates
Retrieving consolidated updates before a given upper works by first consolidating all updates at times before the upper, merging them all into one chain, then returning an iterator over that chain.
Because each chain contains updates ordered by time first, consolidation of all updates before an upper is possible without touching updates at future times. It works by merging the chains only up to the upper, producing a merged chain containing consolidated times before the upper and leaving behind the chain parts containing later times. The complexity of this operation is O(U log K), with U being the number of updates before upper and K the number of chains.
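The key observation, that the updates before upper form a prefix of every chain, can be sketched with one binary search per chain. This minimal sketch assumes chains stored as plain sorted Vecs and a totally ordered u64 time, so upper is a single timestamp. For brevity it combines the prefixes by sort-and-consolidate rather than the O(U log K) k-way merge described above; consolidate_before is a hypothetical name.

```rust
/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// Removes all updates at times before `upper` from `chains` and returns them
/// as a single chain, sorted by (time, data) and consolidated. The parts of
/// each chain at or after `upper` stay in place.
fn consolidate_before<D: Ord>(chains: &mut Vec<Vec<Update<D>>>, upper: u64) -> Vec<Update<D>> {
    let mut current: Vec<Update<D>> = Vec::new();
    for chain in chains.iter_mut() {
        // Time is the primary sort key, so the updates before `upper` form a
        // prefix; binary search finds its end without scanning future updates.
        let end = chain.partition_point(|(_, t, _)| *t < upper);
        current.extend(chain.drain(..end));
    }
    // Drop chains that consisted only of updates before `upper`.
    chains.retain(|chain| !chain.is_empty());
    // Stand-in for the k-way merge: sort the prefixes, then sum equal keys.
    current.sort_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)));
    let mut out: Vec<Update<D>> = Vec::with_capacity(current.len());
    for (d, t, r) in current {
        let same = matches!(out.last(), Some((ld, lt, _)) if *lt == t && *ld == d);
        if same {
            out.last_mut().unwrap().2 += r;
        } else {
            out.push((d, t, r));
        }
    }
    out.retain(|u| u.2 != 0);
    out
}
```

Applied to the three chains from the diagram above with upper = 3, it returns [(a, 1, +2), (b, 1, +1), (c, 1, +1), (a, 2, -1), (c, 2, -1)] and leaves only the updates at times 3 and 4 in the chains.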
Unfortunately, performing consolidation as described above can break the chain invariant and we might need to restore it by merging chains, including ones containing future updates. This is something that would be great to fix! In the meantime the hope is that in steady state it doesn’t matter too much because either there are no future retractions and U is approximately equal to N, or the amount of future retractions is much larger than the amount of current changes, in which case removing the current changes has a good chance of leaving the chain invariant intact.
§Merging Chains
Merging multiple chains into a single chain is done using a k-way merge. As the input chains are sorted by (time, data) and consolidated, the same properties hold for the output chain. The complexity of a merge of K chains containing N updates is O(N log K).
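A minimal sketch of such a k-way merge using std::collections::BinaryHeap, assuming chains stored as sorted Vecs and ignoring the since frontier for now. merge_chains is an illustrative name, not this module's merge function.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// Merges chains that are each sorted by (time, data) and consolidated into one
/// chain with the same properties. Every update passes through the heap once,
/// and the heap holds at most K entries, so the merge costs O(N log K).
fn merge_chains<D: Ord + Clone>(chains: &[Vec<Update<D>>]) -> Vec<Update<D>> {
    // Entries are ((time, data), chain index, position); Reverse turns the
    // max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (i, chain) in chains.iter().enumerate() {
        if let Some((d, t, _)) = chain.first() {
            heap.push(Reverse(((*t, d.clone()), i, 0usize)));
        }
    }
    let mut out: Vec<Update<D>> = Vec::new();
    while let Some(Reverse(((t, d), i, pos))) = heap.pop() {
        let diff = chains[i][pos].2;
        // Equal keys from different chains pop consecutively; sum their diffs.
        let same = matches!(out.last(), Some((ld, lt, _)) if *lt == t && *ld == d);
        if same {
            out.last_mut().unwrap().2 += diff;
        } else {
            out.push((d, t, diff));
        }
        // Refill the heap with the next update from the same chain, if any.
        if let Some((nd, nt, _)) = chains[i].get(pos + 1) {
            heap.push(Reverse(((*nt, nd.clone()), i, pos + 1)));
        }
    }
    out.retain(|u| u.2 != 0);
    out
}
```

Because equal (time, data) keys from different chains leave the heap one after another, summing them as they appear is enough to keep the output consolidated.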
There is a twist though: Merging also has to respect the since frontier, which determines how far the times of updates should be advanced. Advancing times in a sorted chain of updates can make them become unsorted, so we cannot just merge the chains from top to bottom.
For example, consider these two chains, assuming since = [2]:
chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
After time advancement, the chains look like this:
chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that’s
neither sorted nor consolidated.
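The failure can be reproduced with a naive 2-way merge over the advanced chains. This sketch assumes a totally ordered u64 time, so since = [2] becomes the single value 2. advance and naive_merge are hypothetical helpers written for illustration; naive_merge is not the module's merge_2.

```rust
/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// Advances every time in `chain` to at least `since`, without re-sorting.
fn advance<D: Clone>(chain: &[Update<D>], since: u64) -> Vec<Update<D>> {
    chain.iter().map(|(d, t, r)| (d.clone(), (*t).max(since), *r)).collect()
}

/// A 2-way merge that assumes both inputs are sorted by (time, data) and sums
/// the diffs when the two heads have equal keys. It does not check its input.
fn naive_merge<D: Ord + Clone>(a: &[Update<D>], b: &[Update<D>]) -> Vec<Update<D>> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        let (ka, kb) = ((a[i].1, &a[i].0), (b[j].1, &b[j].0));
        if ka < kb {
            out.push(a[i].clone());
            i += 1;
        } else if kb < ka {
            out.push(b[j].clone());
            j += 1;
        } else {
            let diff = a[i].2 + b[j].2;
            if diff != 0 {
                out.push((a[i].0.clone(), a[i].1, diff));
            }
            i += 1;
            j += 1;
        }
    }
    // One input is exhausted; append the rest of the other.
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```

Running naive_merge on chain 1 and chain 2 advanced to 2 returns exactly [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)]: the two (c, 2) entries meet and cancel, but the b entries end up on either side of (a, 2, +1) and never meet.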
Instead, we need to merge sub-chains: one for each distinct time before the since, plus one holding the remaining updates at or beyond the since, which time advancement leaves unchanged. Each of these sub-chains retains the (time, data) ordering after the time advancement to since, so merging them yields the expected result.
For the above example, the chains we would merge are:
chain 1.a: [(c, 2, +1)]
chain 1.b: [(b, 2, -1), (a, 3, -1)]
chain 2.a: [(b, 2, +1)]
chain 2.b: [(a, 2, +1), (c, 2, -1)]
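The sub-chain approach can be sketched as follows, again assuming a totally ordered u64 time so that since is a single value. split_for_since and merge_with_since are hypothetical names; the merge itself is a binary-heap k-way merge over the sub-chains.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// (data, time, diff), with u64/i64 standing in for the real time and diff types.
type Update<D> = (D, u64, i64);

/// Splits a chain sorted by (time, data) into sub-chains that stay sorted after
/// advancing times to `since`, and applies that advancement.
fn split_for_since<D: Clone>(chain: &[Update<D>], since: u64) -> Vec<Vec<Update<D>>> {
    let mut subs: Vec<Vec<Update<D>>> = Vec::new();
    let mut current_key: Option<u64> = None;
    for (d, t, r) in chain {
        // Each time before `since` gets its own sub-chain; all times at or
        // after `since` share one, since advancement leaves them unchanged.
        let key = (*t).min(since);
        if current_key != Some(key) {
            subs.push(Vec::new());
            current_key = Some(key);
        }
        subs.last_mut().unwrap().push((d.clone(), (*t).max(since), *r));
    }
    subs
}

/// Merges `chains` while advancing times to `since`, by k-way merging all of
/// their sub-chains with a min-heap keyed on (time, data).
fn merge_with_since<D: Ord + Clone>(chains: &[Vec<Update<D>>], since: u64) -> Vec<Update<D>> {
    let subs: Vec<Vec<Update<D>>> = chains.iter().flat_map(|c| split_for_since(c, since)).collect();
    let mut heap = BinaryHeap::new();
    for (i, sub) in subs.iter().enumerate() {
        // Sub-chains are never empty, so each has a first update.
        let (d, t, _) = &sub[0];
        heap.push(Reverse(((*t, d.clone()), i, 0usize)));
    }
    let mut out: Vec<Update<D>> = Vec::new();
    while let Some(Reverse(((t, d), i, pos))) = heap.pop() {
        let diff = subs[i][pos].2;
        let same = matches!(out.last(), Some((ld, lt, _)) if *lt == t && *ld == d);
        if same {
            out.last_mut().unwrap().2 += diff;
        } else {
            out.push((d, t, diff));
        }
        if let Some((nd, nt, _)) = subs[i].get(pos + 1) {
            heap.push(Reverse(((*nt, nd.clone()), i, pos + 1)));
        }
    }
    out.retain(|u| u.2 != 0);
    out
}
```

For the example above, split_for_since produces exactly chains 1.a and 1.b from chain 1, and merging all four sub-chains yields [(a, 2, +1), (a, 3, -1)]: the b and c updates consolidate away.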
Structs§
- Chain 🔒 A chain of Chunks containing updates.
- Chunk 🔒 A non-empty chunk of updates, backed by a columnation region.
- A data structure used to store corrections in the MV sink implementation.
- Cursor 🔒 A cursor over updates in a chain.
- A binary heap specialized for merging Cursors.
- Stage 🔒 A buffer for staging updates before they are inserted into the sorted chains.
Constants§
- Determines the size factor of subsequent chains required by the chain invariant.
Traits§
- Convenient alias for use in data trait bounds.
Functions§
- Sort and consolidate the given list of updates.
- merge_2 🔒 Merge the given two cursors using a 2-way merge.
- Merge the given chains, advancing times by the given since in the process.
- Merge the given chains, advancing times by the given since in the process, but only up to the given upper.
- Merge the given cursors into one chain.
- Merge the given cursors using a k-way merge with a binary heap.