Struct mz_persist_client::iter::Consolidator

pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D> = CodecSort<T, D>> {
    context: String,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    runs: Vec<VecDeque<(ConsolidationPart<T, D, Sort>, usize)>>,
    filter: FetchBatchFilter<T>,
    budget: usize,
    drop_stash: Option<Sort::Updates>,
}

A tool for incrementally consolidating a persist shard.

The naive way to consolidate a Persist shard would be to fetch every part, then consolidate the whole thing. We’d like to improve on that in two ways:

  • Concurrency: we’d like to be able to start consolidating and returning results before every part is fetched. (And continue fetching while we’re doing other work.)
  • Memory usage: we’d like to limit the number of parts we have in memory at once, dropping parts that are fully consolidated and fetching parts just a little before they’re needed.

This interface supports this by consolidating in multiple steps. Each call to Self::next does some housekeeping work (prefetching needed parts, dropping any unneeded parts) and returns an iterator over a consolidated subset of the data. To read an entire dataset, the client should call next until it returns None, which signals that all data has been returned. The client is also free to abandon the instance at any time if, e.g., it only needs a few entries.
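
The multi-step approach can be illustrated with a toy model. The sketch below is not the crate's implementation: `ToyConsolidator`, the `Update` alias, and the in-memory `Vec` parts are all assumptions made for illustration, and it assumes each run is strictly sorted by (key, time) across its parts. It shows why having only the first part of each run in hand is enough to emit a consolidated chunk.

```rust
use std::collections::VecDeque;

/// A toy update: (key, time, diff). The real type is generic over `T`, `D`, and `Sort`.
type Update = (String, u64, i64);

/// Hypothetical stand-in for a consolidator whose parts are already in memory.
struct ToyConsolidator {
    /// Each run is a queue of parts; each part is a sorted list of updates.
    runs: Vec<VecDeque<Vec<Update>>>,
}

impl ToyConsolidator {
    /// Return the next consolidated chunk, or `None` once every run is drained.
    fn next(&mut self) -> Option<Vec<Update>> {
        // Housekeeping: drop drained parts at the front of each run, then drained runs.
        for run in &mut self.runs {
            while run.front().map_or(false, |part| part.is_empty()) {
                run.pop_front();
            }
        }
        self.runs.retain(|run| !run.is_empty());
        if self.runs.is_empty() {
            return None;
        }

        // Anything at or below the smallest "last element" among the head parts
        // is safe to emit: later parts in every run only hold larger tuples.
        let bound: (String, u64) = self
            .runs
            .iter()
            .map(|run| {
                let last = run[0].last().expect("head parts are nonempty after trimming");
                (last.0.clone(), last.1)
            })
            .min()
            .expect("at least one run remains");

        // Pull the safe prefix out of each head part.
        let mut chunk: Vec<Update> = Vec::new();
        for run in &mut self.runs {
            let head = &mut run[0];
            let split = head
                .partition_point(|(k, t, _)| (k.as_str(), *t) <= (bound.0.as_str(), bound.1));
            chunk.extend(head.drain(..split));
        }

        // Consolidate: sum the diffs of equal (key, time) pairs and drop zeros.
        chunk.sort();
        let mut out: Vec<Update> = Vec::new();
        for (k, t, d) in chunk {
            if let Some(last) = out.last_mut() {
                if last.0 == k && last.1 == t {
                    last.2 += d;
                    continue;
                }
            }
            out.push((k, t, d));
        }
        out.retain(|update| update.2 != 0);
        Some(out)
    }
}
```

Calling `next` in a `while let Some(chunk) = c.next()` loop drains the toy dataset chunk by chunk. A chunk may be empty if all of its updates cancel out, so the caller should keep going until `None`.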

Fields§

context: String
shard_id: ShardId
sort: Sort
blob: Arc<dyn Blob>
metrics: Arc<Metrics>
shard_metrics: Arc<ShardMetrics>
read_metrics: Arc<ReadMetrics>
runs: Vec<VecDeque<(ConsolidationPart<T, D, Sort>, usize)>>
filter: FetchBatchFilter<T>
budget: usize
drop_stash: Option<Sort::Updates>

Implementations§

impl<T, D, Sort> Consolidator<T, D, Sort>
where T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup + Ord, Sort: RowSort<T, D>,

pub fn new( context: String, shard_id: ShardId, sort: Sort, blob: Arc<dyn Blob>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, read_metrics: ReadMetrics, filter: FetchBatchFilter<T>, prefetch_budget_bytes: usize, ) -> Self

Create a new Self instance with the given prefetch budget. This budget is a “soft limit” on the size of the parts that the consolidator will fetch: we’ll try to stay below the limit, but may burst above it if that’s necessary to make progress.

impl<T, D, Sort> Consolidator<T, D, Sort>
where T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup + Ord, Sort: RowSort<T, D>,

pub fn enqueue_run( &mut self, desc: &Description<T>, run_meta: &RunMeta, parts: impl IntoIterator<Item = BatchPart<T>>, )

Add another run of data to be consolidated.

To ensure consolidation, every tuple in this run should be larger than any tuple already returned from the iterator. At the moment, this invariant is not checked. The simplest way to ensure this is to enqueue every run before any calls to next.
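
Since the invariant is not checked, a caller that enqueues runs late might want a guard of its own. The following is only a sketch of what such a check could look like: `can_enqueue` is a hypothetical helper, not part of this crate, and it assumes the run is already sorted so that its first tuple is its smallest.

```rust
/// Hypothetical check: may a sorted run be enqueued after `last_returned`
/// has already been emitted by the iterator?
fn can_enqueue<K: Ord>(last_returned: Option<&K>, run: &[K]) -> bool {
    match (last_returned, run.first()) {
        // The run's smallest tuple must be strictly larger than anything returned so far.
        (Some(last), Some(first)) => first > last,
        // Nothing returned yet, or an empty run: nothing can be violated.
        _ => true,
    }
}
```

Enqueuing every run before the first call to next corresponds to the `last_returned == None` case, which always passes.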

fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D, Sort>, usize)>)

fn trim(&mut self)

Tidy up: discard any empty parts, and discard any runs that have no parts left.
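
As a rough illustration of this tidy-up step, here is a minimal sketch over a simplified `runs` structure. The generic part type `P` and the `is_empty` predicate are assumptions for the example; the real parts are `(ConsolidationPart<T, D, Sort>, usize)` pairs, and the actual method may decide emptiness differently.

```rust
use std::collections::VecDeque;

/// Sketch of a trim pass: drop empty parts, then drop runs with no parts left.
fn trim<P>(runs: &mut Vec<VecDeque<P>>, is_empty: impl Fn(&P) -> bool) {
    for run in runs.iter_mut() {
        // Keep only the parts that still hold data.
        run.retain(|part| !is_empty(part));
    }
    // A run without parts contributes nothing further to the output.
    runs.retain(|run| !run.is_empty());
}
```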

fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D, Sort>>

Return an iterator over the next consolidated chunk of output, if there’s any left.

Requirement: at least the first part of each run should be fetched and nonempty.

async fn unblock_progress(&mut self) -> Result<()>

We don’t need to have fetched every part to make progress, but we do at least need to have fetched some parts: in particular, parts at the beginning of their runs which may include the smallest remaining KVT (key-value-time tuple).

Returns success when we’ve successfully fetched enough parts to be able to make progress.

pub(crate) async fn next( &mut self, ) -> Result<Option<impl Iterator<Item = (Sort::KV<'_>, T, D)>>>

Wait until data is available, then return an iterator over the next consolidated chunk of output. If this method returns None, then all the data has been exhausted and the full consolidated dataset has been returned.

pub(crate) async fn next_chunk( &mut self, max_len: usize, ) -> Result<Option<BlobTraceUpdates>>

Wait until data is available, then return the next consolidated chunk of output as a BlobTraceUpdates. If this method returns None, then all the data has been exhausted and the full consolidated dataset has been returned.

fn live_bytes(&self) -> usize

The size of the data that we might be holding concurrently in memory. While this is normally kept less than the budget, it may burst over it temporarily, since we need at least one part in every run to continue making progress.

pub(crate) fn start_prefetches(&mut self) -> Option<usize>

Returns None if the budget is exhausted, or Some(remaining_bytes) if it is not.
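
The budget accounting behind this return value can be sketched as follows. This is a simplified model, not the crate's code: `ToyPart` and its two variants are hypothetical (the real `ConsolidationPart` is not shown on this page), and the sketch leaves out the burst-over-budget case in which the first part of each run is fetched regardless.

```rust
/// Hypothetical part state for illustration only.
enum ToyPart {
    /// Not yet requested; `bytes` is its encoded size.
    Queued { bytes: usize },
    /// Already being fetched or held in memory, so it counts against the budget.
    Fetching { bytes: usize },
}

/// Walk the parts in order, starting fetches while budget remains.
/// Returns `None` if the budget ran out, else `Some(remaining_bytes)`.
fn start_prefetches(parts: &mut [ToyPart], budget: usize) -> Option<usize> {
    let mut remaining = budget;
    for part in parts.iter_mut() {
        match part {
            // Live parts consume budget; stop if they already exceed it.
            ToyPart::Fetching { bytes } => remaining = remaining.checked_sub(*bytes)?,
            ToyPart::Queued { bytes } => {
                let bytes = *bytes;
                // Only start the fetch if the part fits in what is left.
                remaining = remaining.checked_sub(bytes)?;
                *part = ToyPart::Fetching { bytes };
            }
        }
    }
    Some(remaining)
}
```

With a budget of 40 and parts of 10 (already fetching), 20, and 50 bytes, the sketch starts the 20-byte fetch, cannot fit the 50-byte part, and returns `None`.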

Trait Implementations§

impl<T: Debug, D: Debug, Sort: Debug + RowSort<T, D>> Debug for Consolidator<T, D, Sort>
where Sort::Updates: Debug,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort>

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations§

impl<T, D, Sort> Freeze for Consolidator<T, D, Sort>
where Sort: Freeze, <Sort as RowSort<T, D>>::Updates: Freeze,

impl<T, D, Sort = CodecSort<T, D>> !RefUnwindSafe for Consolidator<T, D, Sort>

impl<T, D, Sort> Send for Consolidator<T, D, Sort>
where Sort: Send, <Sort as RowSort<T, D>>::Updates: Send, T: Send + Sync,

impl<T, D, Sort> Sync for Consolidator<T, D, Sort>
where Sort: Sync, <Sort as RowSort<T, D>>::Updates: Sync, T: Sync + Send,

impl<T, D, Sort> Unpin for Consolidator<T, D, Sort>
where Sort: Unpin, <Sort as RowSort<T, D>>::Updates: Unpin, T: Unpin,

impl<T, D, Sort = CodecSort<T, D>> !UnwindSafe for Consolidator<T, D, Sort>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.