Struct mz_persist_client::internal::compact::Compactor
pub struct Compactor<K, V, T, D> {
cfg: PersistConfig,
metrics: Arc<Metrics>,
sender: Sender<(Instant, CompactReq<T>, Machine<K, V, T, D>, Sender<Result<ApplyMergeResult, Error>>)>,
_phantom: PhantomData<fn() -> D>,
}
A service for performing physical and logical compaction.
This will possibly be called over RPC in the future. Physical compaction is merging adjacent batches. Logical compaction is advancing timestamps to a new since and consolidating the resulting updates.
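As a rough illustration of logical compaction, the sketch below advances each timestamp to a new since and then consolidates updates that land on the same key and timestamp. The `Update` tuple, the `u64` timestamps, the `i64` diffs, and the function name `logical_compact` are all simplifying assumptions made for this example; they are not the types or functions this crate uses.

```rust
use std::collections::BTreeMap;

/// One update as (key, timestamp, diff). Simplified stand-in types,
/// chosen only for this illustration.
type Update = (String, u64, i64);

/// Advance every timestamp to at least `since`, then consolidate updates
/// that now share the same (key, timestamp) by summing their diffs.
fn logical_compact(updates: &[Update], since: u64) -> Vec<Update> {
    let mut acc: BTreeMap<(String, u64), i64> = BTreeMap::new();
    for (key, ts, diff) in updates {
        // Timestamps before `since` are moved forward to `since`.
        let advanced = (*ts).max(since);
        *acc.entry((key.clone(), advanced)).or_insert(0) += diff;
    }
    // Updates whose diffs cancelled out disappear entirely.
    acc.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((key, ts), diff)| (key, ts, diff))
        .collect()
}
```

In this sketch an insert at time 1 and a retraction at time 2 of the same key both move to the since and cancel, which is where the space savings of logical compaction come from.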
Fields§
§cfg: PersistConfig
§metrics: Arc<Metrics>
§sender: Sender<(Instant, CompactReq<T>, Machine<K, V, T, D>, Sender<Result<ApplyMergeResult, Error>>)>
§_phantom: PhantomData<fn() -> D>
Implementations§
impl<K, V, T, D> Compactor<K, V, T, D>
pub fn new( cfg: PersistConfig, metrics: Arc<Metrics>, write_schemas: Schemas<K, V>, gc: GarbageCollector<K, V, T, D>, ) -> Self
pub fn compact_and_apply_background( &self, req: CompactReq<T>, machine: &Machine<K, V, T, D>, ) -> Option<Receiver<Result<ApplyMergeResult, Error>>>
Enqueues a CompactReq to be consumed by the compaction background task when available.
Returns a receiver that indicates when compaction has completed. The receiver can be safely dropped at any time if the caller does not wish to wait on completion.
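The enqueue-and-optionally-wait pattern described above can be sketched with standard-library channels. Everything here is hypothetical: `Req` and `Res` stand in for CompactReq and the merge result, `BackgroundCompactor` and `enqueue` are illustrative names, and returning `None` when the bounded queue is full is an assumption about what the `Option` might signal, not documented behaviour.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};
use std::thread;

/// Hypothetical stand-ins for the request and result types.
struct Req(u32);
type Res = Result<u32, String>;

struct BackgroundCompactor {
    sender: SyncSender<(Req, Sender<Res>)>,
}

impl BackgroundCompactor {
    fn new(queue_size: usize) -> Self {
        let (sender, requests) = sync_channel::<(Req, Sender<Res>)>(queue_size);
        // Background worker: consumes requests as they become available.
        thread::spawn(move || {
            for (req, done) in requests {
                // Pretend to compact. If the caller dropped its receiver,
                // `send` fails and the error is deliberately ignored.
                let _ = done.send(Ok(req.0 * 2));
            }
        });
        BackgroundCompactor { sender }
    }

    /// Assumption: returns `None` when the queue is full or the worker is gone.
    fn enqueue(&self, req: Req) -> Option<Receiver<Res>> {
        let (done_tx, done_rx) = channel();
        match self.sender.try_send((req, done_tx)) {
            Ok(()) => Some(done_rx),
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => None,
        }
    }
}
```

Because the worker ignores a failed completion send, a caller that does not care about the outcome can drop the returned receiver immediately without affecting the worker.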
pub(crate) async fn compact_and_apply( machine: &mut Machine<K, V, T, D>, req: CompactReq<T>, write_schemas: Schemas<K, V>, ) -> Result<(ApplyMergeResult, RoutineMaintenance), Error>
pub async fn compact( cfg: CompactConfig, blob: Arc<dyn Blob>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, isolated_runtime: Arc<IsolatedRuntime>, req: CompactReq<T>, write_schemas: Schemas<K, V>, ) -> Result<CompactRes<T>, Error>
Compacts input batches in bounded memory.
The memory bound is broken into three pieces:
1. in-progress work
2. fetching parts from runs
3. additional in-flight requests to Blob

1. In-progress work is bounded by 2 * BatchBuilderConfig::blob_target_size. This usage is met at two mutually exclusive moments:
   - When reading in a part, we hold the columnar format in memory while writing its contents into a heap.
   - When writing a part, we hold a temporary updates buffer while encoding/writing it into a columnar format for Blob.
2. When compacting runs, only 1 part from each one is held in memory at a time. Compaction will determine an appropriate number of runs to compact together given the memory bound and accounting for the reservation in (1). A minimum of 2 * BatchBuilderConfig::blob_target_size of memory is expected, to be able to at least have the capacity to compact two runs together at a time, and more runs will be compacted together if more memory is available.
3. If there is excess memory after accounting for (1) and (2), we increase the number of outstanding parts we can keep in-flight to Blob.
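The three-part budget above amounts to simple arithmetic. The sketch below is one possible reading of it, not the crate's actual calculation: `MemoryPlan`, `plan_memory`, and all field names are hypothetical, and it assumes each part held in memory costs roughly one `blob_target_size`.

```rust
/// Hypothetical breakdown of a compaction memory bound.
#[derive(Debug, PartialEq)]
struct MemoryPlan {
    in_progress_bytes: usize,
    run_reserved_bytes: usize,
    runs_per_chunk: usize,
    extra_inflight_parts: usize,
}

fn plan_memory(memory_bound: usize, blob_target_size: usize, total_runs: usize) -> MemoryPlan {
    assert!(blob_target_size > 0, "target size must be non-zero");
    // (1) In-progress work is bounded by 2 * blob_target_size.
    let in_progress_bytes = 2 * blob_target_size;
    // (2) Whatever remains goes to runs, with a floor of 2 * blob_target_size
    // so that at least two runs can be compacted together.
    let run_budget = memory_bound
        .saturating_sub(in_progress_bytes)
        .max(2 * blob_target_size);
    // Assumption: one part of about blob_target_size is held per run.
    let runs_per_chunk = (run_budget / blob_target_size).min(total_runs.max(2));
    let run_reserved_bytes = runs_per_chunk * blob_target_size;
    // (3) Any excess becomes additional in-flight parts to Blob.
    let extra_inflight_parts = (run_budget - run_reserved_bytes) / blob_target_size;
    MemoryPlan {
        in_progress_bytes,
        run_reserved_bytes,
        runs_per_chunk,
        extra_inflight_parts,
    }
}
```

With a bound of ten target sizes and three runs, this sketch reserves two target sizes for in-progress work, three for the runs, and leaves five for extra in-flight parts.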
fn chunk_runs<'a>( req: &'a CompactReq<T>, cfg: &CompactConfig, metrics: &Metrics, run_reserved_memory_bytes: usize, ) -> Vec<(Vec<(&'a Description<T>, &'a RunMeta, &'a [BatchPart<T>])>, usize)>
Sorts and groups all runs from the inputs into chunks, each of which has been determined to consume no more than run_reserved_memory_bytes at a time, unless the input parts were written with a different target size than this build. Uses Self::order_runs to determine the order in which runs are selected.
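A greedy grouping along these lines might look like the following sketch. It is an assumption-laden simplification: `chunk_by_memory` is a hypothetical name, each run is represented only by an estimated byte cost, and the second tuple element is taken here to mean the chunk's estimated memory. A single oversized run still gets its own chunk, which mirrors the stated exception for parts written with a different target size.

```rust
/// Group already-ordered runs into chunks whose estimated memory stays
/// within `budget`. Each run is represented by its estimated byte cost;
/// the result holds (run indices, estimated bytes) per chunk.
fn chunk_by_memory(run_bytes: &[usize], budget: usize) -> Vec<(Vec<usize>, usize)> {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0usize;
    for (idx, &bytes) in run_bytes.iter().enumerate() {
        // Close the current chunk if this run would push it over budget.
        if !current.is_empty() && current_bytes + bytes > budget {
            chunks.push((std::mem::take(&mut current), current_bytes));
            current_bytes = 0;
        }
        // A run larger than the whole budget is still placed (alone),
        // so such a chunk can exceed the budget.
        current.push(idx);
        current_bytes += bytes;
    }
    if !current.is_empty() {
        chunks.push((current, current_bytes));
    }
    chunks
}
```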
fn order_runs( req: &CompactReq<T>, target_order: RunOrder, ) -> Vec<(&Description<T>, &RunMeta, &[BatchPart<T>])>
With bounded memory where we cannot compact all runs/parts together, the groupings in which we select runs to compact together will affect how much we’re able to consolidate updates.
This approach orders the input runs by cycling through each batch, selecting the head element until all are consumed. It assumes that it is generally more effective to prioritize compacting runs from different batches, rather than runs from within a single batch.
ex.
        inputs                 output
    b0 runs=[A, B]
    b1 runs=[C]              output=[A, C, D, B, E, F]
    b2 runs=[D, E, F]
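The cycling order in the example can be reproduced with a small round-robin sketch. The function name `round_robin_runs` and the generic run type `R` are illustrative assumptions, and the `target_order: RunOrder` parameter of the real function is not modelled here.

```rust
use std::collections::VecDeque;

/// Cycle through the batches, taking the head run of each in turn,
/// until every run has been consumed.
fn round_robin_runs<R: Clone>(batches: &[Vec<R>]) -> Vec<R> {
    let mut queues: Vec<VecDeque<R>> = batches
        .iter()
        .map(|batch| batch.iter().cloned().collect())
        .collect();
    let mut ordered = Vec::new();
    loop {
        let mut took_any = false;
        for queue in queues.iter_mut() {
            if let Some(run) = queue.pop_front() {
                ordered.push(run);
                took_any = true;
            }
        }
        // A full pass that yields nothing means all batches are drained.
        if !took_any {
            break;
        }
    }
    ordered
}
```

Calling it with batches `[A, B]`, `[C]`, and `[D, E, F]` yields `[A, C, D, B, E, F]`, matching the example.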
async fn compact_runs<'a>( cfg: &'a CompactConfig, shard_id: &'a ShardId, desc: &'a Description<T>, runs: Vec<(&'a Description<T>, &'a RunMeta, &'a [BatchPart<T>])>, blob: Arc<dyn Blob>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, isolated_runtime: Arc<IsolatedRuntime>, write_schemas: Schemas<K, V>, ) -> Result<HollowBatch<T>, Error>
Compacts runs together. If the input runs are sorted, a single run will be created as output.
Maximum possible memory usage is (# runs + 2) * PersistConfig::blob_target_size.
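To see why sorted input runs can yield a single sorted output while only one item per run is resident at a time, consider a k-way merge over a min-heap. This is a generic sketch of that technique, not the crate's implementation: `merge_sorted_runs` is a hypothetical name, plain `u64` values stand in for parts, and consolidation of updates is omitted.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge individually sorted runs into one sorted output. The heap holds
/// at most one entry per run, analogous to holding one part per run.
fn merge_sorted_runs(runs: &[Vec<u64>]) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head of every non-empty run.
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    // Repeatedly emit the smallest head, then refill from the same run.
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}
```

Under this reading, the `# runs` term of the memory formula corresponds to the one resident entry per run, and the `+ 2` term to the in-progress read and write buffers described for `compact`.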
fn validate_req(req: &CompactReq<T>) -> Result<(), Error>
Trait Implementations§
Auto Trait Implementations§
impl<K, V, T, D> Freeze for Compactor<K, V, T, D>
impl<K, V, T, D> !RefUnwindSafe for Compactor<K, V, T, D>
impl<K, V, T, D> Send for Compactor<K, V, T, D>
impl<K, V, T, D> Sync for Compactor<K, V, T, D>
impl<K, V, T, D> Unpin for Compactor<K, V, T, D>
impl<K, V, T, D> !UnwindSafe for Compactor<K, V, T, D>
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone,
default unsafe fn clone_to_uninit(&self, dst: *mut T)
Nightly-only experimental API (clone_to_uninit).
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> Pointable for T
impl<T> ProgressEventTimestamp for T
impl<P, R> ProtoType<R> for P where R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.