An implementation of the Correction data structure used by the MV sink’s write_batches
operator to stash updates before they are written.
The Correction data structure provides methods to:
- insert new updates
- advance the compaction frontier (called since)
- obtain an iterator over consolidated updates before some upper
- force consolidation of updates before some upper
The goal is to provide good performance for each of these operations, even in the presence of future updates. MVs downstream of temporal filters might have to deal with large numbers of retractions at future times, and we want those to be handled efficiently as well.
Note that Correction does not provide a method to directly remove updates. Instead updates
are removed by inserting their retractions so that they consolidate away to nothing.
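To illustrate how an update and its retraction cancel, here is a minimal consolidation sketch. It assumes a simplified update type (char, u64, i64) standing in for (data, mz_repr::Timestamp, mz_repr::Diff); the consolidate function is hypothetical and not part of the Correction API.

```rust
// Hypothetical simplified update: (data, time, diff). The real buffer fixes
// time and diff to mz_repr::Timestamp and mz_repr::Diff; `char` stands in
// for the data type here.
type Update = (char, u64, i64);

/// Sorts updates by (time, data), sums the diffs of equal (time, data) pairs,
/// and drops pairs whose diffs sum to zero.
fn consolidate(mut updates: Vec<Update>) -> Vec<Update> {
    updates.sort_by_key(|&(d, t, _)| (t, d));
    // `dedup_by` passes (later, earlier); fold the later diff into the earlier one.
    updates.dedup_by(|later, earlier| {
        let same = (later.0, later.1) == (earlier.0, earlier.1);
        if same {
            earlier.2 += later.2;
        }
        same
    });
    updates.retain(|&(_, _, r)| r != 0);
    updates
}

fn main() {
    // "Remove" ('a', 1) by inserting its retraction instead of deleting it.
    let buffer: Vec<Update> = vec![('a', 1, 1), ('b', 1, 1), ('a', 1, -1)];
    assert_eq!(consolidate(buffer), vec![('b', 1, 1)]);
}
```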
§Storage of Updates
Stored updates are of the form (data, time, diff), where time and diff are fixed to
mz_repr::Timestamp and mz_repr::Diff, respectively.
CorrectionV2 holds onto a list of Chains containing Chunks of stashed updates. Each
Chunk is a byte-bounded columnation region of updates. All updates in
a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
Chains live in three places:
- A BucketChain partitions times at or beyond the boundary (the largest read upper seen so far) into buckets of exponentially growing time ranges, each holding a list of chains. Reads only touch the buckets below their upper, so the bulk of the buffered updates, in particular far-future retractions produced by temporal filters, is left alone.
- pending_low holds chains at times below the boundary, mostly insertions arriving through the persist feedback.
- emitted is a single chain holding the updates returned by the last read. Updates must stay in the buffer until their feedback retractions arrive, and keeping them separate from the bucket chain means reads never have to re-merge future updates.
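One way to lay out buckets of exponentially growing time ranges is sketched below. The exact bucket boundaries are an assumption made for illustration (bucket i covers offsets 2^i - 1 through 2^(i+1) - 2 from the boundary), and bucket_index is a hypothetical helper, not part of BucketChain.

```rust
/// Hypothetical bucket layout relative to `boundary`: bucket `i` holds times `t`
/// with `2^i - 1 <= t - boundary < 2^(i+1) - 1`, so bucket 0 covers one time,
/// bucket 1 two times, bucket 2 four times, and so on.
fn bucket_index(boundary: u64, time: u64) -> u32 {
    assert!(time >= boundary, "times below the boundary belong in pending_low");
    // Widen to u128 so that `offset + 1` cannot overflow.
    let offset = u128::from(time - boundary) + 1;
    // floor(log2(offset + 1))
    127 - offset.leading_zeros()
}

fn main() {
    let boundary = 10;
    let buckets: Vec<u32> = [10, 11, 12, 13, 16, 17]
        .iter()
        .map(|&t| bucket_index(boundary, t))
        .collect();
    // Bucket ranges double: {10}, {11, 12}, {13..=16}, {17..=24}, ...
    assert_eq!(buckets, vec![0, 1, 1, 2, 2, 3]);
}
```

Under this assumed layout, a read with upper 13 only needs buckets 0 and 1, leaving every bucket from time 13 on untouched.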
    chain[0]    | chain[1]    | chain[2]
                |             |
    chunk[0]    | chunk[0]    | chunk[0]
     (a, 1, +1) |  (a, 1, +1) |  (d, 3, +1)
     (b, 1, +1) |  (b, 2, -1) |  (d, 4, -1)
    chunk[1]    | chunk[1]    |
     (c, 1, +1) |  (c, 2, -2) |
     (a, 2, -1) |  (c, 4, -1) |
    chunk[2]    |             |
     (b, 2, +1) |             |
     (c, 2, +1) |             |
    chunk[3]    |             |
     (b, 3, -1) |             |
     (c, 3, +1) |             |

The “chain invariant” states that each chain in a bucket has at least chain_proportionality times as
many updates as the next one. This means that chain sizes will often be powers of
chain_proportionality, but they don’t have to be. For example, for a proportionality of 2,
the chain sizes [11, 5, 2, 1] would satisfy the chain invariant.
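A check of the chain invariant over a bucket's chain sizes could look like the following sketch. The function name is hypothetical, and chain_proportionality is assumed to be an integer here.

```rust
/// Checks the chain invariant on a bucket's chain sizes (in update counts):
/// each chain must have at least `proportionality` times as many updates as
/// the next one. `proportionality` is assumed to be a `usize` here.
fn satisfies_chain_invariant(chain_sizes: &[usize], proportionality: usize) -> bool {
    chain_sizes
        .windows(2)
        .all(|pair| pair[0] >= proportionality * pair[1])
}

fn main() {
    // The example from the text: 11 >= 2 * 5, 5 >= 2 * 2, 2 >= 2 * 1.
    assert!(satisfies_chain_invariant(&[11, 5, 2, 1], 2));
    // 11 < 2 * 6, so this list violates the invariant.
    assert!(!satisfies_chain_invariant(&[11, 6, 2, 1], 2));
}
```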
Note that the invariant is maintained on update counts, not chunk counts. Chunks are
byte-bounded (see ChunkBuilder), so chunk count is not proportional to update count and
would be a poor proxy: any chain below the chunk byte boundary is a single chunk regardless
of how many updates it holds, which would let the geometric invariant collapse and break the
O(log N) amortization of inserts.
Choosing the chain_proportionality value allows tuning the trade-off between memory and CPU
resources required to maintain corrections. A higher proportionality forces more frequent chain
merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
§Inserting Updates
A batch of updates is routed by time: updates below the boundary become a pending_low
chain, the rest is appended as new chains to their respective buckets. Appending to a bucket
merges chains until the chain invariant is restored.
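The routing and merge-on-append steps can be sketched as below. This assumes the simplified (char, u64, i64) update type, a batch already sorted by (time, data), and a single target bucket (splitting the upper part across several buckets is omitted). route and append_chain are hypothetical names, and the merge re-sorts for brevity instead of performing the k-way merge described under §Merging Chains.

```rust
// Hypothetical simplified update: (data, time, diff), ordered by (time, data).
type Update = (char, u64, i64);

/// Sorts by (time, data) and sums diffs of equal pairs, dropping zero sums.
fn consolidate(mut updates: Vec<Update>) -> Vec<Update> {
    updates.sort_by_key(|&(d, t, _)| (t, d));
    updates.dedup_by(|later, earlier| {
        let same = (later.0, later.1) == (earlier.0, earlier.1);
        if same {
            earlier.2 += later.2;
        }
        same
    });
    updates.retain(|&(_, _, r)| r != 0);
    updates
}

/// Splits a sorted batch at `boundary`: updates below it form a pending_low
/// chain, the rest is destined for the bucket chain.
fn route(mut batch: Vec<Update>, boundary: u64) -> (Vec<Update>, Vec<Update>) {
    let split = batch.partition_point(|&(_, t, _)| t < boundary);
    let at_or_beyond = batch.split_off(split);
    (batch, at_or_beyond)
}

/// Appends `chain` to a bucket's chains, then merges the last two chains
/// while the chain invariant (measured in update counts) is violated.
fn append_chain(chains: &mut Vec<Vec<Update>>, chain: Vec<Update>, proportionality: usize) {
    if !chain.is_empty() {
        chains.push(chain);
    }
    while chains.len() >= 2 {
        let n = chains.len();
        if chains[n - 2].len() >= proportionality * chains[n - 1].len() {
            break;
        }
        let last = chains.pop().unwrap();
        let mut merged = chains.pop().unwrap();
        merged.extend(last);
        // Sort-based merge for brevity; a real merge of sorted chains avoids the re-sort.
        let merged = consolidate(merged);
        if !merged.is_empty() {
            chains.push(merged);
        }
    }
}

fn main() {
    let (low, high) = route(vec![('a', 5, 1), ('b', 7, 1), ('a', 10, 1)], 7);
    assert_eq!(low, vec![('a', 5, 1)]);
    assert_eq!(high, vec![('b', 7, 1), ('a', 10, 1)]);

    let mut bucket: Vec<Vec<Update>> = Vec::new();
    append_chain(&mut bucket, vec![('a', 10, 1), ('b', 10, 1), ('c', 10, 1), ('d', 10, 1)], 2);
    append_chain(&mut bucket, vec![('e', 11, 1)], 2);
    append_chain(&mut bucket, vec![('f', 12, 1)], 2);
    // Sizes [4, 1, 1] violate the invariant, so the last two chains merge into [4, 2].
    let sizes: Vec<usize> = bucket.iter().map(|c| c.len()).collect();
    assert_eq!(sizes, vec![4, 2]);
}
```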
Inserting an update into the correction buffer can be expensive: It involves allocating a new
chunk, copying the update in, and then likely merging with an existing chain to restore the
chain invariant. If updates trickle in as small batches, this can cause considerable
overhead. To amortize this overhead, new updates aren’t immediately inserted into the sorted
chains but instead stored in a Stage buffer. Once enough updates have been staged to fill a
Chunk, they are sorted and routed.
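A staging buffer of this kind might look like the sketch below. The Stage struct shown here, its fields, and the count-based capacity are assumptions for illustration; per the text, the real stage flushes once enough updates have accumulated to fill a Chunk.

```rust
// Hypothetical simplified update: (data, time, diff).
type Update = (char, u64, i64);

/// Hypothetical staging buffer: collects updates until `capacity` of them are
/// staged, then hands them out sorted by (time, data) and consolidated.
struct Stage {
    updates: Vec<Update>,
    capacity: usize,
}

impl Stage {
    fn new(capacity: usize) -> Self {
        Self { updates: Vec::with_capacity(capacity), capacity }
    }

    /// Stages `updates`; returns a sorted, consolidated chain once the stage is full.
    fn push(&mut self, updates: impl IntoIterator<Item = Update>) -> Option<Vec<Update>> {
        self.updates.extend(updates);
        if self.updates.len() < self.capacity {
            return None;
        }
        let mut chain = std::mem::replace(&mut self.updates, Vec::with_capacity(self.capacity));
        chain.sort_by_key(|&(d, t, _)| (t, d));
        chain.dedup_by(|later, earlier| {
            let same = (later.0, later.1) == (earlier.0, earlier.1);
            if same {
                earlier.2 += later.2;
            }
            same
        });
        chain.retain(|&(_, _, r)| r != 0);
        Some(chain)
    }
}

fn main() {
    let mut stage = Stage::new(3);
    assert_eq!(stage.push([('b', 2, 1)]), None);
    // The third update fills the stage; ('b', 2) consolidates away.
    assert_eq!(stage.push([('a', 2, 1), ('b', 2, -1)]), Some(vec![('a', 2, 1)]));
    assert!(stage.updates.is_empty());
}
```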
The insert operation has an amortized complexity of O(log N), with N being the current number of updates stored.
§Retrieving Consolidated Updates
Retrieving consolidated updates before a given upper works by peeling all buckets below the
upper off the bucket chain, splitting their chains, the pending_low chains, and the previously
emitted chain at the upper, merging the parts below the upper into the new emitted
chain, and returning an iterator over that chain.
Because each chain contains updates ordered by time first, splitting a chain at the upper
reuses whole chunks and copies at most one chunk straddling the split point. Updates at times
at or beyond the upper are never touched, no matter how many the buffer holds. The
complexity of a read is O(U log K), with U being the number of updates before upper and K
the number of chains containing them.
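The chunk-reusing split can be sketched with chunks modeled as plain Vecs of updates. split_at_upper and the Chunk alias are hypothetical, and chunks are assumed to be non-empty.

```rust
// Hypothetical simplified update: (data, time, diff); a chain is a list of
// non-empty chunks whose updates are ordered by (time, data).
type Update = (char, u64, i64);
type Chunk = Vec<Update>;

/// Splits `chain` into the chunks before `upper` and those at or beyond it.
/// Whole chunks are moved, not copied; only a chunk straddling `upper` is split.
fn split_at_upper(mut chain: Vec<Chunk>, upper: u64) -> (Vec<Chunk>, Vec<Chunk>) {
    // Chunks whose last update is before `upper` lie entirely before it.
    let whole = chain.partition_point(|chunk| matches!(chunk.last(), Some(&(_, t, _)) if t < upper));
    let mut after = chain.split_off(whole);
    let mut before = chain;
    if let Some(first) = after.first_mut() {
        // `first` may start before `upper`: split it and move its prefix over.
        let pos = first.partition_point(|&(_, t, _)| t < upper);
        if pos > 0 {
            let suffix = first.split_off(pos);
            before.push(std::mem::replace(first, suffix));
        }
    }
    (before, after)
}

fn main() {
    let chain: Vec<Chunk> = vec![
        vec![('a', 1, 1), ('b', 1, 1)],
        vec![('a', 2, 1), ('b', 3, 1)],
        vec![('c', 4, 1)],
    ];
    let (before, after) = split_at_upper(chain, 3);
    assert_eq!(before, vec![vec![('a', 1, 1), ('b', 1, 1)], vec![('a', 2, 1)]]);
    assert_eq!(after, vec![vec![('b', 3, 1)], vec![('c', 4, 1)]]);
}
```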
§Merging Chains
Merging multiple chains into a single chain is done using a k-way merge. As the input chains are sorted by (time, data) and consolidated, the same properties hold for the output chain. The complexity of a merge of K chains containing N updates is O(N log K).
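A heap-based k-way merge is one standard way to implement this; the sketch below assumes the simplified (char, u64, i64) update type and is not the actual merge code. Each chain contributes its next update to a min-heap keyed by (time, data), and equal keys arriving from different chains are summed.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical simplified update: (data, time, diff).
type Update = (char, u64, i64);

/// K-way merge of chains that are each sorted by (time, data) and consolidated.
/// Each of the N updates passes through a heap of at most K entries: O(N log K).
fn merge_chains(chains: &[Vec<Update>]) -> Vec<Update> {
    // Heap entries: ((time, data), chain index, position within that chain).
    let mut heap = BinaryHeap::new();
    for (i, chain) in chains.iter().enumerate() {
        if let Some(&(d, t, _)) = chain.first() {
            heap.push(Reverse(((t, d), i, 0)));
        }
    }
    let mut out: Vec<Update> = Vec::new();
    while let Some(Reverse(((t, d), i, pos))) = heap.pop() {
        let r = chains[i][pos].2;
        // Equal keys pop consecutively, so summing into the last output suffices.
        if matches!(out.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
            out.last_mut().unwrap().2 += r;
        } else {
            out.push((d, t, r));
        }
        if let Some(&(nd, nt, _)) = chains[i].get(pos + 1) {
            heap.push(Reverse(((nt, nd), i, pos + 1)));
        }
    }
    out.retain(|&(_, _, r)| r != 0);
    out
}

fn main() {
    // The three chains from the diagram under §Storage of Updates.
    let chains: Vec<Vec<Update>> = vec![
        vec![('a', 1, 1), ('b', 1, 1), ('c', 1, 1), ('a', 2, -1),
             ('b', 2, 1), ('c', 2, 1), ('b', 3, -1), ('c', 3, 1)],
        vec![('a', 1, 1), ('b', 2, -1), ('c', 2, -2), ('c', 4, -1)],
        vec![('d', 3, 1), ('d', 4, -1)],
    ];
    assert_eq!(merge_chains(&chains), vec![
        ('a', 1, 2), ('b', 1, 1), ('c', 1, 1),
        ('a', 2, -1), ('c', 2, -1),
        ('b', 3, -1), ('c', 3, 1), ('d', 3, 1),
        ('c', 4, -1), ('d', 4, -1),
    ]);
}
```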
There is a twist though: Merging also has to respect the since frontier, which determines how
far the times of updates should be advanced. Advancing times in a sorted chain of updates
can make them become unsorted, so we cannot just merge the chains from top to bottom.
For example, consider these two chains, assuming since = [2]:
chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
After time advancement, the chains look like this:
chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that’s
neither sorted nor consolidated.
Times below the since can only exist in chains read by consolidate_before, and only if
the since advanced past buffered times since the previous read. For few distinct stale
times (the steady state, where the previously emitted chain was written just before the
since advanced past it), we merge sub-chains, one for each distinct time that’s before or at
the since. Each of these sub-chains retains the (time, data) ordering after the time
advancement to since, so merging those yields the expected result.
For the above example, the chains we would merge are:
- chain 1.a: [(c, 2, +1)]
- chain 1.b: [(b, 2, -1), (a, 3, -1)]
- chain 2.a: [(b, 2, +1)]
- chain 2.b: [(a, 2, +1), (c, 2, -1)]
Merging them yields the sorted and consolidated chain [(a, 2, +1), (a, 3, -1)].
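The sub-chain strategy can be sketched as follows, assuming totally ordered u64 times so that advancing a time t to a since of [s] is just max(t, s). Times at or beyond the since are unaffected by advancement, so this sketch keeps them in one trailing sub-chain per input chain. advanced_sub_chains and merge_chains are hypothetical helpers; run on the example above, they reproduce the four sub-chains and their merge result.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical simplified update: (data, time, diff) with totally ordered u64 times.
type Update = (char, u64, i64);

/// Splits a chain sorted by (time, data) into one sub-chain per distinct time
/// below `since`, plus one sub-chain for the rest, and advances every time to
/// at least `since`. Within each sub-chain the (time, data) order survives.
fn advanced_sub_chains(chain: &[Update], since: u64) -> Vec<Vec<Update>> {
    let mut subs = Vec::new();
    let mut rest = chain;
    while let Some(&(_, first_time, _)) = rest.first() {
        let len = if first_time < since {
            rest.partition_point(|&(_, t, _)| t == first_time)
        } else {
            rest.len()
        };
        let (sub, tail) = rest.split_at(len);
        subs.push(sub.iter().map(|&(d, t, r)| (d, t.max(since), r)).collect::<Vec<_>>());
        rest = tail;
    }
    subs
}

/// Heap-based k-way merge of sorted, consolidated chains; zero sums are dropped.
fn merge_chains(chains: &[Vec<Update>]) -> Vec<Update> {
    let mut heap = BinaryHeap::new();
    for (i, chain) in chains.iter().enumerate() {
        if let Some(&(d, t, _)) = chain.first() {
            heap.push(Reverse(((t, d), i, 0)));
        }
    }
    let mut out: Vec<Update> = Vec::new();
    while let Some(Reverse(((t, d), i, pos))) = heap.pop() {
        let r = chains[i][pos].2;
        if matches!(out.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
            out.last_mut().unwrap().2 += r;
        } else {
            out.push((d, t, r));
        }
        if let Some(&(nd, nt, _)) = chains[i].get(pos + 1) {
            heap.push(Reverse(((nt, nd), i, pos + 1)));
        }
    }
    out.retain(|&(_, _, r)| r != 0);
    out
}

fn main() {
    let since = 2;
    let chain1: [Update; 3] = [('c', 1, 1), ('b', 2, -1), ('a', 3, -1)];
    let chain2: [Update; 3] = [('b', 1, 1), ('a', 2, 1), ('c', 2, -1)];
    let mut subs = advanced_sub_chains(&chain1, since);
    subs.extend(advanced_sub_chains(&chain2, since));
    assert_eq!(subs.len(), 4);
    assert_eq!(merge_chains(&subs), vec![('a', 2, 1), ('a', 3, -1)]);
}
```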
For many distinct stale times (e.g. a since jump across many buffered timestamps when a sink restarts with an old as-of), the number of sub-chains grows with the number of distinct times, so we instead materialize the affected updates, advance their times, and sort and consolidate them in one O(U log U) pass.
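The fallback for many distinct stale times can be sketched the same way, again assuming totally ordered u64 times and the simplified (char, u64, i64) update type; advance_sort_consolidate is a hypothetical name.

```rust
// Hypothetical simplified update: (data, time, diff) with totally ordered u64 times.
type Update = (char, u64, i64);

/// Materializes all updates of `chains`, advances their times to at least
/// `since`, then sorts and consolidates them in a single O(U log U) pass.
fn advance_sort_consolidate(chains: &[Vec<Update>], since: u64) -> Vec<Update> {
    let mut updates: Vec<Update> = chains
        .iter()
        .flatten()
        .map(|&(d, t, r)| (d, t.max(since), r))
        .collect();
    updates.sort_by_key(|&(d, t, _)| (t, d));
    updates.dedup_by(|later, earlier| {
        let same = (later.0, later.1) == (earlier.0, earlier.1);
        if same {
            earlier.2 += later.2;
        }
        same
    });
    updates.retain(|&(_, _, r)| r != 0);
    updates
}

fn main() {
    let chains: Vec<Vec<Update>> = vec![
        vec![('c', 1, 1), ('b', 2, -1), ('a', 3, -1)],
        vec![('b', 1, 1), ('a', 2, 1), ('c', 2, -1)],
    ];
    assert_eq!(advance_sort_consolidate(&chains, 2), vec![('a', 2, 1), ('a', 3, -1)]);
}
```

This trades the per-time sub-chain merge for one full sort, which pays off once the number of distinct stale times is large.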
§Structs
- CorrectionV2 - A data structure used to store corrections in the MV sink implementation.
§Traits
- Data - Convenient alias for use in data trait bounds.