Struct mz_persist_client::internal::gc::GarbageCollector
pub struct GarbageCollector<K, V, T, D> {
sender: UnboundedSender<(GcReq, Sender<RoutineMaintenance>)>,
_phantom: PhantomData<fn() -> (K, V, T, D)>,
}
Fields

sender: UnboundedSender<(GcReq, Sender<RoutineMaintenance>)>
_phantom: PhantomData<fn() -> (K, V, T, D)>
Implementations

impl<K, V, T, D> GarbageCollector<K, V, T, D>
Cleanup of blobs and consensus versions that are no longer necessary.
- Every read handle, snapshot, and listener is given a capability on seqno with a very long lease (allowing for infrequent heartbeats). This is a guarantee that no blobs referenced by the state at that version will be deleted (even if they’ve been compacted in some newer version of the state). This is called a seqno_since in the code as it has obvious parallels to how sinces work at the shard/collection level. (Is reusing “since” here a good idea? We could also call it a “seqno capability” or something else instead.)
- Every state transition via apply_unbatched_cmd has the opportunity to determine that the overall seqno_since for the shard has changed. In the common case in production, this will be in response to a snapshot finishing or a listener emitting some batch.
- It would be nice if this only ever happened in response to read-y things (there’d be a nice parallel to how compaction background work is only spawned by write activity), but if there are no readers, we still very much want to continue to garbage collect. Notably, if there are no readers, we naturally only need to hold a capability on the current version of state. This means that if there are only writers, a write command will result in the seqno_since advancing immediately from the previous version of the state to the new one.
- Like Compacter, GarbageCollector uses a heuristic to ignore some requests to save work. In this case, the tradeoff is between consensus traffic (plus a bit of cpu) and keeping blobs around longer than strictly necessary. This is correct because a process could always die while executing one of these requests (or be slow and still working on it when the next request is generated), so we anyway need to handle them being dropped.
- GarbageCollector works by Consensus::scan-ing for every live version of state (ignoring what the request thinks the prev_state_seqno was, for the reasons mentioned immediately above). It then walks through them in a loop, accumulating a BTreeSet of every referenced blob key. When it finds the version corresponding to the new_seqno_since, it removes every blob in that version of the state from the BTreeSet and exits the loop. This results in the BTreeSet containing every blob eligible for deletion (see the sketch after this list). It deletes those blobs and then truncates the state to the new_seqno_since to indicate that this work doesn’t need to be done again.
- Note that these requests are being processed concurrently, so it’s always possible that some future request has already deleted the blobs and truncated consensus. It’s also possible that this is the future request. As a result, the only guarantee that we get is that the current version of head is >= new_seqno_since.
- (Aside: The above also means that if Blob is not linearizable, there is a possible race where a blob gets deleted before it is written and thus is leaked. We anyway always have the possibility of a write process being killed between when it writes a blob and links it into state, so this is fine; it’ll be caught and fixed by the same mechanism.)
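A minimal sketch of the scan-and-accumulate step described above, using toy stand-in types rather than the real State/StateVersionsIter machinery:

use std::collections::BTreeSet;

// Toy stand-in: one live version of state and the blob keys it references.
struct LiveState {
    seqno: u64,
    referenced_blobs: Vec<String>,
}

/// Walks live states in seqno order, accumulating every referenced blob key
/// until reaching `new_seqno_since`, then removes everything that version
/// still references. What remains is eligible for deletion.
fn blobs_eligible_for_deletion(
    live_states: &[LiveState],
    new_seqno_since: u64,
) -> BTreeSet<String> {
    let mut eligible = BTreeSet::new();
    for state in live_states {
        if state.seqno == new_seqno_since {
            // Anything still referenced at the new seqno_since must be kept.
            for key in &state.referenced_blobs {
                eligible.remove(key);
            }
            break;
        }
        eligible.extend(state.referenced_blobs.iter().cloned());
    }
    eligible
}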
pub fn new(
    machine: Machine<K, V, T, D>,
    isolated_runtime: Arc<IsolatedRuntime>,
) -> Self
pub fn gc_and_truncate_background(
    &self,
    req: GcReq,
) -> Option<Receiver<RoutineMaintenance>>
Enqueues a GcReq to be consumed by the GC background task when available.

Returns a future that indicates when GC has cleaned up to at least GcReq::new_seqno_since.
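A hypothetical usage sketch. The GcReq field names besides new_seqno_since, the surrounding bindings, and the assumption that the returned Receiver is awaitable like a oneshot are all illustrative assumptions, not confirmed API details:

// Hypothetical usage: enqueue a GC request; `None` plausibly means the
// request was skipped or merged per the heuristic described above.
let req = GcReq {
    shard_id,         // assumed in scope
    new_seqno_since,  // assumed in scope
};
if let Some(rx) = garbage_collector.gc_and_truncate_background(req) {
    // Resolves once GC has cleaned up to at least `new_seqno_since`,
    // possibly via some other concurrent request.
    let maintenance: RoutineMaintenance = rx.await.expect("gc task stopped");
}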
pub(crate) async fn gc_and_truncate(
    machine: &Machine<K, V, T, D>,
    req: GcReq,
) -> (RoutineMaintenance, GcResults)
async fn incrementally_delete_and_truncate<F>(
    states: &mut StateVersionsIter<T>,
    gc_rollups: &GcRollups,
    machine: &Machine<K, V, T, D>,
    timer: &mut F,
    gc_results: &mut GcResults,
)
Physically deletes all blobs from Blob and live diffs from Consensus that are safe to delete, given the seqno_since, ensuring that the earliest live diff in Consensus has a rollup of seqno <= seqno_since.

Internally, performs deletions for each rollup encountered, ensuring that incremental progress is made even if the process is interrupted before completing all GC work.
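An illustrative sketch of that incremental structure, calling the two helpers documented below. The iteration over rollup seqnos, the truncate_seqnos accessor, and the in-scope bindings are assumptions about shape, not the exact implementation:

// For each rollup-derived truncation point at or below seqno_since, collect
// the removable blobs, then delete and truncate before moving on. Crashing
// between iterations loses at most the current iteration's work.
for truncate_lt in gc_rollups.truncate_seqnos() { // hypothetical accessor
    let mut batch_parts_to_delete = PartDeletes::default(); // assumes a Default impl
    let mut rollups_to_delete = BTreeSet::new();
    Self::find_removable_blobs(
        states,
        truncate_lt,
        gc_step_timings, // assumed in scope
        timer,
        &mut batch_parts_to_delete,
        &mut rollups_to_delete,
    );
    Self::delete_and_truncate(
        truncate_lt,
        &mut batch_parts_to_delete,
        &mut rollups_to_delete,
        machine,
        timer,
    )
    .await;
}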
fn find_removable_blobs<F>(
    states: &mut StateVersionsIter<T>,
    truncate_lt: SeqNo,
    metrics: &GcStepTimings,
    timer: &mut F,
    batch_parts_to_delete: &mut PartDeletes<T>,
    rollups_to_delete: &mut BTreeSet<PartialRollupKey>,
)
Iterates through states, accumulating all deleted blobs (both batch parts and rollups) until reaching the seqno truncate_lt.

- The initial seqno of states MUST be less than truncate_lt.
- The seqno of states after this fn will be exactly truncate_lt.
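A toy model of this contract, with stand-in types (the real fn walks StateVersionsIter and records into PartDeletes):

use std::collections::BTreeSet;

// Toy stand-in: each state diff records the blob keys it deleted.
struct Diff {
    seqno: u64,
    deleted_batch_parts: Vec<String>,
    deleted_rollups: Vec<String>,
}

/// Accumulates everything deleted strictly below `truncate_lt`, mirroring
/// the contract above: the input starts below `truncate_lt`, and iteration
/// stops exactly there.
fn find_removable(
    diffs: &[Diff],
    truncate_lt: u64,
    batch_parts_to_delete: &mut BTreeSet<String>,
    rollups_to_delete: &mut BTreeSet<String>,
) {
    for diff in diffs.iter().take_while(|d| d.seqno < truncate_lt) {
        batch_parts_to_delete.extend(diff.deleted_batch_parts.iter().cloned());
        rollups_to_delete.extend(diff.deleted_rollups.iter().cloned());
    }
}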
async fn delete_and_truncate<F>(
    truncate_lt: SeqNo,
    batch_parts: &mut PartDeletes<T>,
    rollups: &mut BTreeSet<PartialRollupKey>,
    machine: &Machine<K, V, T, D>,
    timer: &mut F,
)
Deletes batch_parts and rollups from Blob. Truncates Consensus to truncate_lt.
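A toy sketch of the ordering this implies, following the module docs above ("deletes those blobs and then truncates the state"): delete from Blob first, then truncate Consensus, so an interruption leaves work that a later GC pass re-discovers rather than blobs that nothing references. The types here are in-memory stand-ins, not the real Blob/Consensus APIs:

use std::collections::{BTreeMap, BTreeSet};

struct ToyBlob(BTreeSet<String>);
struct ToyConsensus(BTreeMap<u64, String>); // seqno -> encoded diff

fn delete_and_truncate_sketch(
    blob: &mut ToyBlob,
    consensus: &mut ToyConsensus,
    keys: &[String],
    truncate_lt: u64,
) {
    // Delete blobs first: if we crash here, the next GC pass re-scans and
    // re-issues these (idempotent) deletes. Truncating first would leak
    // the blobs, since no remaining diff would reference them.
    for key in keys {
        blob.0.remove(key);
    }
    // Then drop every diff below truncate_lt, recording that this work is done.
    consensus.0.retain(|seqno, _| *seqno >= truncate_lt);
}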
async fn delete_all(
    blob: &dyn Blob,
    keys: impl Iterator<Item = BlobKey>,
    metrics: &RetryMetrics,
    span: Span,
    semaphore: &Semaphore,
)
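A hedged sketch of the bounded-concurrency pattern suggested by the semaphore parameter; the retry, metrics, and span wiring of the real fn is omitted, and delete_one is a hypothetical stand-in for a retried Blob::delete:

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn delete_one(_key: &str) {
    // Stand-in for a retried Blob::delete call.
}

async fn delete_all_sketch(keys: Vec<String>, semaphore: Arc<Semaphore>) {
    let mut tasks = Vec::new();
    for key in keys {
        let semaphore = Arc::clone(&semaphore);
        tasks.push(tokio::spawn(async move {
            // Each delete holds a permit, capping in-flight deletes at the
            // semaphore's capacity.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            delete_one(&key).await;
        }));
    }
    for task in tasks {
        task.await.expect("delete task panicked");
    }
}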
Trait Implementations

impl<K, V, T, D> Clone for GarbageCollector<K, V, T, D>
Auto Trait Implementations
impl<K, V, T, D> Freeze for GarbageCollector<K, V, T, D>
impl<K, V, T, D> !RefUnwindSafe for GarbageCollector<K, V, T, D>
impl<K, V, T, D> Send for GarbageCollector<K, V, T, D>
impl<K, V, T, D> Sync for GarbageCollector<K, V, T, D>
impl<K, V, T, D> Unpin for GarbageCollector<K, V, T, D>
impl<K, V, T, D> !UnwindSafe for GarbageCollector<K, V, T, D>
Blanket Implementations

impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

impl<T> CloneToUninit for T
where
    T: Clone,

default unsafe fn clone_to_uninit(&self, dst: *mut T)

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

fn with_current_context(self) -> WithContext<Self>

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

fn in_current_span(self) -> Instrumented<Self>

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.

impl<T> Pointable for T

impl<T> ProgressEventTimestamp for T

impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,

fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.

fn from_rust(rust: &R) -> P
See RustType::into_proto.

impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.