// mz_persist_client/internal/gc.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::borrow::Borrow;
11use std::collections::BTreeSet;
12use std::fmt::Debug;
13use std::marker::PhantomData;
14use std::mem;
15use std::sync::Arc;
16use std::time::Instant;
17
18use differential_dataflow::difference::Monoid;
19use differential_dataflow::lattice::Lattice;
20use futures_util::StreamExt;
21use futures_util::stream::FuturesUnordered;
22use prometheus::Counter;
23use timely::progress::Timestamp;
24use tokio::sync::mpsc::UnboundedSender;
25use tokio::sync::{Semaphore, mpsc, oneshot};
26use tracing::{Instrument, Span, debug, debug_span, warn};
27
28use crate::async_runtime::IsolatedRuntime;
29use crate::batch::PartDeletes;
30use crate::cfg::GC_BLOB_DELETE_CONCURRENCY_LIMIT;
31
32use mz_ore::cast::CastFrom;
33use mz_ore::collections::HashSet;
34use mz_ore::soft_assert_or_log;
35use mz_persist::location::{Blob, SeqNo};
36use mz_persist_types::{Codec, Codec64};
37
38use crate::ShardId;
39use crate::internal::machine::{Machine, retry_external};
40use crate::internal::maintenance::RoutineMaintenance;
41use crate::internal::metrics::{GcStepTimings, RetryMetrics};
42use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey};
43use crate::internal::state::{GC_USE_ACTIVE_GC, HollowBlobRef};
44use crate::internal::state_versions::{InspectDiff, StateVersionsIter, UntypedStateVersionsIter};
45
/// A request to garbage collect a shard's no-longer-referenced blobs and
/// consensus versions.
#[derive(Debug, Clone, PartialEq)]
pub struct GcReq {
    /// The shard to garbage collect.
    pub shard_id: ShardId,
    /// The overall seqno capability held for this shard: state versions and
    /// blobs only referenced before this seqno are eligible for deletion.
    pub new_seqno_since: SeqNo,
}
51
/// A handle that enqueues [GcReq]s onto the single background worker task
/// spawned by [GarbageCollector::new]. Clones share the same worker.
#[derive(Debug)]
pub struct GarbageCollector<K, V, T, D> {
    // Each request is paired with a oneshot that is fired once the GC pass
    // covering that request has completed.
    sender: UnboundedSender<(GcReq, oneshot::Sender<RoutineMaintenance>)>,
    // An fn-pointer marker ties the handle to the machine's type parameters
    // without requiring K/V/T/D to be Send/Sync/'static themselves.
    _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
57
58impl<K, V, T, D> Clone for GarbageCollector<K, V, T, D> {
59    fn clone(&self) -> Self {
60        GarbageCollector {
61            sender: self.sender.clone(),
62            _phantom: PhantomData,
63        }
64    }
65}
66
/// Cleanup for no longer necessary blobs and consensus versions.
///
/// - Every read handle, snapshot, and listener is given a capability on seqno
///   with a very long lease (allowing for infrequent heartbeats). This is a
///   guarantee that no blobs referenced by the state at that version will be
///   deleted (even if they've been compacted in some newer version of the
///   state). This is called a seqno_since in the code as it has obvious
///   parallels to how sinces work at the shard/collection level. (Is reusing
///   "since" here a good idea? We could also call it a "seqno capability" or
///   something else instead.)
/// - Every state transition via apply_unbatched_cmd has the opportunity to
///   determine that the overall seqno_since for the shard has changed. In the
///   common case in production, this will be in response to a snapshot
///   finishing or a listener emitting some batch.
/// - It would be nice if this only ever happened in response to read-y things
///   (there'd be a nice parallel to how compaction background work is only
///   spawned by write activity), but if there are no readers, we still very
///   much want to continue to garbage collect. Notably, if there are no
///   readers, we naturally only need to hold a capability on the current
///   version of state. This means that if there are only writers, a write
///   command will result in the seqno_since advancing immediately from the
///   previous version of the state to the new one.
/// - Like Compacter, GarbageCollector uses a heuristic to ignore some requests
///   to save work. In this case, the tradeoff is between consensus traffic
///   (plus a bit of cpu) and keeping blobs around longer than strictly
///   necessary. This is correct because a process could always die while
///   executing one of these requests (or be slow and still working on it when
///   the next request is generated), so we anyway need to handle them being
///   dropped.
/// - GarbageCollector works by `Consensus::scan`-ing for every live version of
///   state (ignoring what the request thinks the prev_state_seqno was for the
///   reasons mentioned immediately above). It then walks through them in a
///   loop, accumulating a BTreeSet of every referenced blob key. When it finds
///   the version corresponding to the new_seqno_since, it removes every blob in
///   that version of the state from the BTreeSet and exits the loop. This
///   results in the BTreeSet containing every blob eligible for deletion. It
///   deletes those blobs and then truncates the state to the new_seqno_since to
///   indicate that this work doesn't need to be done again.
/// - Note that these requests are being processed concurrently, so it's always
///   possible that some future request has already deleted the blobs and
///   truncated consensus. It's also possible that this is the future request.
///   As a result, the only guarantee that we get is that the current version of
///   head is >= new_seqno_since.
/// - (Aside: The above also means that if Blob is not linearizable, there is a
///   possible race where a blob gets deleted before it is written and thus is
///   leaked. We anyway always have the possibility of a write process being
///   killed between when it writes a blob and links it into state, so this is
///   fine; it'll be caught and fixed by the same mechanism.)
impl<K, V, T, D> GarbageCollector<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    /// Spawns the single background worker task that services GC requests for
    /// `machine`'s shard and returns a handle for enqueueing work onto it.
    ///
    /// Requests that accumulate while a pass is running are merged together
    /// (taking the max `new_seqno_since`) and satisfied by a single pass.
    pub fn new(machine: Machine<K, V, T, D>, isolated_runtime: Arc<IsolatedRuntime>) -> Self {
        let (gc_req_sender, mut gc_req_recv) =
            mpsc::unbounded_channel::<(GcReq, oneshot::Sender<RoutineMaintenance>)>();

        // spin off a single task responsible for executing GC requests.
        // work is enqueued into the task through a channel
        let _worker_handle = mz_ore::task::spawn(|| "PersistGcWorker", async move {
            while let Some((req, completer)) = gc_req_recv.recv().await {
                let mut consolidated_req = req;
                let mut gc_completed_senders = vec![completer];

                // check if any further gc requests have built up. we'll merge their requests
                // together and run a single GC pass to satisfy all of them
                while let Ok((req, completer)) = gc_req_recv.try_recv() {
                    // all requests flowing through this channel are for the
                    // single shard this worker was created for
                    assert_eq!(req.shard_id, consolidated_req.shard_id);
                    gc_completed_senders.push(completer);
                    consolidated_req.new_seqno_since =
                        std::cmp::max(req.new_seqno_since, consolidated_req.new_seqno_since);
                }

                let merged_requests = gc_completed_senders.len() - 1;
                if merged_requests > 0 {
                    machine
                        .applier
                        .metrics
                        .gc
                        .merged
                        .inc_by(u64::cast_from(merged_requests));
                    debug!(
                        "Merged {} gc requests together for shard {}",
                        merged_requests, consolidated_req.shard_id
                    );
                }

                // GC runs asynchronously from whatever request enqueued it, so
                // give it a fresh root span; `follows_from` preserves the
                // causal link for tracing.
                let gc_span = debug_span!(parent: None, "gc_and_truncate", shard_id=%consolidated_req.shard_id);
                gc_span.follows_from(&Span::current());

                let start = Instant::now();
                machine.applier.metrics.gc.started.inc();
                let (mut maintenance, _stats) = {
                    let name = format!("gc_and_truncate ({})", &consolidated_req.shard_id);
                    let machine = machine.clone();
                    // run the GC pass itself on the isolated runtime
                    isolated_runtime
                        .spawn_named(|| name, async move {
                            Self::gc_and_truncate(&machine, consolidated_req)
                                .instrument(gc_span)
                                .await
                        })
                        .await
                };
                machine.applier.metrics.gc.finished.inc();
                machine.applier.shard_metrics.gc_finished.inc();
                machine
                    .applier
                    .metrics
                    .gc
                    .seconds
                    .inc_by(start.elapsed().as_secs_f64());

                // inform all callers who enqueued GC reqs that their work is complete
                for sender in gc_completed_senders {
                    // we can safely ignore errors here, it's possible the caller
                    // wasn't interested in waiting and dropped their receiver.
                    // maintenance will be somewhat-arbitrarily assigned to the first oneshot.
                    let _ = sender.send(mem::take(&mut maintenance));
                }
            }
        });

        GarbageCollector {
            sender: gc_req_sender,
            _phantom: PhantomData,
        }
    }

