Struct mz_persist_client::iter::Consolidator
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D> = CodecSort<T, D>> {
context: String,
shard_id: ShardId,
sort: Sort,
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
read_metrics: Arc<ReadMetrics>,
runs: Vec<VecDeque<(ConsolidationPart<T, D, Sort>, usize)>>,
filter: FetchBatchFilter<T>,
budget: usize,
drop_stash: Option<Sort::Updates>,
}
A tool for incrementally consolidating a persist shard.
The naive way to consolidate a Persist shard would be to fetch every part, then consolidate the whole thing. We’d like to improve on that in two ways:
- Concurrency: we’d like to be able to start consolidating and returning results before every part is fetched. (And continue fetching while we’re doing other work.)
- Memory usage: we’d like to limit the number of parts we have in memory at once, dropping parts that are fully consolidated and fetching parts just a little before they’re needed.
This interface supports this by consolidating in multiple steps. Each call to Self::next does some housekeeping work (prefetching needed parts, dropping any unneeded parts) and returns an iterator over a consolidated subset of the data. To read an entire dataset, the client should call next until it returns None, which signals that all data has been returned… but it’s also free to abandon the instance at any time if, e.g., it only needs a few entries.
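The core consolidation semantics (merge sorted runs of (key, time, diff) tuples, sum the diffs of equal key/time pairs, and drop tuples whose summed diff is zero) can be sketched in plain Rust. This is a simplified, self-contained illustration of the technique, not this crate’s implementation: the `consolidate` function and its tuple types are hypothetical stand-ins for the real `Sort`/`Updates` machinery, and it consolidates everything in one pass rather than incrementally.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Consolidate several sorted runs of (key, time, diff) tuples: merge them
/// in key/time order, sum the diffs of equal (key, time) pairs, and drop
/// any tuple whose summed diff is zero.
fn consolidate(runs: Vec<Vec<(String, u64, i64)>>) -> Vec<(String, u64, i64)> {
    // Min-heap over the head of each run, ordered by (key, time).
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some((k, t, d)) = run.first() {
            heap.push(Reverse((k.clone(), *t, *d, run_idx, 0usize)));
        }
    }
    let mut out: Vec<(String, u64, i64)> = Vec::new();
    while let Some(Reverse((k, t, d, run_idx, pos))) = heap.pop() {
        // Advance the run we just popped from.
        if let Some((nk, nt, nd)) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((nk.clone(), *nt, *nd, run_idx, pos + 1)));
        }
        // Equal (key, time) pairs pop consecutively, so summing into the
        // last output entry consolidates them.
        match out.last_mut() {
            Some((lk, lt, ld)) if *lk == k && *lt == t => *ld += d,
            _ => out.push((k, t, d)),
        }
    }
    out.retain(|&(_, _, d)| d != 0);
    out
}

fn main() {
    let runs = vec![
        vec![("a".to_string(), 0, 1), ("b".to_string(), 0, 1)],
        vec![("a".to_string(), 0, 1), ("b".to_string(), 0, -1)],
    ];
    let consolidated = consolidate(runs);
    // "a" sums to 2; "b" sums to 0 and is dropped.
    assert_eq!(consolidated, vec![("a".to_string(), 0, 2)]);
    println!("{:?}", consolidated);
}
```

The real Consolidator differs mainly in that it interleaves this merge with fetching: each `next` call consolidates only what is currently available, so output is produced before every part is in memory.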
Fields§
§context: String
§shard_id: ShardId
§sort: Sort
§blob: Arc<dyn Blob>
§metrics: Arc<Metrics>
§shard_metrics: Arc<ShardMetrics>
§read_metrics: Arc<ReadMetrics>
§runs: Vec<VecDeque<(ConsolidationPart<T, D, Sort>, usize)>>
§filter: FetchBatchFilter<T>
§budget: usize
§drop_stash: Option<Sort::Updates>
Implementations§
impl<T, D, Sort> Consolidator<T, D, Sort>
pub fn new(
    context: String,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: ReadMetrics,
    filter: FetchBatchFilter<T>,
    prefetch_budget_bytes: usize,
) -> Self
Create a new Self instance with the given prefetch budget. This budget is a “soft limit” on the size of the parts that the consolidator will fetch: we’ll try to stay below the limit, but may burst above it if that’s necessary to make progress.
impl<T, D, Sort> Consolidator<T, D, Sort>
pub fn enqueue_run(
    &mut self,
    desc: &Description<T>,
    run_meta: &RunMeta,
    parts: impl IntoIterator<Item = BatchPart<T>>,
)
Add another run of data to be consolidated.
To ensure consolidation, every tuple in this run should be larger than any tuple already returned from the iterator. At the moment, this invariant is not checked. The simplest way to ensure this is to enqueue every run before any calls to next.
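The unchecked invariant above can be made concrete with a small sketch. The `run_is_enqueueable` function below is hypothetical (the real Consolidator performs no such check, per the docs); it models a run and the already-returned maximum as simple (key, time) tuples.

```rust
/// Hypothetical check for the enqueue ordering invariant: a run may be
/// enqueued only if its smallest tuple sorts strictly after the largest
/// tuple already returned from the iterator. (Runs are sorted, so the
/// first element is the smallest.)
fn run_is_enqueueable(last_returned: Option<&(String, u64)>, run: &[(String, u64)]) -> bool {
    match (last_returned, run.first()) {
        (Some(max_out), Some(min_in)) => min_in > max_out,
        // Nothing returned yet, or an empty run: always safe.
        _ => true,
    }
}

fn main() {
    // Before any call to next, any run may be enqueued.
    assert!(run_is_enqueueable(None, &[("a".to_string(), 0)]));
    // After ("b", 5) has been returned, a run starting at ("a", 0) would
    // break consolidation: its tuples sort before already-emitted output.
    assert!(!run_is_enqueueable(Some(&("b".to_string(), 5)), &[("a".to_string(), 0)]));
    println!("ok");
}
```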
fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D, Sort>, usize)>)
fn trim(&mut self)
Tidy up: discard any empty parts, and discard any runs that have no parts left.
fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D, Sort>>
Return an iterator over the next consolidated chunk of output, if there’s any left.
Requirement: at least the first part of each run should be fetched and nonempty.
async fn unblock_progress(&mut self) -> Result<()>
We don’t need to have fetched every part to make progress, but we do at least need to have fetched some parts: in particular, parts at the beginning of their runs which may include the smallest remaining KVT.
Returns success when we’ve successfully fetched enough parts to be able to make progress.
pub(crate) async fn next(
    &mut self,
) -> Result<Option<impl Iterator<Item = (Sort::KV<'_>, T, D)>>>
Wait until data is available, then return an iterator over the next consolidated chunk of output. If this method returns None, then all the data has been exhausted and the full consolidated dataset has been returned.
pub(crate) async fn next_chunk(
    &mut self,
    max_len: usize,
    max_bytes: usize,
) -> Result<Option<BlobTraceUpdates>>
Wait until data is available, then return the next consolidated chunk of output. If this method returns None, then all the data has been exhausted and the full consolidated dataset has been returned.
fn live_bytes(&self) -> usize
The size of the data that we might be holding concurrently in memory. While this is normally kept less than the budget, it may burst over it temporarily, since we need at least one part in every run to continue making progress.
pub(crate) fn start_prefetches(&mut self) -> Option<usize>
Returns None if the budget was exhausted, or Some(remaining_bytes) if it is not.
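The budget bookkeeping described by live_bytes and start_prefetches can be modeled in a few lines. This is a simplified sketch, not the crate’s implementation: the `Part` struct and free-standing `start_prefetches` function are hypothetical, and the real method starts async fetches rather than flipping a flag.

```rust
/// Hypothetical model of a batch part: a size in bytes and whether it has
/// already been fetched into memory.
struct Part {
    bytes: usize,
    fetched: bool,
}

/// Walk the runs in order, "fetching" unfetched parts while the budget
/// allows. Bytes already live in memory count against the budget. Returns
/// None if the budget was exhausted, or Some(remaining_bytes) if not.
fn start_prefetches(runs: &mut [Vec<Part>], budget: usize) -> Option<usize> {
    // Everything currently fetched is already charged to the budget.
    let live: usize = runs
        .iter()
        .flatten()
        .filter(|p| p.fetched)
        .map(|p| p.bytes)
        .sum();
    let mut remaining = budget.checked_sub(live)?;
    for run in runs.iter_mut() {
        for part in run.iter_mut() {
            if part.fetched {
                continue;
            }
            // Stop (returning None) as soon as a part doesn't fit.
            remaining = remaining.checked_sub(part.bytes)?;
            part.fetched = true; // the real type kicks off an async fetch here
        }
    }
    Some(remaining)
}

fn main() {
    // 10 bytes already live + a 20-byte prefetch fits in a 100-byte budget.
    let mut runs = vec![vec![
        Part { bytes: 10, fetched: true },
        Part { bytes: 20, fetched: false },
    ]];
    assert_eq!(start_prefetches(&mut runs, 100), Some(70));

    // A budget smaller than what is already live is exhausted immediately.
    let mut runs = vec![vec![Part { bytes: 10, fetched: true }]];
    assert_eq!(start_prefetches(&mut runs, 5), None);
    println!("ok");
}
```

This also illustrates why live_bytes may burst over the budget: parts that are already fetched stay charged against it, and the real consolidator must keep at least the first part of every run in memory to make progress.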
Trait Implementations§
Auto Trait Implementations§
impl<T, D, Sort> Freeze for Consolidator<T, D, Sort>
impl<T, D, Sort = CodecSort<T, D>> !RefUnwindSafe for Consolidator<T, D, Sort>
impl<T, D, Sort> Send for Consolidator<T, D, Sort>
impl<T, D, Sort> Sync for Consolidator<T, D, Sort>
impl<T, D, Sort> Unpin for Consolidator<T, D, Sort>
impl<T, D, Sort = CodecSort<T, D>> !UnwindSafe for Consolidator<T, D, Sort>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,

fn into_rust(self) -> Result<R, TryFromProtoError>

See RustType::from_proto.

fn from_rust(rust: &R) -> P

See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.