mz_persist_client/internal/gc.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Borrow;
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use prometheus::Counter;
use timely::progress::Timestamp;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{Semaphore, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::PartDeletes;
use crate::cfg::GC_BLOB_DELETE_CONCURRENCY_LIMIT;

use mz_ore::cast::CastFrom;
use mz_ore::collections::HashSet;
use mz_ore::soft_assert_or_log;
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::{Codec, Codec64};

use crate::ShardId;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{GcStepTimings, RetryMetrics};
use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey};
use crate::internal::state::{GC_USE_ACTIVE_GC, HollowBlobRef};
use crate::internal::state_versions::{InspectDiff, StateVersionsIter, UntypedStateVersionsIter};

#[derive(Debug, Clone, PartialEq)]
pub struct GcReq {
    pub shard_id: ShardId,
    pub new_seqno_since: SeqNo,
}
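
// Illustrative only (hypothetical values): a `GcReq` names the shard and the new
// lower bound on live seqnos; `SeqNo` is a thin wrapper around a u64.
//
//     let req = GcReq {
//         shard_id: ShardId::new(),
//         new_seqno_since: SeqNo(42),
//     };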

#[derive(Debug)]
pub struct GarbageCollector<K, V, T, D> {
    sender: UnboundedSender<(GcReq, oneshot::Sender<RoutineMaintenance>)>,
    _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Clone for GarbageCollector<K, V, T, D> {
    fn clone(&self) -> Self {
        GarbageCollector {
            sender: self.sender.clone(),
            _phantom: PhantomData,
        }
    }
}

/// Cleanup for no longer necessary blobs and consensus versions.
///
/// - Every read handle, snapshot, and listener is given a capability on seqno
///   with a very long lease (allowing for infrequent heartbeats). This is a
///   guarantee that no blobs referenced by the state at that version will be
///   deleted (even if they've been compacted in some newer version of the
///   state). This is called a seqno_since in the code as it has obvious
///   parallels to how sinces work at the shard/collection level. (Is reusing
///   "since" here a good idea? We could also call it a "seqno capability" or
///   something else instead.)
/// - Every state transition via apply_unbatched_cmd has the opportunity to
///   determine that the overall seqno_since for the shard has changed. In the
///   common case in production, this will be in response to a snapshot
///   finishing or a listener emitting some batch.
/// - It would be nice if this only ever happened in response to read-y things
///   (there'd be a nice parallel to how compaction background work is only
///   spawned by write activity), but if there are no readers, we still very
///   much want to continue to garbage collect. Notably, if there are no
///   readers, we naturally only need to hold a capability on the current
///   version of state. This means that if there are only writers, a write
///   command will result in the seqno_since advancing immediately from the
///   previous version of the state to the new one.
/// - Like Compacter, GarbageCollector uses a heuristic to ignore some requests
///   to save work. In this case, the tradeoff is between consensus traffic
///   (plus a bit of cpu) and keeping blobs around longer than strictly
///   necessary. This is correct because a process could always die while
///   executing one of these requests (or be slow and still working on it when
///   the next request is generated), so we anyway need to handle them being
///   dropped.
/// - GarbageCollector works by `Consensus::scan`-ing for every live version of
///   state (ignoring what the request thinks the prev_state_seqno was, for the
///   reasons mentioned immediately above). It then walks through them in a
///   loop, accumulating a BTreeSet of every referenced blob key. When it finds
///   the version corresponding to the new_seqno_since, it removes every blob in
///   that version of the state from the BTreeSet and exits the loop. This
///   results in the BTreeSet containing every blob eligible for deletion. It
///   deletes those blobs and then truncates the state to the new_seqno_since to
///   indicate that this work doesn't need to be done again.
/// - Note that these requests are being processed concurrently, so it's always
///   possible that some future request has already deleted the blobs and
///   truncated consensus. It's also possible that this is the future request.
///   As a result, the only guarantee that we get is that the current version of
///   head is >= new_seqno_since.
/// - (Aside: The above also means that if Blob is not linearizable, there is a
///   possible race where a blob gets deleted before it is written and thus is
///   leaked. We anyway always have the possibility of a write process being
///   killed between when it writes a blob and links it into state, so this is
///   fine; it'll be caught and fixed by the same mechanism.)
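///
/// A sketch of the scan-and-collect flow described above (pseudocode only, with
/// hypothetical helper names; the real implementation below works incrementally,
/// rollup by rollup, from state diffs):
///
/// ```text
/// let mut eligible = BTreeSet::new();
/// for version in all_live_versions(shard) {
///     if version.seqno == new_seqno_since {
///         // Anything still referenced at the new seqno_since must be kept.
///         for key in version.referenced_blob_keys() {
///             eligible.remove(&key);
///         }
///         break;
///     }
///     for key in version.referenced_blob_keys() {
///         eligible.insert(key);
///     }
/// }
/// delete_from_blob(eligible);
/// truncate_consensus_to(new_seqno_since);
/// ```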
impl<K, V, T, D> GarbageCollector<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub fn new(machine: Machine<K, V, T, D>, isolated_runtime: Arc<IsolatedRuntime>) -> Self {
        let (gc_req_sender, mut gc_req_recv) =
            mpsc::unbounded_channel::<(GcReq, oneshot::Sender<RoutineMaintenance>)>();

        // spin off a single task responsible for executing GC requests.
        // work is enqueued into the task through a channel
        let _worker_handle = mz_ore::task::spawn(|| "PersistGcWorker", async move {
            while let Some((req, completer)) = gc_req_recv.recv().await {
                let mut consolidated_req = req;
                let mut gc_completed_senders = vec![completer];

                // check if any further gc requests have built up. we'll merge their requests
                // together and run a single GC pass to satisfy all of them
                while let Ok((req, completer)) = gc_req_recv.try_recv() {
                    assert_eq!(req.shard_id, consolidated_req.shard_id);
                    gc_completed_senders.push(completer);
                    consolidated_req.new_seqno_since =
                        std::cmp::max(req.new_seqno_since, consolidated_req.new_seqno_since);
                }
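
                // At this point `consolidated_req.new_seqno_since` is the max across
                // all merged reqs: e.g. (hypothetical values) queued reqs with
                // new_seqno_since of 5 and 8 consolidate into a single req with 8.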
                let merged_requests = gc_completed_senders.len() - 1;
                if merged_requests > 0 {
                    machine
                        .applier
                        .metrics
                        .gc
                        .merged
                        .inc_by(u64::cast_from(merged_requests));
                    debug!(
                        "Merged {} gc requests together for shard {}",
                        merged_requests, consolidated_req.shard_id
                    );
                }

                let gc_span = debug_span!(parent: None, "gc_and_truncate", shard_id=%consolidated_req.shard_id);
                gc_span.follows_from(&Span::current());

                let start = Instant::now();
                machine.applier.metrics.gc.started.inc();
                let (mut maintenance, _stats) = {
                    let name = format!("gc_and_truncate ({})", &consolidated_req.shard_id);
                    let machine = machine.clone();
                    isolated_runtime
                        .spawn_named(|| name, async move {
                            Self::gc_and_truncate(&machine, consolidated_req)
                                .instrument(gc_span)
                                .await
                        })
                        .await
                };
                machine.applier.metrics.gc.finished.inc();
                machine.applier.shard_metrics.gc_finished.inc();
                machine
                    .applier
                    .metrics
                    .gc
                    .seconds
                    .inc_by(start.elapsed().as_secs_f64());

                // inform all callers who enqueued GC reqs that their work is complete
                for sender in gc_completed_senders {
                    // we can safely ignore errors here, it's possible the caller
                    // wasn't interested in waiting and dropped their receiver.
                    // maintenance will be somewhat-arbitrarily assigned to the first oneshot.
                    let _ = sender.send(mem::take(&mut maintenance));
                }
            }
        });

        GarbageCollector {
            sender: gc_req_sender,
            _phantom: PhantomData,
        }
    }

    /// Enqueues a [GcReq] to be consumed by the GC background task when available.
    ///
    /// Returns a future that indicates when GC has cleaned up to at least [GcReq::new_seqno_since].
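    ///
    /// Illustrative usage (hypothetical caller, marked `ignore` since it needs a
    /// live machine):
    ///
    /// ```ignore
    /// if let Some(rx) = gc.gc_and_truncate_background(req) {
    ///     let maintenance: RoutineMaintenance = rx.await.expect("gc worker is running");
    /// }
    /// ```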
    pub fn gc_and_truncate_background(
        &self,
        req: GcReq,
    ) -> Option<oneshot::Receiver<RoutineMaintenance>> {
        let (gc_completed_sender, gc_completed_receiver) = oneshot::channel();
        let new_gc_sender = self.sender.clone();
        let send = new_gc_sender.send((req, gc_completed_sender));

        if let Err(e) = send {
            // In the steady state we expect this to always succeed, but during
            // shutdown it is possible the destination task has already spun down.
            warn!(
                "gc_and_truncate_background failed to send gc request: {}",
                e
            );
            return None;
        }

        Some(gc_completed_receiver)
    }

    pub(crate) async fn gc_and_truncate(
        machine: &Machine<K, V, T, D>,
        req: GcReq,
    ) -> (RoutineMaintenance, GcResults) {
        let mut step_start = Instant::now();
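        // Records each GC step's wall-clock duration in its own metric, measured
        // from the end of the previous step.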
        let mut report_step_timing = |counter: &Counter| {
            let now = Instant::now();
            counter.inc_by(now.duration_since(step_start).as_secs_f64());
            step_start = now;
        };
        assert_eq!(req.shard_id, machine.shard_id());

        // Double check our GC req: seqno_since will never regress
        // so we can verify it's not somehow greater than the last-
        // known seqno_since
        if req.new_seqno_since > machine.applier.seqno_since() {
            machine
                .applier
                .fetch_and_update_state(Some(req.new_seqno_since))
                .await;
            let current_seqno_since = machine.applier.seqno_since();
            assert!(
                req.new_seqno_since <= current_seqno_since,
                "invalid gc req: {:?} vs machine seqno_since {}",
                req,
                current_seqno_since
            );
        }

        // First, check the latest state known to this process to see
        // if there's relevant GC work for this seqno_since
        let gc_rollups =
            GcRollups::new(machine.applier.rollups_lte_seqno(req.new_seqno_since), &req);
        let rollups_to_remove_from_state = gc_rollups.rollups_to_remove_from_state();
        report_step_timing(&machine.applier.metrics.gc.steps.find_removable_rollups);

        let mut gc_results = GcResults::default();

        if rollups_to_remove_from_state.is_empty() {
            // If there are no rollups to remove from state (either the work has already
            // been done, or there aren't enough rollups <= seqno_since to have any
            // to delete), we can safely exit.
            machine.applier.metrics.gc.noop.inc();
            return (RoutineMaintenance::default(), gc_results);
        }

        debug!(
            "Finding all rollups <= ({}). Will truncate: {:?}. Will remove rollups from state: {:?}",
            req.new_seqno_since,
            gc_rollups.truncate_seqnos().collect::<Vec<_>>(),
            rollups_to_remove_from_state,
        );

        let mut states = if GC_USE_ACTIVE_GC.get(&machine.applier.cfg) {
            let diffs = machine
                .applier
                .state_versions
                .fetch_live_diffs_through(&req.shard_id, req.new_seqno_since)
                .await;

            let initial_seqno = diffs.first().expect("state is initialized").seqno;

            let Some(initial_rollup) = gc_rollups.get(initial_seqno) else {
                // The latest state is always expected to have a reference to a rollup for the
                // earliest seqno. If our state doesn't, that could mean:
                // - Our diffs are too old, and some other process has already truncated past this
                //   point. (But currently we fetch diffs _after_ checking state, so that shouldn't
                //   happen.)
                // - Our diffs are too new... someone has added a rollup for this seqno _after_ we
                //   fetched state, then truncated to it.
                // In either case we're working on outdated data and should stop.
                debug!(
                    ?initial_seqno,
                    ?gc_rollups,
                    "skipping gc - no rollup at initial seqno. concurrent GC?"
                );
                return (RoutineMaintenance::default(), gc_results);
            };

            let Some(state) = machine
                .applier
                .state_versions
                .fetch_rollup_at_key::<T>(&req.shard_id, initial_rollup)
                .await
            else {
                debug!(
                    ?initial_seqno,
                    ?gc_rollups,
                    "skipping gc - deleted rollup at initial seqno. concurrent GC?"
                );
                return (RoutineMaintenance::default(), gc_results);
            };

            UntypedStateVersionsIter::new(
                req.shard_id,
                machine.applier.cfg.clone(),
                Arc::clone(&machine.applier.metrics),
                state,
                diffs,
            )
            .check_ts_codec()
            .expect("ts codec has not changed")
        } else {
            machine
                .applier
                .state_versions
                .fetch_all_live_states(req.shard_id)
                .await
                .expect("state is initialized")
                .check_ts_codec()
                .expect("ts codec has not changed")
        };

        let initial_seqno = states.state().seqno;
        report_step_timing(&machine.applier.metrics.gc.steps.fetch_seconds);

        machine
            .applier
            .shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states.len()));

        debug!(
            "gc seqno_since: ({}) got {} versions from scan",
            req.new_seqno_since,
            states.len()
        );

        Self::incrementally_delete_and_truncate(
            &mut states,
            &gc_rollups,
            machine,
            &mut report_step_timing,
            &mut gc_results,
        )
        .await;

        // Now that the blobs are deleted / Consensus is truncated, remove
        // the rollups from state. Doing this at the end ensures that our
        // invariant is maintained that the current state contains a rollup
        // to the earliest state in Consensus, and ensures that if GC crashes
        // part-way through, we still have a reference to these rollups to
        // resume their deletion.
        //
        // This does mean that if GC crashes part-way through we would
        // repeat work when it resumes. However the redundant work should
        // be minimal as Consensus is incrementally truncated, allowing
        // the next run of GC to skip any work needed for rollups less
        // than the last truncation.
        //
        // In short, while this step is not incremental, it does not need
        // to be for GC to efficiently resume. And in fact, making it
        // incremental could be quite expensive (e.g. more CaS operations).
        let (removed_rollups, maintenance) =
            machine.remove_rollups(rollups_to_remove_from_state).await;
        report_step_timing(&machine.applier.metrics.gc.steps.remove_rollups_from_state);
        debug!("CaS removed rollups from state: {:?}", removed_rollups);
        gc_results.rollups_removed_from_state = removed_rollups;

        // Everything here and below is not strictly needed for GC to complete,
        // but it's a good opportunity, while we have all live states in hand,
        // to run some metrics and assertions.

        // Apply all remaining live states to roll up some metrics, like how many
        // parts are being held (in Blob) that are not part of the latest state.
        let mut seqno_held_parts = 0;
        while let Some(_) = states.next(|diff| match diff {
            InspectDiff::FromInitial(_) => {}
            InspectDiff::Diff(diff) => {
                seqno_held_parts += diff.part_deletes().count();
            }
        }) {}

        machine
            .applier
            .shard_metrics
            .gc_seqno_held_parts
            .set(u64::cast_from(seqno_held_parts));

        // verify that the "current" state (as of `fetch_all_live_states`) contains
        // a rollup to the earliest state we fetched. this invariant isn't affected
        // by the GC work we just performed, but it is a property of GC correctness
        // overall / is a convenient place to run the assertion.
        let valid_pre_gc_state = states
            .state()
            .collections
            .rollups
            .contains_key(&initial_seqno);

        // this invariant should never be violated in the steady-state, but it may
        // be the first time GC runs after fixing any correctness bugs related to
        // our state version invariants. we'll make it an error so we can track
        // any violations in Sentry, but opt not to panic because the root
        // cause of the violation cannot be from this GC run (in fact, this
        // GC run, assuming it's correct, should have fixed the violation!)
        soft_assert_or_log!(
            valid_pre_gc_state,
            "earliest state fetched during GC did not have corresponding rollup: rollups = {:?}, state seqno = {}",
            states.state().collections.rollups,
            initial_seqno
        );

        report_step_timing(
            &machine
                .applier
                .metrics
                .gc
                .steps
                .post_gc_calculations_seconds,
        );

        (maintenance, gc_results)
    }

    /// Physically deletes all blobs from Blob and live diffs from Consensus that
    /// are safe to delete, given the `seqno_since`, ensuring that the earliest
    /// live diff in Consensus has a rollup of seqno `<= seqno_since`.
    ///
    /// Internally, performs deletions for each rollup encountered, ensuring that
    /// incremental progress is made even if the process is interrupted before
    /// completing all gc work.
    async fn incrementally_delete_and_truncate<F>(
        states: &mut StateVersionsIter<T>,
        gc_rollups: &GcRollups,
        machine: &Machine<K, V, T, D>,
        timer: &mut F,
        gc_results: &mut GcResults,
    ) where
        F: FnMut(&Counter),
    {
        assert_eq!(states.state().shard_id, machine.shard_id());
        let shard_id = states.state().shard_id;
        let mut batch_parts_to_delete = PartDeletes::default();
        let mut rollups_to_delete: BTreeSet<PartialRollupKey> = BTreeSet::new();

        for truncate_lt in gc_rollups.truncate_seqnos() {
            assert!(batch_parts_to_delete.is_empty());
            assert!(rollups_to_delete.is_empty());

            // our state is already past the truncation point. there's no work to do --
            // some process already truncated this far
            if states.state().seqno >= truncate_lt {
                continue;
            }

            // By our invariant, `states` should always begin on a rollup.
            assert!(
                gc_rollups.contains_seqno(&states.state().seqno),
                "must start with a present rollup before searching for blobs: rollups = {:#?}, state seqno = {}",
                gc_rollups,
                states.state().seqno
            );

            Self::find_removable_blobs(
                states,
                truncate_lt,
                &machine.applier.metrics.gc.steps,
                timer,
                &mut batch_parts_to_delete,
                &mut rollups_to_delete,
            );

            // After finding removable blobs, our state should be exactly `truncate_lt`,
            // to ensure we've seen all blob deletions in the diffs needed to reach
            // this seqno.
            //
            // That we can always reach `truncate_lt` given the live diffs we fetched
            // earlier is a little subtle:
            // * Our GC request was generated after `seqno_since` was written.
            // * If our initial seqno on this loop was < `truncate_lt`, then our read
            //   to `fetch_all_live_states` must have seen live diffs through at least
            //   `seqno_since`, because the diffs were not yet truncated.
            // * `seqno_since` >= `truncate_lt`, therefore we must have enough live
            //   diffs to reach `truncate_lt`.
            assert_eq!(states.state().seqno, truncate_lt);
            // `truncate_lt` _is_ the seqno of a rollup, but let's very explicitly
            // assert that we're about to truncate everything less than a rollup
            // to maintain our invariant.
            assert!(
                gc_rollups.contains_seqno(&states.state().seqno),
                "must start with a present rollup after searching for blobs: rollups = {:#?}, state seqno = {}",
                gc_rollups,
                states.state().seqno
            );

            // Extra paranoia: verify that none of the blobs we're about to delete
            // are in our current state (we should only be truncating blobs from
            // before this state!)
            states.state().blobs().for_each(|blob| match blob {
                HollowBlobRef::Batch(batch) => {
                    for live_part in &batch.parts {
                        assert!(!batch_parts_to_delete.contains(live_part));
                    }
                }
                HollowBlobRef::Rollup(live_rollup) => {
                    assert_eq!(rollups_to_delete.get(&live_rollup.key), None);
                    // And double check that the rollups we're about to delete are
                    // earlier than our truncation point:
                    match BlobKey::parse_ids(&live_rollup.key.complete(&shard_id)) {
                        Ok((_shard, PartialBlobKey::Rollup(rollup_seqno, _rollup))) => {
                            assert!(rollup_seqno < truncate_lt);
                        }
                        _ => {
                            panic!("invalid rollup during deletion: {:?}", live_rollup);
                        }
                    }
                }
            });

            gc_results.truncated_consensus_to.push(truncate_lt);
            gc_results.batch_parts_deleted_from_blob += batch_parts_to_delete.len();
            gc_results.rollups_deleted_from_blob += rollups_to_delete.len();

            Self::delete_and_truncate(
                truncate_lt,
                &mut batch_parts_to_delete,
                &mut rollups_to_delete,
                machine,
                timer,
            )
            .await;
        }
    }

    /// Iterates through `states`, accumulating all deleted blobs (both batch parts
    /// and rollups) until reaching the seqno `truncate_lt`.
    ///
    /// * The initial seqno of `states` MUST be less than `truncate_lt`.
    /// * The seqno of `states` after this fn will be exactly `truncate_lt`.
    fn find_removable_blobs<F>(
        states: &mut StateVersionsIter<T>,
        truncate_lt: SeqNo,
        metrics: &GcStepTimings,
        timer: &mut F,
        batch_parts_to_delete: &mut PartDeletes<T>,
        rollups_to_delete: &mut BTreeSet<PartialRollupKey>,
    ) where
        F: FnMut(&Counter),
    {
        assert!(states.state().seqno < truncate_lt);
        while let Some(state) = states.next(|diff| match diff {
            InspectDiff::FromInitial(_) => {}
            InspectDiff::Diff(diff) => {
                diff.rollup_deletes().for_each(|rollup| {
                    // we use BTreeSets for fast lookups elsewhere, but we should never
                    // see repeat rollup insertions within a single GC run, otherwise we
                    // have a logic error or our diffs are incorrect (!)
                    assert!(rollups_to_delete.insert(rollup.key.to_owned()));
                });
                diff.part_deletes().for_each(|part| {
                    assert!(batch_parts_to_delete.add(part));
                });
            }
        }) {
            if state.seqno == truncate_lt {
                break;
            }
        }
        timer(&metrics.find_deletable_blobs_seconds);
    }

    /// Deletes `batch_parts` and `rollups` from Blob.
    /// Truncates Consensus to `truncate_lt`.
    async fn delete_and_truncate<F>(
        truncate_lt: SeqNo,
        batch_parts: &mut PartDeletes<T>,
        rollups: &mut BTreeSet<PartialRollupKey>,
        machine: &Machine<K, V, T, D>,
        timer: &mut F,
    ) where
        F: FnMut(&Counter),
    {
        let shard_id = machine.shard_id();
        let concurrency_limit = GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg);
        let delete_semaphore = Semaphore::new(concurrency_limit);

        let batch_parts = std::mem::take(batch_parts);
        batch_parts
            .delete(
                machine.applier.state_versions.blob.borrow(),
                shard_id,
                concurrency_limit,
                &*machine.applier.metrics,
                &machine.applier.metrics.retries.external.batch_delete,
            )
            .instrument(debug_span!("batch::delete"))
            .await;
        timer(&machine.applier.metrics.gc.steps.delete_batch_part_seconds);

        Self::delete_all(
            machine.applier.state_versions.blob.borrow(),
            rollups.iter().map(|k| k.complete(&shard_id)),
            &machine.applier.metrics.retries.external.rollup_delete,
            debug_span!("rollup::delete"),
            &delete_semaphore,
        )
        .await;
        rollups.clear();
        timer(&machine.applier.metrics.gc.steps.delete_rollup_seconds);

        machine
            .applier
            .state_versions
            .truncate_diffs(&shard_id, truncate_lt)
            .await;
        timer(&machine.applier.metrics.gc.steps.truncate_diff_seconds);
    }

    // There's also a bulk delete API in s3 if the performance of this
    // becomes an issue. Maybe make Blob::delete take a list of keys?
    //
    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
    async fn delete_all(
        blob: &dyn Blob,
        keys: impl Iterator<Item = BlobKey>,
        metrics: &RetryMetrics,
        span: Span,
        semaphore: &Semaphore,
    ) {
        let futures = FuturesUnordered::new();
        for key in keys {
            futures.push(
                retry_external(metrics, move || {
                    let key = key.clone();
                    async move {
                        let _permit = semaphore
                            .acquire()
                            .await
                            .expect("acquiring permit from open semaphore");
                        blob.delete(&key).await.map(|_| ())
                    }
                })
                .instrument(span.clone()),
            )
        }

        futures.collect().await
    }
}

#[derive(Debug, Default)]
pub(crate) struct GcResults {
    pub(crate) batch_parts_deleted_from_blob: usize,
    pub(crate) rollups_deleted_from_blob: usize,
    pub(crate) truncated_consensus_to: Vec<SeqNo>,
    pub(crate) rollups_removed_from_state: Vec<SeqNo>,
}

#[derive(Debug)]
struct GcRollups {
    rollups_lte_seqno_since: Vec<(SeqNo, PartialRollupKey)>,
    rollup_seqnos: HashSet<SeqNo>,
}

impl GcRollups {
    fn new(rollups_lte_seqno_since: Vec<(SeqNo, PartialRollupKey)>, gc_req: &GcReq) -> Self {
        assert!(
            rollups_lte_seqno_since
                .iter()
                .all(|(seqno, _rollup)| *seqno <= gc_req.new_seqno_since)
        );
        let rollup_seqnos = rollups_lte_seqno_since.iter().map(|(x, _)| *x).collect();
        Self {
            rollups_lte_seqno_since,
            rollup_seqnos,
        }
    }

    /// Return the rollup key for the given seqno, if it exists.
    fn get(&self, seqno: SeqNo) -> Option<&PartialRollupKey> {
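        // Assumes `rollups_lte_seqno_since` is sorted by seqno (it's produced in
        // seqno order), which binary search requires.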
        let index = self
            .rollups_lte_seqno_since
            .binary_search_by_key(&seqno, |(k, _)| *k)
            .ok()?;
        Some(&self.rollups_lte_seqno_since[index].1)
    }

    fn contains_seqno(&self, seqno: &SeqNo) -> bool {
        self.rollup_seqnos.contains(seqno)
    }

    /// Returns the seqnos we can safely truncate state to when performing
    /// incremental GC (all rollups with seqnos <= seqno_since).
    fn truncate_seqnos(&self) -> impl Iterator<Item = SeqNo> + '_ {
        self.rollups_lte_seqno_since
            .iter()
            .map(|(seqno, _rollup)| *seqno)
    }

    /// Returns the rollups we can safely remove from state (all rollups
    /// `<` the latest rollup `<=` seqno_since).
    ///
    /// See the full explanation in [crate::internal::state_versions::StateVersions]
    /// for how this is derived.
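    ///
    /// For example (illustrative): with rollups at seqnos `[3, 7, 9]`, this
    /// returns the entries for `3` and `7`, keeping the latest rollup at `9`.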
    fn rollups_to_remove_from_state(&self) -> &[(SeqNo, PartialRollupKey)] {
        match self.rollups_lte_seqno_since.split_last() {
            None => &[],
            Some((_rollup_to_keep, rollups_to_remove_from_state)) => rollups_to_remove_from_state,
        }
    }
}