    /// Enqueues a [GcReq] to be consumed by the GC background task when available.
    ///
    /// Returns a future that indicates when GC has cleaned up to at least [GcReq::new_seqno_since]
    pub fn gc_and_truncate_background(
        &self,
        req: GcReq,
    ) -> Option<oneshot::Receiver<RoutineMaintenance>> {
        let (gc_completed_sender, gc_completed_receiver) = oneshot::channel();
        let new_gc_sender = self.sender.clone();
        let send = new_gc_sender.send((req, gc_completed_sender));

        if let Err(e) = send {
            // In the steady state we expect this to always succeed, but during
            // shutdown it is possible the destination task has already spun down
            warn!(
                "gc_and_truncate_background failed to send gc request: {}",
                e
            );
            return None;
        }

        Some(gc_completed_receiver)
    }

    /// Runs one full GC pass for `req`: deletes eligible blobs from Blob,
    /// incrementally truncates live diffs in Consensus, and finally removes
    /// obsolete rollups from state.
    ///
    /// Returns any follow-up routine maintenance along with stats describing
    /// the work performed.
    pub(crate) async fn gc_and_truncate(
        machine: &Machine<K, V, T, D>,
        req: GcReq,
    ) -> (RoutineMaintenance, GcResults) {
        // Folds the elapsed time since the previous step into the given
        // step-timing counter and restarts the stopwatch.
        let mut step_start = Instant::now();
        let mut report_step_timing = |counter: &Counter| {
            let now = Instant::now();
            counter.inc_by(now.duration_since(step_start).as_secs_f64());
            step_start = now;
        };
        assert_eq!(req.shard_id, machine.shard_id());

        // Double check our GC req: seqno_since will never regress
        // so we can verify it's not somehow greater than the last-
        // known seqno_since
        if req.new_seqno_since > machine.applier.seqno_since() {
            machine
                .applier
                .fetch_and_update_state(Some(req.new_seqno_since))
                .await;
            let current_seqno_since = machine.applier.seqno_since();
            assert!(
                req.new_seqno_since <= current_seqno_since,
                "invalid gc req: {:?} vs machine seqno_since {}",
                req,
                current_seqno_since
            );
        }

        // First, check the latest known state to this process to see
        // if there's relevant GC work for this seqno_since
        let gc_rollups =
            GcRollups::new(machine.applier.rollups_lte_seqno(req.new_seqno_since), &req);
        let rollups_to_remove_from_state = gc_rollups.rollups_to_remove_from_state();
        report_step_timing(&machine.applier.metrics.gc.steps.find_removable_rollups);

        let mut gc_results = GcResults::default();

        if rollups_to_remove_from_state.is_empty() {
            // If there are no rollups to remove from state (either the work has already
            // been done, or the there aren't enough rollups <= seqno_since to have any
            // to delete), we can safely exit. We still call remove_rollups to clear
            // active_gc if it was set, so the next GC isn't suppressed.
            let (_removed, maintenance) = machine.remove_rollups(&[]).await;
            machine.applier.metrics.gc.noop.inc();
            return (maintenance, gc_results);
        }

        debug!(
            "Finding all rollups <= ({}). Will truncate: {:?}. Will remove rollups from state: {:?}",
            req.new_seqno_since,
            gc_rollups.truncate_seqnos().collect::<Vec<_>>(),
            rollups_to_remove_from_state,
        );

        // Two strategies for assembling the versions to walk:
        // - active GC: fetch only the live diffs up through the seqno_since and
        //   hydrate the initial state from the rollup at the earliest diff;
        // - otherwise: fetch every live state for the shard.
        let mut states = if GC_USE_ACTIVE_GC.get(&machine.applier.cfg) {
            let diffs = machine
                .applier
                .state_versions
                .fetch_live_diffs_through(&req.shard_id, req.new_seqno_since)
                .await;

            let initial_seqno = diffs.first().expect("state is initialized").seqno;

            let Some(initial_rollup) = gc_rollups.get(initial_seqno) else {
                // The latest state is always expected to have a reference to a rollup for the
                // earliest seqno. If our state doesn't, that could mean:
                // - Our diffs are too old, and some other process has already truncated past this point.
                //   (But currently we fetch diffs _after_ checking state, so that shouldn't happen.)
                // - Our diffs are too new... someone has added a rollup for this seqno _after_ we fetched
                //   state, then truncated to it.
                // In either case we're working on outdated data and should stop.
                debug!(
                    ?initial_seqno,
                    ?gc_rollups,
                    "skipping gc - no rollup at initial seqno. concurrent GC?"
                );
                return (RoutineMaintenance::default(), gc_results);
            };

            let Some(state) = machine
                .applier
                .state_versions
                .fetch_rollup_at_key::<T>(&req.shard_id, initial_rollup)
                .await
            else {
                // The rollup blob is gone: some concurrent GC run must have
                // deleted it after we read state. Bail out; that run owns the work.
                debug!(
                    ?initial_seqno,
                    ?gc_rollups,
                    "skipping gc - deleted rollup at initial seqno. concurrent GC?"
                );
                return (RoutineMaintenance::default(), gc_results);
            };

            UntypedStateVersionsIter::new(
                req.shard_id,
                machine.applier.cfg.clone(),
                Arc::clone(&machine.applier.metrics),
                state,
                diffs,
            )
            .check_ts_codec()
            .expect("ts codec has not changed")
        } else {
            machine
                .applier
                .state_versions
                .fetch_all_live_states(req.shard_id)
                .await
                .expect("state is initialized")
                .check_ts_codec()
                .expect("ts codec has not changed")
        };

        let initial_seqno = states.state().seqno;
        report_step_timing(&machine.applier.metrics.gc.steps.fetch_seconds);

        machine
            .applier
            .shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states.len()));

        debug!(
            "gc seqno_since: ({}) got {} versions from scan",
            req.new_seqno_since,
            states.len()
        );

        Self::incrementally_delete_and_truncate(
            &mut states,
            &gc_rollups,
            machine,
            &mut report_step_timing,
            &mut gc_results,
        )
        .await;

        // Now that the blobs are deleted / Consensus is truncated, remove
        // the rollups from state. Doing this at the end ensures that our
        // invariant is maintained that the current state contains a rollup
        // to the earliest state in Consensus, and ensures that if GC crashes
        // part-way through, we still have a reference to these rollups to
        // resume their deletion.
        //
        // This does mean that if GC crashes part-way through we would
        // repeat work when it resumes. However the redundant work should
        // be minimal as Consensus is incrementally truncated, allowing
        // the next run of GC to skip any work needed for rollups less
        // than the last truncation.
        //
        // In short, while this step is not incremental, it does not need
        // to be for GC to efficiently resume. And in fact, making it
        // incremental could be quite expensive (e.g. more CaS operations).
        let (removed_rollups, maintenance) =
            machine.remove_rollups(rollups_to_remove_from_state).await;
        report_step_timing(&machine.applier.metrics.gc.steps.remove_rollups_from_state);
        debug!("CaS removed rollups from state: {:?}", removed_rollups);
        gc_results.rollups_removed_from_state = removed_rollups;

        // Everything here and below is not strictly needed for GC to complete,
        // but it's a good opportunity, while we have all live states in hand,
        // to run some metrics and assertions.

        // Apply all remaining live states to rollup some metrics, like how many
        // parts are being held (in Blob) that are not part of the latest state.
        let mut seqno_held_parts = 0;
        while let Some(_) = states.next(|diff| match diff {
            InspectDiff::FromInitial(_) => {}
            InspectDiff::Diff(diff) => {
                seqno_held_parts += diff.part_deletes().count();
            }
        }) {}

        machine
            .applier
            .shard_metrics
            .gc_seqno_held_parts
            .set(u64::cast_from(seqno_held_parts));

        // verify that the "current" state (as of `fetch_all_live_states`) contains
        // a rollup to the earliest state we fetched. this invariant isn't affected
        // by the GC work we just performed, but it is a property of GC correctness
        // overall / is a convenient place to run the assertion.
        let valid_pre_gc_state = states
            .state()
            .collections
            .rollups
            .contains_key(&initial_seqno);

        // this should never be true in the steady-state, but may be true the
        // first time GC runs after fixing any correctness bugs related to our
        // state version invariants. we'll make it an error so we can track
        // any violations in Sentry, but opt not to panic because the root
        // cause of the violation cannot be from this GC run (in fact, this
        // GC run, assuming it's correct, should have fixed the violation!)
        soft_assert_or_log!(
            valid_pre_gc_state,
            "earliest state fetched during GC did not have corresponding rollup: rollups = {:?}, state seqno = {}",
            states.state().collections.rollups,
            initial_seqno
        );

        report_step_timing(
            &machine
                .applier
                .metrics
                .gc
                .steps
                .post_gc_calculations_seconds,
        );

        (maintenance, gc_results)
    }

    /// Physically deletes all blobs from Blob and live diffs from Consensus that
    /// are safe to delete, given the `seqno_since`, ensuring that the earliest
    /// live diff in Consensus has a rollup of seqno `<= seqno_since`.
    ///
    /// Internally, performs deletions for each rollup encountered, ensuring that
    /// incremental progress is made even if the process is interrupted before
    /// completing all gc work.
    async fn incrementally_delete_and_truncate<F>(
        states: &mut StateVersionsIter<T>,
        gc_rollups: &GcRollups,
        machine: &Machine<K, V, T, D>,
        timer: &mut F,
        gc_results: &mut GcResults,
    ) where
        F: FnMut(&Counter),
    {
        assert_eq!(states.state().shard_id, machine.shard_id());
        let shard_id = states.state().shard_id;
        let mut batch_parts_to_delete = PartDeletes::default();
        let mut rollups_to_delete: BTreeSet<PartialRollupKey> = BTreeSet::new();

        for truncate_lt in gc_rollups.truncate_seqnos() {
            // both accumulators are drained by `delete_and_truncate` at the
            // bottom of each iteration, so they must be empty here
            assert!(batch_parts_to_delete.is_empty());
            assert!(rollups_to_delete.is_empty());

            // our state is already past the truncation point. there's no work to do --
            // some process already truncated this far
            if states.state().seqno >= truncate_lt {
                continue;
            }

            // By our invariant, `states` should always begin on a rollup.
            assert!(
                gc_rollups.contains_seqno(&states.state().seqno),
                "must start with a present rollup before searching for blobs: rollups = {:#?}, state seqno = {}",
                gc_rollups,
                states.state().seqno
            );

            Self::find_removable_blobs(
                states,
                truncate_lt,
                &machine.applier.metrics.gc.steps,
                timer,
                &mut batch_parts_to_delete,
                &mut rollups_to_delete,
            );

            // After finding removable blobs, our state should be exactly `truncate_lt`,
            // to ensure we've seen all blob deletions in the diffs needed to reach
            // this seqno.
            //
            // That we can always reach `truncate_lt` given the live diffs we fetched
            // earlier is a little subtle:
            // * Our GC request was generated after `seqno_since` was written.
            // * If our initial seqno on this loop was < `truncate_lt`, then our read
            //   to `fetch_all_live_states` must have seen live diffs through at least
            //   `seqno_since`, because the diffs were not yet truncated.
            // * `seqno_since` >= `truncate_lt`, therefore we must have enough live
            //   diffs to reach `truncate_lt`.
            assert_eq!(states.state().seqno, truncate_lt);
            // `truncate_lt` _is_ the seqno of a rollup, but let's very explicitly
            // assert that we're about to truncate everything less than a rollup
            // to maintain our invariant.
            assert!(
                gc_rollups.contains_seqno(&states.state().seqno),
                "must start with a present rollup after searching for blobs: rollups = {:#?}, state seqno = {}",
                gc_rollups,
                states.state().seqno
            );

            // Extra paranoia: verify that none of the blobs we're about to delete
            // are in our current state (we should only be truncating blobs from
            // before this state!)
            states.state().blobs().for_each(|blob| match blob {
                HollowBlobRef::Batch(batch) => {
                    for live_part in &batch.parts {
                        assert!(!batch_parts_to_delete.contains(live_part));
                    }
                }
                HollowBlobRef::Rollup(live_rollup) => {
                    assert_eq!(rollups_to_delete.get(&live_rollup.key), None);
                    // And double check that the rollups we're about to delete are
                    // earlier than our truncation point:
                    match BlobKey::parse_ids(&live_rollup.key.complete(&shard_id)) {
                        Ok((_shard, PartialBlobKey::Rollup(rollup_seqno, _rollup))) => {
                            assert!(rollup_seqno < truncate_lt);
                        }
                        _ => {
                            panic!("invalid rollup during deletion: {:?}", live_rollup);
                        }
                    }
                }
            });

            gc_results.truncated_consensus_to.push(truncate_lt);
            gc_results.batch_parts_deleted_from_blob += batch_parts_to_delete.len();
            gc_results.rollups_deleted_from_blob += rollups_to_delete.len();

            Self::delete_and_truncate(
                truncate_lt,
                &mut batch_parts_to_delete,
                &mut rollups_to_delete,
                machine,
                timer,
            )
            .await;
        }
    }

    /// Iterates through `states`, accumulating all deleted blobs (both batch parts
    /// and rollups) until reaching the seqno `truncate_lt`.
    ///
    /// * The initial seqno of `states` MUST be less than `truncate_lt`.
    /// * The seqno of `states` after this fn will be exactly `truncate_lt`.
    fn find_removable_blobs<F>(
        states: &mut StateVersionsIter<T>,
        truncate_lt: SeqNo,
        metrics: &GcStepTimings,
        timer: &mut F,
        batch_parts_to_delete: &mut PartDeletes<T>,
        rollups_to_delete: &mut BTreeSet<PartialRollupKey>,
    ) where
        F: FnMut(&Counter),
    {
        assert!(states.state().seqno < truncate_lt);
        while let Some(state) = states.next(|diff| match diff {
            InspectDiff::FromInitial(_) => {}
            InspectDiff::Diff(diff) => {
                diff.rollup_deletes().for_each(|rollup| {
                    // we use BTreeSets for fast lookups elsewhere, but we should never
                    // see repeat rollup insertions within a single GC run, otherwise we
                    // have a logic error or our diffs are incorrect (!)
                    assert!(rollups_to_delete.insert(rollup.key.to_owned()));
                });
                diff.part_deletes().for_each(|part| {
                    // same reasoning as above: a repeat part delete indicates a bug
                    assert!(batch_parts_to_delete.add(part));
                });
            }
        }) {
            if state.seqno == truncate_lt {
                break;
            }
        }
        timer(&metrics.find_deletable_blobs_seconds);
    }

    /// Deletes `batch_parts` and `rollups` from Blob.
    /// Truncates Consensus to `truncate_lt`.
    ///
    /// Both accumulators are drained: `batch_parts` is taken and `rollups` is
    /// cleared, so the caller can reuse them on the next iteration.
    async fn delete_and_truncate<F>(
        truncate_lt: SeqNo,
        batch_parts: &mut PartDeletes<T>,
        rollups: &mut BTreeSet<PartialRollupKey>,
        machine: &Machine<K, V, T, D>,
        timer: &mut F,
    ) where
        F: FnMut(&Counter),
    {
        let shard_id = machine.shard_id();
        // the same limit bounds both delete paths: batch parts receive it
        // directly, rollup deletes go through the semaphore below
        let concurrency_limit = GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg);
        let delete_semaphore = Semaphore::new(concurrency_limit);

        let batch_parts = std::mem::take(batch_parts);
        batch_parts
            .delete(
                machine.applier.state_versions.blob.borrow(),
                shard_id,
                concurrency_limit,
                &*machine.applier.metrics,
                &machine.applier.metrics.retries.external.batch_delete,
            )
            .instrument(debug_span!("batch::delete"))
            .await;
        timer(&machine.applier.metrics.gc.steps.delete_batch_part_seconds);

        Self::delete_all(
            machine.applier.state_versions.blob.borrow(),
            rollups.iter().map(|k| k.complete(&shard_id)),
            &machine.applier.metrics.retries.external.rollup_delete,
            debug_span!("rollup::delete"),
            &delete_semaphore,
        )
        .await;
        rollups.clear();
        timer(&machine.applier.metrics.gc.steps.delete_rollup_seconds);

        machine
            .applier
            .state_versions
            .truncate_diffs(&shard_id, truncate_lt)
            .await;
        timer(&machine.applier.metrics.gc.steps.truncate_diff_seconds);
    }

    /// Deletes each key in `keys` from Blob, with `semaphore` bounding the
    /// number of deletes in flight at once.
    //
    // There's also a bulk delete API in s3 if the performance of this
    // becomes an issue. Maybe make Blob::delete take a list of keys?
    //
    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
    async fn delete_all(
        blob: &dyn Blob,
        keys: impl Iterator<Item = BlobKey>,
        metrics: &RetryMetrics,
        span: Span,
        semaphore: &Semaphore,
    ) {
        let futures = FuturesUnordered::new();
        for key in keys {
            futures.push(
                // the retry closure clones `key` so each retry attempt gets
                // its own copy to move into the async block
                retry_external(metrics, move || {
                    let key = key.clone();
                    async move {
                        let _permit = semaphore
                            .acquire()
                            .await
                            .expect("acquiring permit from open semaphore");
                        blob.delete(&key).await.map(|_| ())
                    }
                })
                .instrument(span.clone()),
            )
        }

        futures.collect().await
    }
}
661
/// Stats describing the work performed by a single GC pass
/// (see [GarbageCollector::gc_and_truncate]).
#[derive(Debug, Default)]
pub(crate) struct GcResults {
    /// How many batch part blobs were deleted from Blob.
    pub(crate) batch_parts_deleted_from_blob: usize,
    /// How many rollup blobs were deleted from Blob.
    pub(crate) rollups_deleted_from_blob: usize,
    /// Each seqno that Consensus was incrementally truncated to.
    pub(crate) truncated_consensus_to: Vec<SeqNo>,
    /// The seqnos of rollups removed from state via CaS.
    pub(crate) rollups_removed_from_state: Vec<SeqNo>,
}
669
/// The set of rollups (with seqno `<=` the GC request's `new_seqno_since`)
/// that a GC pass may truncate up to and remove from state.
#[derive(Debug)]
struct GcRollups {
    // (seqno, rollup key) pairs; `get` relies on binary search, so this is
    // presumably sorted by seqno — the source of `rollups_lte_seqno` appears
    // to guarantee it. NOTE(review): confirm against the caller.
    rollups_lte_seqno_since: Vec<(SeqNo, PartialRollupKey)>,
    // Seqnos of the above, for O(1) membership checks in `contains_seqno`.
    rollup_seqnos: HashSet<SeqNo>,
}
675
676impl GcRollups {
677    fn new(rollups_lte_seqno_since: Vec<(SeqNo, PartialRollupKey)>, gc_req: &GcReq) -> Self {
678        assert!(
679            rollups_lte_seqno_since
680                .iter()
681                .all(|(seqno, _rollup)| *seqno <= gc_req.new_seqno_since)
682        );
683        let rollup_seqnos = rollups_lte_seqno_since.iter().map(|(x, _)| *x).collect();
684        Self {
685            rollups_lte_seqno_since,
686            rollup_seqnos,
687        }
688    }
689
690    /// Return the rollup key for the given seqno, if it exists.
691    fn get(&self, seqno: SeqNo) -> Option<&PartialRollupKey> {
692        let index = self
693            .rollups_lte_seqno_since
694            .binary_search_by_key(&seqno, |(k, _)| *k)
695            .ok()?;
696        Some(&self.rollups_lte_seqno_since[index].1)
697    }
698
699    fn contains_seqno(&self, seqno: &SeqNo) -> bool {
700        self.rollup_seqnos.contains(seqno)
701    }
702
703    /// Returns the seqnos we can safely truncate state to when performing
704    /// incremental GC (all rollups with seqnos <= seqno_since).
705    fn truncate_seqnos(&self) -> impl Iterator<Item = SeqNo> + '_ {
706        self.rollups_lte_seqno_since
707            .iter()
708            .map(|(seqno, _rollup)| *seqno)
709    }
710
711    /// Returns the rollups we can safely remove from state (all rollups
712    /// `<` than the latest rollup `<=` seqno_since).
713    ///
714    /// See the full explanation in [crate::internal::state_versions::StateVersions]
715    /// for how this is derived.
716    fn rollups_to_remove_from_state(&self) -> &[(SeqNo, PartialRollupKey)] {
717        match self.rollups_lte_seqno_since.split_last() {
718            None => &[],
719            Some((_rollup_to_keep, rollups_to_remove_from_state)) => rollups_to_remove_from_state,
720        }
721    }
722}