pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(Instant, CompactReq<T>, Machine<K, V, T, D>, Sender<Result<ApplyMergeResult, Error>>)>,
    _phantom: PhantomData<fn() -> D>,
}

A service for performing physical and logical compaction.

This will possibly be called over RPC in the future. Physical compaction is merging adjacent batches. Logical compaction is advancing timestamps to a new since and consolidating the resulting updates.
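As a rough illustration of the logical half of that description, the sketch below advances every timestamp to a new since and then consolidates. It is a simplification under stated assumptions: timestamps are plain `u64` values (totally ordered, unlike a general `Lattice`), diffs are `i64`, and `Update` and `logical_compact` are hypothetical names that do not exist in this crate.

```rust
/// Hypothetical update triple: (data, timestamp, diff).
type Update = (String, u64, i64);

/// Advances each timestamp to at least `since`, then sums the diffs of
/// updates that now share the same (data, timestamp) and drops zero diffs.
fn logical_compact(mut updates: Vec<Update>, since: u64) -> Vec<Update> {
    // Advance timestamps: anything earlier than `since` is moved up to it.
    for (_, ts, _) in updates.iter_mut() {
        *ts = (*ts).max(since);
    }
    // Sort so that updates with equal (data, timestamp) become adjacent.
    updates.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));

    let mut out: Vec<Update> = Vec::new();
    for (data, ts, diff) in updates {
        if let Some(last) = out.last_mut() {
            if last.0 == data && last.1 == ts {
                // Same (data, timestamp): fold the diff into the previous entry.
                last.2 += diff;
                continue;
            }
        }
        out.push((data, ts, diff));
    }
    // Updates whose diffs cancelled out carry no information.
    out.retain(|(_, _, diff)| *diff != 0);
    out
}
```

With a since of 5, an insertion of `a` at time 1 and its retraction at time 2 both land on time 5 and cancel, which is why advancing the since lets compaction discard history.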

Fields§

§cfg: PersistConfig

§metrics: Arc<Metrics>

§sender: Sender<(Instant, CompactReq<T>, Machine<K, V, T, D>, Sender<Result<ApplyMergeResult, Error>>)>

§_phantom: PhantomData<fn() -> D>

Implementations§

impl<K, V, T, D> Compactor<K, V, T, D> where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64 + Send + Sync,

pub fn new( cfg: PersistConfig, metrics: Arc<Metrics>, isolated_runtime: Arc<IsolatedRuntime>, writer_id: WriterId, schemas: Schemas<K, V>, gc: GarbageCollector<K, V, T, D> ) -> Self

pub fn compact_and_apply_background( &self, req: CompactReq<T>, machine: &Machine<K, V, T, D> ) -> Option<Receiver<Result<ApplyMergeResult, Error>>>

Enqueues a CompactReq to be consumed by the compaction background task when available.

Returns a receiver that indicates when compaction has completed. The receiver can be safely dropped at any time if the caller does not wish to wait on completion.
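The enqueue-and-optionally-wait pattern can be sketched with standard library channels. This is not the crate's implementation: `Req`, `Queue`, and `enqueue` are hypothetical stand-ins, the real `Compactor` uses its own channel types, and the sketch assumes that `None` means the request could not be enqueued (for example because the queue is full), which the documentation above does not state.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::thread;

/// Hypothetical request type standing in for `CompactReq<T>`.
struct Req(u32);

/// Hypothetical bounded queue feeding one background worker thread.
struct Queue {
    sender: SyncSender<(Req, mpsc::Sender<Result<u32, String>>)>,
}

impl Queue {
    fn new(capacity: usize) -> Self {
        let (sender, rx) =
            mpsc::sync_channel::<(Req, mpsc::Sender<Result<u32, String>>)>(capacity);
        thread::spawn(move || {
            for (req, done) in rx {
                // Stand-in for the actual work: double the request payload.
                // A send error only means the caller dropped its receiver,
                // so it is deliberately ignored.
                let _ = done.send(Ok(req.0 * 2));
            }
        });
        Queue { sender }
    }

    /// Returns `None` if the request could not be enqueued (assumption).
    fn enqueue(&self, req: Req) -> Option<Receiver<Result<u32, String>>> {
        let (done_tx, done_rx) = mpsc::channel();
        self.sender.try_send((req, done_tx)).ok()?;
        Some(done_rx)
    }
}
```

A caller that cares about completion blocks on the returned receiver; one that does not can drop it immediately, and the worker carries on because it ignores the failed completion send.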

async fn compact_and_apply( cfg: PersistConfig, blob: Arc<dyn Blob + Send + Sync>, metrics: Arc<Metrics>, isolated_runtime: Arc<IsolatedRuntime>, req: CompactReq<T>, writer_id: WriterId, schemas: Schemas<K, V>, machine: &mut Machine<K, V, T, D>, gc: &GarbageCollector<K, V, T, D> ) -> Result<ApplyMergeResult, Error>

pub async fn compact( cfg: CompactConfig, blob: Arc<dyn Blob + Send + Sync>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, isolated_runtime: Arc<IsolatedRuntime>, req: CompactReq<T>, schemas: Schemas<K, V> ) -> Result<CompactRes<T>, Error>

Compacts input batches in bounded memory.

The memory bound is broken into three pieces: (1) in-progress work, (2) fetching parts from runs, and (3) additional in-flight requests to Blob.

  1. In-progress work is bounded by 2 * BatchBuilderConfig::blob_target_size. This usage is met at two mutually exclusive moments:
  • When reading in a part, we hold the columnar format in memory while writing its contents into a heap.
  • When writing a part, we hold a temporary updates buffer while encoding/writing it into a columnar format for Blob.

  2. When compacting runs, only 1 part from each one is held in memory at a time. Compaction will determine an appropriate number of runs to compact together given the memory bound and accounting for the reservation in (1). A minimum of 2 * BatchBuilderConfig::blob_target_size of memory is expected, to be able to at least have the capacity to compact two runs together at a time, and more runs will be compacted together if more memory is available.

  3. If there is excess memory after accounting for (1) and (2), we increase the number of outstanding parts we can keep in-flight to Blob.
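The arithmetic behind this three-way split can be sketched as follows. The exact policy for dividing memory between runs and in-flight requests is not spelled out above, so this sketch assumes one plausible reading: each run costs one part of at most `blob_target_size` bytes, and whatever is left after the runs is converted into extra in-flight parts. `Budget` and `split_budget` are hypothetical names, not part of this crate.

```rust
/// Hypothetical result of splitting a compaction memory bound.
#[derive(Debug, PartialEq)]
struct Budget {
    /// Piece (1): reserved for in-progress reads and writes.
    in_progress_bytes: usize,
    /// Piece (2): how many runs can be compacted together at once.
    runs_per_chunk: usize,
    /// Piece (3): extra parts that may be kept in flight to Blob.
    extra_in_flight_parts: usize,
}

fn split_budget(memory_bound: usize, blob_target_size: usize, num_runs: usize) -> Budget {
    assert!(blob_target_size > 0, "target size must be non-zero");
    // (1) In-progress work is bounded by twice the target part size.
    let in_progress_bytes = 2 * blob_target_size;
    // (2) At least two runs' worth of memory is expected, even if the
    // configured bound is smaller than that.
    let min_run_bytes = 2 * blob_target_size;
    let available = memory_bound
        .saturating_sub(in_progress_bytes)
        .max(min_run_bytes);
    // One part per run is held at a time, so each run costs one target size.
    // Never plan for more runs than exist, but always allow two.
    let runs_per_chunk = (available / blob_target_size).min(num_runs.max(2));
    // (3) Whatever remains becomes additional in-flight parts (assumption).
    let leftover = available - runs_per_chunk * blob_target_size;
    Budget {
        in_progress_bytes,
        runs_per_chunk,
        extra_in_flight_parts: leftover / blob_target_size,
    }
}
```

For example, a 1024-byte bound with a 128-byte target and 3 runs reserves 256 bytes for in-progress work, fits all 3 runs in one chunk, and leaves room for 3 additional in-flight parts.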

fn chunk_runs<'a>( req: &'a CompactReq<T>, cfg: &CompactConfig, metrics: &Metrics, run_reserved_memory_bytes: usize ) -> Vec<(Vec<(&'a Description<T>, &'a [HollowBatchPart])>, usize)>

Sorts and groups all runs from the inputs into chunks, each of which has been determined to consume no more than run_reserved_memory_bytes at a time, unless the input parts were written with a different target size than this build. Uses Self::order_runs to determine the order in which runs are selected.
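A greedy grouping along these lines might look like the sketch below. It is an assumption-laden illustration, not the crate's algorithm: each run is reduced to a single hypothetical byte cost (for instance, its largest part), runs are taken in the order given, and `chunk_by_budget` is an invented name.

```rust
/// Groups run indices into chunks whose summed cost stays within `budget`.
/// Returns each chunk together with its estimated memory use in bytes.
fn chunk_by_budget(run_costs: &[usize], budget: usize) -> Vec<(Vec<usize>, usize)> {
    let mut chunks = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;

    for (idx, &cost) in run_costs.iter().enumerate() {
        // Close the current chunk if adding this run would exceed the budget.
        if !current.is_empty() && current_bytes + cost > budget {
            chunks.push((std::mem::take(&mut current), current_bytes));
            current_bytes = 0;
        }
        // A run larger than the whole budget still gets a chunk of its own,
        // mirroring the caveat about parts written with a different target size.
        current.push(idx);
        current_bytes += cost;
    }
    if !current.is_empty() {
        chunks.push((current, current_bytes));
    }
    chunks
}
```

The second element of each tuple plays the same role as the `usize` in the real return type only by analogy; what that value represents in the crate is not documented here.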

fn order_runs(req: &CompactReq<T>) -> Vec<(&Description<T>, &[HollowBatchPart])>

With bounded memory where we cannot compact all runs/parts together, the groupings in which we select runs to compact together will affect how much we’re able to consolidate updates.

This approach orders the input runs by cycling through each batch, selecting the head element until all are consumed. It assumes that it is generally more effective to prioritize compacting runs from different batches, rather than runs from within a single batch.

Example:

       inputs                                        output
    b0 runs=[A, B]
    b1 runs=[C]                           output=[A, C, D, B, E, F]
    b2 runs=[D, E, F]
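The round-robin ordering described above can be sketched in a few lines. This is a simplified model in which each batch is just a list of run labels; `round_robin_runs` is a hypothetical helper, not the signature of `order_runs`.

```rust
use std::collections::VecDeque;

/// Cycles through the batches, taking the head run of each in turn until
/// every batch is exhausted.
fn round_robin_runs<T: Clone>(batches: &[Vec<T>]) -> Vec<T> {
    // One queue of remaining runs per batch, preserving in-batch order.
    let mut queues: Vec<VecDeque<&T>> =
        batches.iter().map(|b| b.iter().collect()).collect();
    let mut out = Vec::new();
    loop {
        let mut progressed = false;
        for queue in queues.iter_mut() {
            if let Some(run) = queue.pop_front() {
                out.push(run.clone());
                progressed = true;
            }
        }
        // A full pass that yields nothing means all batches are consumed.
        if !progressed {
            break;
        }
    }
    out
}
```

Applied to the batches in the example, `[A, B]`, `[C]`, and `[D, E, F]`, this yields `A, C, D, B, E, F`.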

async fn compact_runs_streaming<'a>( cfg: &'a CompactConfig, shard_id: &'a ShardId, desc: &'a Description<T>, runs: Vec<(&'a Description<T>, &'a [HollowBatchPart])>, blob: Arc<dyn Blob + Send + Sync>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, isolated_runtime: Arc<IsolatedRuntime>, real_schemas: Schemas<K, V> ) -> Result<HollowBatch<T>, Error>

async fn compact_runs<'a>( cfg: &'a CompactConfig, shard_id: &'a ShardId, desc: &'a Description<T>, runs: Vec<(&'a Description<T>, &'a [HollowBatchPart])>, blob: Arc<dyn Blob + Send + Sync>, metrics: Arc<Metrics>, shard_metrics: Arc<ShardMetrics>, isolated_runtime: Arc<IsolatedRuntime>, real_schemas: Schemas<K, V> ) -> Result<HollowBatch<T>, Error>

Compacts runs together. If the input runs are sorted, a single run will be created as output.

Maximum possible memory usage is (# runs + 2) * crate::PersistConfig::blob_target_size.

fn validate_req(req: &CompactReq<T>) -> Result<(), Error>

Trait Implementations§

impl<K, V, T, D> Clone for Compactor<K, V, T, D>

fn clone(&self) -> Self

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<K: Debug, V: Debug, T: Debug, D: Debug> Debug for Compactor<K, V, T, D>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<K, V, T, D> !RefUnwindSafe for Compactor<K, V, T, D>

impl<K, V, T, D> Send for Compactor<K, V, T, D> where T: Send + Sync,

impl<K, V, T, D> Sync for Compactor<K, V, T, D> where T: Send + Sync,

impl<K, V, T, D> Unpin for Compactor<K, V, T, D>

impl<K, V, T, D> !UnwindSafe for Compactor<K, V, T, D>

Blanket Implementations§

impl<T> Any for T where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> DynClone for T where T: Clone,

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> ProgressEventTimestamp for T where T: Data + Debug + Any,

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any.

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object.

impl<P, R> ProtoType<R> for P where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Data for T where T: Clone + 'static,