Struct mz_persist_client::internal::trace::Spine

struct Spine<T> {
    effort: usize,
    next_id: usize,
    since: Antichain<T>,
    upper: Antichain<T>,
    merging: Vec<MergeState<T>>,
}

An append-only collection of update batches.

The Spine is a general-purpose trace implementation based on collecting and merging immutable batches of updates. It is generic with respect to the batch type, and can be instantiated for any implementor of trace::Batch.

Design

This spine is represented as a list of layers, where each element in the list is either

  1. MergeState::Vacant: empty,
  2. MergeState::Single: a single batch, or
  3. MergeState::Double: a pair of batches.

Each “batch” has the option to be None, indicating a non-batch that nonetheless acts as a number of updates proportionate to the level at which it exists (for bookkeeping).
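
For orientation, here is a minimal sketch of the per-layer state this list and paragraph describe. The variant names come from the list above; the payloads are assumptions for illustration, and the real MergeState carries additional merge bookkeeping.

    enum MergeState<T> {
        // 1. No batch at this layer.
        Vacant,
        // 2. One batch; `None` models the logically-empty placeholder
        //    batch described above.
        Single(Option<SpineBatch<T>>),
        // 3. Two batches, in the process of being merged.
        Double(SpineBatch<T>, SpineBatch<T>),
    }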

Each of the batches at layer i contains at most 2^i elements. The sequence of batches should have the upper bound of one match the lower bound of the next. Batches may be logically empty, with matching upper and lower bounds, as a bookkeeping mechanism.

Each batch at layer i is treated as if it contains exactly 2^i elements, even though it may actually contain fewer elements. This allows us to decouple the physical representation from logical amounts of effort invested in each batch. It allows us to begin compaction and to reduce the number of updates, without compromising our ability to continue to move updates along the spine. We are explicitly making the trade-off that while some batches might compact at lower levels, we want to treat them as if they contained their full set of updates for accounting reasons (to apply work to higher levels).
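
For illustration (numbers invented): a batch that arrived at layer 5 holding 2^5 = 32 updates continues to be accounted as 32 updates even if compaction shrinks it to, say, 7, so the merges above it see the same fuel schedule either way.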

We maintain the invariant that for any in-progress merge at level k there should be fewer than 2^k records at levels lower than k. That is, even if we were to apply an unbounded amount of effort to those records, we would not have enough records to prompt a merge into the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress merge at level k, the remaining effort required (number of records minus applied effort) is less than the number of records that would need to be added to reach 2^k records in layers below.

Mathematics

When a merge is initiated, there should be a non-negative deficit of updates before the layers below could plausibly produce a new batch for the currently merging layer. We must determine a factor of proportionality, so that newly arrived updates provide at least that amount of “fuel” towards the merging layer, ensuring the merge completes before lower levels invade.

Deficit

A new merge is initiated only in response to the completion of a prior merge, or the introduction of new records from outside. The latter case is special, and will maintain our invariant trivially, so we will focus on the former case.

When a merge at level k completes, assuming we have maintained our invariant, there should be fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are added. This means that a factor of proportionality of four should be sufficient to ensure that the merge completes before a new merge is initiated.
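
Spelled out, the factor of four falls directly out of these two bounds:

    work to finish the new merge at level k+1:   2 * 2^(k+1) = 2^(k+2) units
    records arriving before a new batch forms:   strictly more than 2^k
    fuel required per arriving record:           2^(k+2) / 2^k = 4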

When new records are introduced, we will need to roll up any batches at lower levels, which we treat as the introduction of records. Each of these virtual records should be accounted the fuel it ought to contribute, as it results in the promotion of batches closer to in-progress merges.

Fuel sharing

We like the idea of applying fuel preferentially to merges at lower levels, on the grounds that they are easier to complete and that we benefit from fewer total merges in progress. This does delay the completion of merges at higher levels, and may not obviously be a total win. If we choose to do this, we should make sure that we correctly account for completed merges at low layers: they should still extract fuel from new updates even though they have completed, at least until they have paid back any “debt” to higher layers by continuing to provide fuel as updates arrive.

Fields

    effort: usize
    next_id: usize
    since: Antichain<T>
    upper: Antichain<T>
    merging: Vec<MergeState<T>>

Implementations

impl<T> Spine<T>

pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>>

All batches in the spine, oldest to newest.

pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>>

All (mutable) batches in the spine, oldest to newest.

impl<T: Timestamp + Lattice> Spine<T>

pub fn new() -> Self

Allocates a fueled Spine.

This trace will merge batches progressively, with each inserted batch applying a multiple of the batch’s length in effort to each merge. The effort parameter is that multiplier. This value should be at least one for the merging to happen; a value of zero is not helpful.

fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool

Apply some amount of effort to trace maintenance.

The units of effort are updates, and the method should be thought of as analogous to inserting as many empty updates, where the trace is permitted to perform proportionate work.

Returns true if this did work and false if it left the spine unchanged.
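
As a usage sketch (the budget of 1024 is an arbitrary example, and the spine and SpineLog values are assumed to be in scope):

    // Repeatedly apply a fixed fuel budget until a call reports
    // that it left the spine unchanged.
    while spine.exert(1024, &mut log) {}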

pub fn next_id(&mut self) -> SpineId

pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>)

fn reduced(&self) -> bool

True iff there is at most one HollowBatch in self.merging.

When true, there is no maintenance work to perform in the trace, other than compaction. We do not yet have logic in place to determine if compaction would improve a trace, so for now we are ignoring that.

fn describe(&self) -> Vec<(usize, usize)>

Describes the merge progress of layers in the trace.

Intended for diagnostics rather than public consumption.

fn introduce_batch(&mut self, batch: SpineBatch<T>, batch_index: usize, log: &mut SpineLog<'_, T>)

Introduces a batch at an indicated level.

The level indication is often related to the size of the batch, but it can also be used to artificially fuel the computation by supplying empty batches at non-trivial indices, to move merges along.
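
A common way to pick such a level is the base-2 logarithm of the batch length, rounded up; a hedged sketch (the helper name is hypothetical):

    // Smallest `i` such that `len <= 2^i`; empty batches land at level 0.
    fn level_for_len(len: usize) -> usize {
        len.next_power_of_two().trailing_zeros() as usize
    }

For example, a batch of 5 updates lands at level 3, since 2^2 = 4 < 5 <= 2^3 = 8.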

fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>)

Ensures that an insertion at layer index will succeed.

This method is subject to the constraint that all existing batches should occur at higher levels, which requires it to “roll up” batches present at lower levels before the method is called. In doing this, we should not introduce more virtual records than 2^index, as that is the amount of excess fuel we have budgeted for completing merges.

pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>)

Applies an amount of fuel to merges in progress.

The supplied fuel is for each in-progress merge, and if we want to spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do so in order to maintain fewer batches on average (at the risk of completing merges of large batches later, though probably not much later).
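
Read literally, “for each in progress merge” means every layer receives its own copy of the budget; a sketch of that shape (the per-merge work method is an assumption, not this module’s API):

    for state in self.merging.iter_mut() {
        let mut fuel = *fuel;      // fresh copy of the budget per merge
        state.work(&mut fuel);     // hypothetical per-merge step
    }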

fn insert_at(&mut self, batch: SpineBatch<T>, index: usize)

Inserts a batch at a specific location.

This is a non-public internal method that can panic if we try to insert into a layer which already contains two batches (and is still in the process of merging).

fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>>

Completes and extracts whatever is at layer index, leaving this layer vacant.

fn tidy_layers(&mut self)

Attempts to draw down large layers to size-appropriate layers.

fn validate(&self) -> Result<(), String>

Checks invariants:

  • The lowers and uppers of all batches “line up”.
  • The lower of the “minimum” batch is antichain[T::minimum].
  • The upper of the “maximum” batch is == self.upper.
  • The since of each batch is less_equal self.since.
  • The SpineIds all “line up” and cover from 0 to self.next_id.
  • TODO: Verify fuel and level invariants.
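
A minimal sketch of the first check, assuming each SpineBatch exposes its lower and upper bounds via accessors of those names:

    let batches: Vec<_> = self.spine_batches().collect();
    for pair in batches.windows(2) {
        if pair[0].upper() != pair[1].lower() {
            return Err("batch descriptions do not line up".to_string());
        }
    }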

Trait Implementations

impl<T: Clone> Clone for Spine<T>

fn clone(&self) -> Spine<T>

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<T: Debug> Debug for Spine<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<T> Freeze for Spine<T>
where T: Freeze,

impl<T> RefUnwindSafe for Spine<T>
where T: RefUnwindSafe,

impl<T> Send for Spine<T>
where T: Send + Sync,

impl<T> Sync for Spine<T>
where T: Sync + Send,

impl<T> Unpin for Spine<T>
where T: Unpin,

impl<T> UnwindSafe for Spine<T>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> CloneToUninit for T
where T: Clone,

default unsafe fn clone_to_uninit(&self, dst: *mut T)

This is a nightly-only experimental API (clone_to_uninit).
Performs copy-assignment from self to dst.

impl<T> CopyAs<T> for T

fn copy_as(self) -> T

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> ProgressEventTimestamp for T
where T: Data + Debug + Any,

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any.

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self.

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,

impl<T> Data for T
where T: Clone + 'static,