Struct mz_persist_client::internal::trace::Spine
struct Spine<T> {
    effort: usize,
    next_id: usize,
    since: Antichain<T>,
    upper: Antichain<T>,
    merging: Vec<MergeState<T>>,
}
An append-only collection of update batches.
The Spine is a general-purpose trace implementation based on collecting and merging immutable batches of updates. It is generic with respect to the batch type, and can be instantiated for any implementor of trace::Batch.
§Design
This spine is represented as a list of layers, where each element in the list is either:
- MergeState::Vacant: empty
- MergeState::Single: a single batch
- MergeState::Double: a pair of batches
Each “batch” has the option to be None, indicating a non-batch that nonetheless acts as a number of updates proportionate to the level at which it exists (for bookkeeping).
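As a rough mental model of this representation (the names below are illustrative stand-ins, not the crate's actual definitions, which live elsewhere in this module):

enum ToyMergeState<B> {
    // No batch at this layer.
    Vacant,
    // A single batch; `None` marks a bookkeeping non-batch.
    Single(Option<B>),
    // A pair of batches in the process of being merged.
    Double(Option<B>, Option<B>),
}

struct ToySpine<B> {
    // Layer i is accounted as holding up to 2^i updates.
    merging: Vec<ToyMergeState<B>>,
}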
Each of the batches at layer i contains at most 2^i elements. The sequence of batches should have the upper bound of one match the lower bound of the next. Batches may be logically empty, with matching upper and lower bounds, as a bookkeeping mechanism.
Each batch at layer i is treated as if it contains exactly 2^i elements, even though it may actually contain fewer elements. This allows us to decouple the physical representation from logical amounts of effort invested in each batch. It allows us to begin compaction and to reduce the number of updates, without compromising our ability to continue to move updates along the spine. We are explicitly making the trade-off that while some batches might compact at lower levels, we want to treat them as if they contained their full set of updates for accounting reasons (to apply work to higher levels).
We maintain the invariant that for any in-progress merge at level k there should be fewer than 2^k records at levels lower than k. That is, even if we were to apply an unbounded amount of effort to those records, we would not have enough records to prompt a merge into the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress merge at level k, the remaining effort required (number of records minus applied effort) is less than the number of records that would need to be added to reach 2^k records in layers below.
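Concretely, the first invariant can be read as a predicate over per-layer record counts; a minimal self-contained sketch (toy state, not the crate's types):

fn merge_invariant_holds(records_at_layer: &[u64], merge_in_progress: &[bool]) -> bool {
    debug_assert_eq!(records_at_layer.len(), merge_in_progress.len());
    let mut below = 0u64; // records accounted to layers lower than k
    for k in 0..records_at_layer.len() {
        // An in-progress merge at level k requires fewer than 2^k
        // records at levels below k.
        if merge_in_progress[k] && below >= (1u64 << k) {
            return false;
        }
        below += records_at_layer[k];
    }
    true
}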
§Mathematics
When a merge is initiated, there should be a non-negative deficit of updates before the layers below could plausibly produce a new batch for the currently merging layer. We must determine a factor of proportionality, so that newly arrived updates provide at least that amount of “fuel” towards the merging layer, ensuring the merge completes before lower levels invade.
§Deficit:
A new merge is initiated only in response to the completion of a prior merge, or the introduction of new records from outside. The latter case is special, and will maintain our invariant trivially, so we will focus on the former case.
When a merge at level k completes, assuming we have maintained our invariant, there should be fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are added. This means that a factor of proportionality of four should be sufficient to ensure that the merge completes before a new merge is initiated.
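Making the arithmetic explicit (a back-of-envelope restatement of the paragraph above): the merge at level k+1 combines two batches, each accounted at 2^{k+1} records, while strictly more than 2^k new records must arrive before lower levels can produce another batch for it.

\[
\text{work required} \;\le\; 2^{k+1} + 2^{k+1} \;=\; 2^{k+2},
\qquad
\text{records until next batch} \;>\; 2^{k},
\]
\[
\text{fuel per record} \;\ge\; \frac{2^{k+2}}{2^{k}} \;=\; 4.
\]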
When new records get introduced, we will need to roll up any batches at lower levels, which we treat as the introduction of records. Each of these virtual records should be accounted for in the fuel it contributes, as it results in the promotion of batches closer to in-progress merges.
§Fuel sharing
We like the idea of applying fuel preferentially to merges at lower levels, under the idea that they are easier to complete, and we benefit from fewer total merges in progress. This does delay the completion of merges at higher levels, and may not obviously be a total win. If we choose to do this, we should make sure that we correctly account for completed merges at low layers: they should still extract fuel from new updates even though they have completed, at least until they have paid back any “debt” to higher layers by continuing to provide fuel as updates arrive.
Fields
effort: usize
next_id: usize
since: Antichain<T>
upper: Antichain<T>
merging: Vec<MergeState<T>>
Implementations
impl<T> Spine<T>
pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>>
All batches in the spine, oldest to newest.
pub fn spine_batches_mut(
    &mut self,
) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>>
All (mutable) batches in the spine, oldest to newest.
impl<T: Timestamp + Lattice> Spine<T>
pub fn new() -> Self
Allocates a fueled Spine.
This trace will merge batches progressively, with each inserted batch applying a multiple of the batch’s length in effort to each merge. The effort parameter is that multiplier. This value should be at least one for the merging to happen; a value of zero is not helpful.
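As a toy reading of the multiplier (hypothetical helper, not a crate API): an inserted batch of len updates funds effort × len units of merge work.

fn fuel_for_insert(effort: usize, batch_len: usize) -> isize {
    // Fuel is proportional to batch length, scaled by the `effort` multiplier.
    (effort * batch_len) as isize
}

fn main() {
    // With multiplier 2, a 1_000-update batch funds 2_000 units of work.
    assert_eq!(fuel_for_insert(2, 1_000), 2_000);
}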
fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool
Apply some amount of effort to trace maintenance.
The units of effort are updates, and the method should be thought of as analogous to inserting as many empty updates, where the trace is permitted to perform proportionate work.
Returns true if this did work and false if it left the spine unchanged.
pub fn next_id(&mut self) -> SpineId
pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>)
fn reduced(&self) -> bool
True iff there is at most one HollowBatch in self.merging.
When true, there is no maintenance work to perform in the trace, other than compaction. We do not yet have logic in place to determine if compaction would improve a trace, so for now we are ignoring that.
fn describe(&self) -> Vec<(usize, usize)>
Describes the merge progress of layers in the trace.
Intended for diagnostics rather than public consumption.
fn introduce_batch(
    &mut self,
    batch: SpineBatch<T>,
    batch_index: usize,
    log: &mut SpineLog<'_, T>,
)
Introduces a batch at an indicated level.
The level indication is often related to the size of the batch, but it can also be used to artificially fuel the computation by supplying empty batches at non-trivial indices, to move merges along.
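The mapping from batch size to level is not spelled out here; a common convention, assumed in this sketch, places a batch of len updates at the smallest i with 2^i >= len:

fn level_for_len(len: usize) -> usize {
    // Smallest i such that 2^i >= len (0 for empty batches).
    len.next_power_of_two().trailing_zeros() as usize
}

fn main() {
    assert_eq!(level_for_len(1), 0); // 2^0 = 1
    assert_eq!(level_for_len(5), 3); // 2^3 = 8 is the first power of two >= 5
    assert_eq!(level_for_len(8), 3);
}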
fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>)
Ensures that an insertion at layer index will succeed.
This method is subject to the constraint that all existing batches should occur at higher levels, which requires it to “roll up” batches present at lower levels before the method is called. In doing this, we should not introduce more virtual records than 2^index, as that is the amount of excess fuel we have budgeted for completing merges.
pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>)
Applies an amount of fuel to merges in progress.
The supplied fuel is for each in-progress merge, and if we want to spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do so in order to maintain fewer batches on average (at the risk of completing merges of large batches later, but probably not much later).
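A minimal sketch of “fuel for each in-progress merge” over toy state (the real method walks self.merging and records completed merges in the SpineLog; everything below is assumed):

fn apply_fuel_toy(remaining_work: &mut Vec<isize>, fuel: isize) {
    // Each in-progress merge tracks how much work it has left; the same
    // fuel budget is applied to every merge independently.
    for work in remaining_work.iter_mut() {
        *work = (*work - fuel).max(0);
    }
    // A merge with no remaining work is complete; its result would be
    // inserted one layer up. Here we simply drop the finished entries.
    remaining_work.retain(|&work| work > 0);
}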
fn insert_at(&mut self, batch: SpineBatch<T>, index: usize)
Inserts a batch at a specific location.
This is a non-public internal method that can panic if we try to insert into a layer which already contains two batches (and is still in the process of merging).
fn complete_at(
    &mut self,
    index: usize,
    log: &mut SpineLog<'_, T>,
) -> Option<SpineBatch<T>>
Completes and extracts whatever is at layer index, leaving this layer vacant.
fn tidy_layers(&mut self)
Attempts to draw down large layers to size-appropriate layers.
fn validate(&self) -> Result<(), String>
Checks invariants:
- The lowers and uppers of all batches “line up” (see the sketch below).
- The lower of the “minimum” batch is antichain[T::minimum].
- The upper of the “maximum” batch is == self.upper.
- The since of each batch is less_equal self.since.
- The SpineIds all “line up” and cover from 0 to self.next_id.
- TODO: Verify fuel and level invariants.
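For the first bullet, “line up” means consecutive batches chain contiguously: each batch's upper frontier equals the next batch's lower frontier. A toy check over u64 frontiers (the real code compares Antichain<T> values):

fn frontiers_line_up(batches: &[(u64, u64)]) -> bool {
    // Each (lower, upper) pair must start where the previous one ended.
    batches.windows(2).all(|pair| pair[0].1 == pair[1].0)
}

fn main() {
    assert!(frontiers_line_up(&[(0, 3), (3, 3), (3, 7)])); // empty batch is fine
    assert!(!frontiers_line_up(&[(0, 3), (4, 7)])); // gap between batches
}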
Trait Implementations
Auto Trait Implementations
impl<T> Freeze for Spine<T> where T: Freeze
impl<T> RefUnwindSafe for Spine<T> where T: RefUnwindSafe
impl<T> Send for Spine<T>
impl<T> Sync for Spine<T>
impl<T> Unpin for Spine<T> where T: Unpin
impl<T> UnwindSafe for Spine<T> where T: RefUnwindSafe + UnwindSafe
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
    fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
    default unsafe fn clone_to_uninit(&self, dst: *mut T)
        Nightly-only experimental API (clone_to_uninit).
impl<T> FutureExt for T
    fn with_context(self, otel_cx: Context) -> WithContext<Self>
    fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
    fn instrument(self, span: Span) -> Instrumented<Self>
    fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
    fn into_request(self) -> Request<T>
        Wraps the message T in a tonic::Request.
impl<T> Pointable for T
impl<T> ProgressEventTimestamp for T
impl<P, R> ProtoType<R> for P where R: RustType<P>
    fn into_rust(self) -> Result<R, TryFromProtoError>
        See RustType::from_proto.
    fn from_rust(rust: &R) -> P
        See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where T: Semigroup<S>
    fn plus_equals(&mut self, rhs: &&'a S)
        Analogous to std::ops::AddAssign, for types that do not implement AddAssign.