pub struct GarbageCollector<K, V, T, D> {
    sender: UnboundedSender<(GcReq, Sender<RoutineMaintenance>)>,
    _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

Fields§

§sender: UnboundedSender<(GcReq, Sender<RoutineMaintenance>)>§_phantom: PhantomData<fn() -> (K, V, T, D)>

Implementations§

source§

impl<K, V, T, D> GarbageCollector<K, V, T, D>where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

Cleanup for no longer necessary blobs and consensus versions.

  • Every read handle, snapshot, and listener is given a capability on seqno with a very long lease (allowing for infrequent heartbeats). This is a guarantee that no blobs referenced by the state at that version will be deleted (even if they’ve been compacted in some newer version of the state). This is called a seqno_since in the code as it has obvious parallels to how sinces work at the shard/collection level. (Is reusing “since” here a good idea? We could also call it a “seqno capability” or something else instead.)
  • Every state transition via apply_unbatched_cmd has the opportunity to determine that the overall seqno_since for the shard has changed. In the common case in production, this will be in response to a snapshot finishing or a listener emitting some batch.
  • It would be nice if this only ever happened in response to read-y things (there’d be a nice parallel to how compaction background work is only spawned by write activity), but if there are no readers, we still very much want to continue to garbage collect. Notably, if there are no readers, we naturally only need to hold a capability on the current version of state. This means that if there are only writers, a write commands will result in the seqno_since advancing immediately from the previous version of the state to the new one.
  • Like Compacter, GarbageCollector uses a heuristic to ignore some requests to save work. In this case, the tradeoff is between consensus traffic (plus a bit of cpu) and keeping blobs around longer than strictly necessary. This is correct because a process could always die while executing one of these requests (or be slow and still working on it when the next request is generated), so we anyway need to handle them being dropped.
  • GarbageCollector works by Consensus::scan-ing for every live version of state (ignoring what the request things the prev_state_seqno was for the reasons mentioned immediately above). It then walks through them in a loop, accumulating a BTreeSet of every referenced blob key. When it finds the version corresponding to the new_seqno_since, it removes every blob in that version of the state from the BTreeSet and exits the loop. This results in the BTreeSet containing every blob eligible for deletion. It deletes those blobs and then truncates the state to the new_seqno_since to indicate that this work doesn’t need to be done again.
  • Note that these requests are being processed concurrently, so it’s always possible that some future request has already deleted the blobs and truncated consensus. It’s also possible that this is the future request. As a result, the only guarantee that we get is that the current version of head is >= new_seqno_since.
  • (Aside: The above also means that if Blob is not linearizable, there is a possible race where a blob gets deleted before it written and thus is leaked. We anyway always have the possibility of a write process being killed between when it writes a blob and links it into state, so this is fine; it’ll be caught and fixed by the same mechanism.)
source

pub fn new( machine: Machine<K, V, T, D>, isolated_runtime: Arc<IsolatedRuntime> ) -> Self

source

pub fn gc_and_truncate_background( &self, req: GcReq ) -> Option<Receiver<RoutineMaintenance>>

Enqueues a GcReq to be consumed by the GC background task when available.

Returns a future that indicates when GC has cleaned up to at least GcReq::new_seqno_since

source

pub(crate) async fn gc_and_truncate( machine: &mut Machine<K, V, T, D>, req: GcReq ) -> (RoutineMaintenance, GcResults)

source

async fn incrementally_delete_and_truncate<F>( states: &mut StateVersionsIter<T>, gc_rollups: &GcRollups, machine: &Machine<K, V, T, D>, timer: &mut F, gc_results: &mut GcResults )where F: FnMut(&Counter),

Physically deletes all blobs from Blob and live diffs from Consensus that are safe to delete, given the seqno_since, ensuring that the earliest live diff in Consensus has a rollup of seqno <= seqno_since.

Internally, performs deletions for each rollup encountered, ensuring that incremental progress is made even if the process is interrupted before completing all gc work.

source

fn find_removable_blobs<F>( states: &mut StateVersionsIter<T>, truncate_lt: SeqNo, metrics: &GcStepTimings, timer: &mut F, batch_parts_to_delete: &mut BTreeSet<PartialBatchKey>, rollups_to_delete: &mut BTreeSet<PartialRollupKey> )where F: FnMut(&Counter),

Iterates through states, accumulating all deleted blobs (both batch parts and rollups) until reaching the seqno truncate_lt.

  • The initial seqno of states MUST be less than truncate_lt.
  • The seqno of states after this fn will be exactly truncate_lt.
source

async fn delete_and_truncate<F>( truncate_lt: SeqNo, batch_parts: &mut BTreeSet<PartialBatchKey>, rollups: &mut BTreeSet<PartialRollupKey>, machine: &Machine<K, V, T, D>, timer: &mut F )where F: FnMut(&Counter),

Deletes batch_parts and rollups from Blob. Truncates Consensus to truncate_lt.

source

async fn delete_all( blob: &(dyn Blob + Send + Sync), keys: impl Iterator<Item = BlobKey>, metrics: &RetryMetrics, span: Span, semaphore: &Semaphore )

Trait Implementations§

source§

impl<K, V, T, D> Clone for GarbageCollector<K, V, T, D>

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl<K: Debug, V: Debug, T: Debug, D: Debug> Debug for GarbageCollector<K, V, T, D>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<K, V, T, D> !RefUnwindSafe for GarbageCollector<K, V, T, D>

§

impl<K, V, T, D> Send for GarbageCollector<K, V, T, D>

§

impl<K, V, T, D> Sync for GarbageCollector<K, V, T, D>

§

impl<K, V, T, D> Unpin for GarbageCollector<K, V, T, D>

§

impl<K, V, T, D> !UnwindSafe for GarbageCollector<K, V, T, D>

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for Twhere U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> DynClone for Twhere T: Clone,

source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FromRef<T> for Twhere T: Clone,

source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unsharedwhere Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T> ProgressEventTimestamp for Twhere T: Data + Debug + Any,

source§

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any. Read more
source§

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object. Read more
source§

impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> ToOwned for Twhere T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

impl<T> Data for Twhere T: Clone + 'static,