// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Introspection of storage utilization by persist

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;

use futures::stream::{FuturesUnordered, StreamExt};
use mz_ore::cast::CastFrom;
use mz_persist::location::Blob;
use tokio::sync::Semaphore;
use tracing::{error, info};

use crate::cfg::{PersistConfig, USAGE_STATE_FETCH_CONCURRENCY_LIMIT};
use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey, WriterKey};
use crate::internal::state::HollowBlobRef;
use crate::internal::state_versions::StateVersions;
use crate::{Metrics, PersistClient, ShardId, retry_external};

/// A breakdown of the size of various contributions to a shard's blob
/// usage that is actively referenced by any live state in Consensus.
#[derive(Clone, Debug)]
pub struct ShardUsageReferenced {
    pub(crate) batches_bytes: u64,
    pub(crate) rollup_bytes: u64,
}

impl ShardUsageReferenced {
    /// Byte size of all data referenced in state for the shard.
    pub fn size_bytes(&self) -> u64 {
        let Self {
            batches_bytes,
            rollup_bytes,
        } = self;
        *batches_bytes + *rollup_bytes
    }
}

/// The referenced blob usage for a set of shards.
#[derive(Debug)]
pub struct ShardsUsageReferenced {
    /// The data for each shard.
    pub by_shard: BTreeMap<ShardId, ShardUsageReferenced>,
}

/// A breakdown of the size of various contributions to a shard's blob (S3)
/// usage.
///
/// This is structured as a "funnel", in which the steps are additive.
/// Specifically `1=2a+2b`, `2a=3a+3b`, `3a=4a+4b`, `4a=5a+5b` (so the "a"s are
/// the funnel and the "b"s are places where data splits out of the funnel).
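///
/// For example, with purely illustrative numbers (chosen only so that each
/// step of the funnel sums correctly):
///
/// ```text
/// 1:  total_bytes = 7
///   2a: not_leaked_bytes = 6                  2b: leaked_bytes = 1
///     3a: referenced_bytes = 4                3b: not_leaked_not_referenced_bytes = 2
///       4a: current_state_bytes = 3           4b: referenced_not_current_state_bytes = 1
///         5a: current_state_batches_bytes = 2   5b: current_state_rollups_bytes = 1
/// ```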
#[derive(Clone, Debug)]
pub struct ShardUsageAudit {
    /// 5a: Data in batches/parts referenced by the most recent version of
    /// state.
    pub current_state_batches_bytes: u64,
    /// 5b: Data in rollups referenced by the most recent version of state.
    pub current_state_rollups_bytes: u64,
    /// 4b: Data referenced by a live version of state that is not the most
    /// recent.
    ///
    /// Possible causes:
    /// - SeqNo hold
    /// - Waiting for a GC run
    pub referenced_not_current_state_bytes: u64,
    /// 3b: Data not referenced by any live version of state.
    ///
    /// Possible causes:
    /// - A batch or rollup that's about to be linked into state
    /// - A batch leaked by a crash, but the writer has not yet been force
    ///   expired
    /// - A rollup leaked by a crash, but GC has not yet advanced past the
    ///   SeqNo
    pub not_leaked_not_referenced_bytes: u64,
    /// 2b: Data that is eligible for reclamation by a (future) leaked blob
    /// cleanup task (database-issues#5018).
    ///
    /// Possible causes:
    /// - A batch or rollup written by a process which crashed (or was rolled)
    ///   before it could be linked into state.
    pub leaked_bytes: u64,
}

impl ShardUsageAudit {
    /// 4a: Data referenced by the most recent version of state.
    pub fn current_state_bytes(&self) -> u64 {
        self.current_state_batches_bytes + self.current_state_rollups_bytes
    }

    /// 3a: Data referenced by any live version of state.
    pub fn referenced_bytes(&self) -> u64 {
        self.current_state_bytes() + self.referenced_not_current_state_bytes
    }

    /// 2a: Data that would not be reclaimed by a (future) leaked blob
    /// cleanup task (database-issues#5018).
    pub fn not_leaked_bytes(&self) -> u64 {
        self.referenced_bytes() + self.not_leaked_not_referenced_bytes
    }

    /// 1: Raw blob (S3) usage.
    ///
    /// NB: Due to race conditions between reads of blob and consensus in the
    /// usage code, this might be a slight under-counting.
    pub fn total_bytes(&self) -> u64 {
        self.not_leaked_bytes() + self.leaked_bytes
    }
}

/// The blob (S3) usage of all shards in an environment.
#[derive(Clone, Debug)]
pub struct ShardsUsageAudit {
    /// The data for each shard.
    pub by_shard: BTreeMap<ShardId, ShardUsageAudit>,
    /// Data not attributable to any particular shard. This _should_ always be
    /// 0; a nonzero value indicates either persist wrote an invalid blob key,
    /// or another process is storing data under the same path (!)
    pub unattributable_bytes: u64,
}

#[derive(Clone, Debug, Default)]
struct BlobUsage {
    by_shard: BTreeMap<ShardId, ShardBlobUsage>,
    unattributable_bytes: u64,
    batch_part_bytes: u64,
    batch_part_count: u64,
    rollup_size: u64,
    rollup_count: u64,
    total_size: u64,
    total_count: u64,
}

#[derive(Clone, Debug, Default)]
struct ShardBlobUsage {
    by_writer: BTreeMap<WriterKey, u64>,
    rollup_bytes: u64,
}

impl ShardBlobUsage {
    fn total_bytes(&self) -> u64 {
        self.by_writer.values().copied().sum::<u64>() + self.rollup_bytes
    }
}

/// Provides access to storage usage metrics for a specific Blob
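///
/// A minimal usage sketch (`persist_client` and `shard_id` are assumed to come
/// from elsewhere, so this is not runnable as-is):
///
/// ```ignore
/// let usage = StorageUsageClient::open(persist_client);
/// let referenced = usage.shard_usage_referenced(shard_id).await;
/// println!("shard references {} bytes", referenced.size_bytes());
/// ```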
#[derive(Clone, Debug)]
pub struct StorageUsageClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    state_versions: Arc<StateVersions>,
}

impl StorageUsageClient {
    /// Creates a new StorageUsageClient.
    pub fn open(client: PersistClient) -> Self {
        let state_versions = Arc::new(StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        ));
        StorageUsageClient {
            cfg: client.cfg,
            blob: client.blob,
            metrics: client.metrics,
            state_versions,
        }
    }

    /// Computes [ShardUsageReferenced] for a single shard. Suitable for customer billing.
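    ///
    /// Unlike [Self::shard_usage_audit], this only reads the shard's live
    /// states from consensus (no scan of [Blob]), summing the sizes of the
    /// batches and rollups those states reference.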
    pub async fn shard_usage_referenced(&self, shard_id: ShardId) -> ShardUsageReferenced {
        let mut start = Instant::now();
        let states_iter = self
            .state_versions
            .fetch_all_live_states::<u64>(shard_id)
            .await;
        let states_iter = match states_iter {
            Some(x) => x,
            None => {
                return ShardUsageReferenced {
                    batches_bytes: 0,
                    rollup_bytes: 0,
                };
            }
        };
        let mut states_iter = states_iter
            .check_ts_codec()
            .expect("ts should be a u64 in all prod shards");

        let shard_metrics = &self.metrics.shards.shard(&shard_id, "unknown");
        shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states_iter.len()));

        let now = Instant::now();
        self.metrics
            .audit
            .step_state
            .inc_by(now.duration_since(start).as_secs_f64());
        start = now;

        let mut batches_bytes = 0;
        let mut rollup_bytes = 0;
        while let Some(_) = states_iter.next(|diff| {
            diff.referenced_blobs().for_each(|blob| match blob {
                HollowBlobRef::Batch(batch) => {
                    batches_bytes += batch.encoded_size_bytes();
                }
                HollowBlobRef::Rollup(rollup) => {
                    rollup_bytes += rollup.encoded_size_bytes.unwrap_or(1);
                }
            })
        }) {}

        let referenced = ShardUsageReferenced {
            batches_bytes: u64::cast_from(batches_bytes),
            rollup_bytes: u64::cast_from(rollup_bytes),
        };

        let current_state_sizes = states_iter.state().size_metrics();
        shard_metrics
            .usage_current_state_batches_bytes
            .set(u64::cast_from(current_state_sizes.state_batches_bytes));
        shard_metrics
            .usage_current_state_rollups_bytes
            .set(u64::cast_from(current_state_sizes.state_rollups_bytes));
        shard_metrics.usage_referenced_not_current_state_bytes.set(
            referenced.size_bytes()
                - u64::cast_from(
                    current_state_sizes.state_batches_bytes
                        + current_state_sizes.state_rollups_bytes,
                ),
        );

        self.metrics
            .audit
            .step_math
            .inc_by(start.elapsed().as_secs_f64());

        referenced
    }

    /// Computes [ShardUsageReferenced] for a given set of shards. Suitable for customer billing.
    pub async fn shards_usage_referenced<I>(&self, shard_ids: I) -> ShardsUsageReferenced
    where
        I: IntoIterator<Item = ShardId>,
    {
        let semaphore = Arc::new(Semaphore::new(
            USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg),
        ));
        let by_shard_futures = FuturesUnordered::new();
        for shard_id in shard_ids {
            let semaphore = Arc::clone(&semaphore);
            let shard_usage_fut = async move {
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("acquiring permit from open semaphore");
                let shard_usage = self.shard_usage_referenced(shard_id).await;
                (shard_id, shard_usage)
            };
            by_shard_futures.push(shard_usage_fut);
        }
        let by_shard = by_shard_futures.collect().await;
        ShardsUsageReferenced { by_shard }
    }

    /// Computes [ShardUsageAudit] for a single shard.
    ///
    /// Performs a full scan of [Blob] and [mz_persist::location::Consensus] to compute a full audit
    /// of blob usage, categorizing both referenced and unreferenced blobs (see [ShardUsageAudit]
    /// for full details). While [ShardUsageAudit::referenced_bytes] is suitable for billing, prefer
    /// [Self::shard_usage_referenced] to avoid the (costly!) scan of [Blob] if the additional
    /// categorizations are not needed.
    pub async fn shard_usage_audit(&self, shard_id: ShardId) -> ShardUsageAudit {
        let mut blob_usage = self.blob_raw_usage(BlobKeyPrefix::Shard(&shard_id)).await;
        let blob_usage = blob_usage.by_shard.remove(&shard_id).unwrap_or_default();
        self.shard_usage_given_blob_usage(shard_id, &blob_usage)
            .await
    }

    /// Computes [ShardUsageAudit] for every shard in an env.
    ///
    /// See [Self::shard_usage_audit] for more details on when to use a full audit.
    pub async fn shards_usage_audit(&self) -> ShardsUsageAudit {
        let blob_usage = self.blob_raw_usage(BlobKeyPrefix::All).await;
        self.metrics
            .audit
            .blob_batch_part_bytes
            .set(blob_usage.batch_part_bytes);
        self.metrics
            .audit
            .blob_batch_part_count
            .set(blob_usage.batch_part_count);
        self.metrics
            .audit
            .blob_rollup_bytes
            .set(blob_usage.rollup_size);
        self.metrics
            .audit
            .blob_rollup_count
            .set(blob_usage.rollup_count);
        self.metrics.audit.blob_bytes.set(blob_usage.total_size);
        self.metrics.audit.blob_count.set(blob_usage.total_count);

        let semaphore = Semaphore::new(USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg));
        let by_shard_futures = FuturesUnordered::new();
        for (shard_id, total_bytes) in blob_usage.by_shard.iter() {
            let shard_usage_fut = async {
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("acquiring permit from open semaphore");
                let shard_usage = self
                    .shard_usage_given_blob_usage(*shard_id, total_bytes)
                    .await;
                (*shard_id, shard_usage)
            };
            by_shard_futures.push(shard_usage_fut);
        }

        let by_shard = by_shard_futures.collect().await;
        ShardsUsageAudit {
            by_shard,
            unattributable_bytes: blob_usage.unattributable_bytes,
        }
    }

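    /// Lists every blob under `prefix` and aggregates blob sizes by shard,
    /// writer, and blob type (batch part vs rollup), along with overall
    /// totals.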
    async fn blob_raw_usage(&self, prefix: BlobKeyPrefix<'_>) -> BlobUsage {
        retry_external(
            &self.metrics.retries.external.storage_usage_shard_size,
            || async {
                let mut start = Instant::now();
                let mut keys = 0;
                let mut usage = BlobUsage::default();
                self.blob
                    .list_keys_and_metadata(&prefix.to_string(), &mut |metadata| {
                        // Increment the step timing metrics as we go, so it
                        // doesn't all show up at the end.
                        keys += 1;
                        if keys % 100 == 0 {
                            let now = Instant::now();
                            self.metrics
                                .audit
                                .step_blob_metadata
                                .inc_by(now.duration_since(start).as_secs_f64());
                            start = now;
                        }

                        match BlobKey::parse_ids(metadata.key) {
                            Ok((shard, partial_blob_key)) => {
                                let shard_usage = usage.by_shard.entry(shard).or_default();

                                match partial_blob_key {
                                    PartialBlobKey::Batch(writer_id, _) => {
                                        usage.batch_part_bytes += metadata.size_in_bytes;
                                        usage.batch_part_count += 1;
                                        *shard_usage.by_writer.entry(writer_id).or_default() +=
                                            metadata.size_in_bytes;
                                    }
                                    PartialBlobKey::Rollup(_, _) => {
                                        usage.rollup_size += metadata.size_in_bytes;
                                        usage.rollup_count += 1;
                                        shard_usage.rollup_bytes += metadata.size_in_bytes;
                                    }
                                }
                            }
                            _ => {
                                info!("unknown blob: {}: {}", metadata.key, metadata.size_in_bytes);
                                usage.unattributable_bytes += metadata.size_in_bytes;
                            }
                        }
                        usage.total_size += metadata.size_in_bytes;
                        usage.total_count += 1;
                    })
                    .await?;
                self.metrics
                    .audit
                    .step_blob_metadata
                    .inc_by(start.elapsed().as_secs_f64());
                Ok(usage)
            },
        )
        .await
    }

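    /// Computes the [ShardUsageAudit] for a single shard, given that shard's
    /// raw blob usage (as gathered by [Self::blob_raw_usage]).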
    async fn shard_usage_given_blob_usage(
        &self,
        shard_id: ShardId,
        blob_usage: &ShardBlobUsage,
    ) -> ShardUsageAudit {
        let mut start = Instant::now();
        let states_iter = self
            .state_versions
            .fetch_all_live_states::<u64>(shard_id)
            .await;
        let states_iter = match states_iter {
            Some(x) => x,
            None => {
                // It's unexpected for a shard to exist in blob but not in
                // consensus, but it could happen. For example, if an initial
                // rollup has been written but the initial CaS hasn't yet
                // succeeded (or if a `bin/environmentd --reset` is interrupted
                // in dev). Be loud because it's unexpected, but handle it
                // because it can happen.
                error!(
                    concat!(
                        "shard {} existed in blob but not in consensus. This should be quite rare in ",
                        "prod, but is semi-expected in development if `bin/environmentd --reset` gets ",
                        "interrupted"
                    ),
                    shard_id
                );
                return ShardUsageAudit {
                    current_state_batches_bytes: 0,
                    current_state_rollups_bytes: 0,
                    referenced_not_current_state_bytes: 0,
                    not_leaked_not_referenced_bytes: 0,
                    leaked_bytes: blob_usage.total_bytes(),
                };
            }
        };
        let mut states_iter = states_iter
            .check_ts_codec()
            .expect("ts should be a u64 in all prod shards");
        let now = Instant::now();
        self.metrics
            .audit
            .step_state
            .inc_by(now.duration_since(start).as_secs_f64());
        start = now;

        let shard_metrics = self.metrics.shards.shard(&shard_id, "unknown");
        shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states_iter.len()));

        let mut referenced_batches_bytes = BTreeMap::new();
        let mut referenced_other_bytes = 0;
        while let Some(_) = states_iter.next(|x| {
            x.referenced_blobs().for_each(|x| match x {
                HollowBlobRef::Batch(x) => {
                    for part in x.parts.iter() {
                        if let Some(writer_id) = part.writer_key() {
                            let writer_referenced_batches_bytes =
                                referenced_batches_bytes.entry(writer_id).or_default();
                            *writer_referenced_batches_bytes += u64::cast_from(part.hollow_bytes());
                        } else {
                            referenced_other_bytes += u64::cast_from(part.hollow_bytes());
                        }
                    }
                }
                HollowBlobRef::Rollup(x) => {
                    referenced_other_bytes +=
                        u64::cast_from(x.encoded_size_bytes.unwrap_or_default());
                }
            })
        }) {}

        let mut current_state_batches_bytes = 0;
        let mut current_state_rollups_bytes = 0;
        states_iter.state().blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                for part in x.parts.iter() {
                    current_state_batches_bytes += u64::cast_from(part.hollow_bytes());
                }
            }
            HollowBlobRef::Rollup(x) => {
                current_state_rollups_bytes +=
                    u64::cast_from(x.encoded_size_bytes.unwrap_or_default());
            }
        });
        let current_state_bytes = current_state_batches_bytes + current_state_rollups_bytes;

        let ret = ShardUsageAudit::from(ShardUsageCumulativeMaybeRacy {
            current_state_batches_bytes,
            current_state_bytes,
            referenced_other_bytes,
            referenced_batches_bytes: &referenced_batches_bytes,
            // In the future, this is likely to include a "grace period" so recent but non-current
            // versions are also considered live
            minimum_key: WriterKey::for_version(&self.cfg.build_version),
            blob_usage,
        });

        // Sanity check that we didn't obviously do anything wrong.
        assert_eq!(ret.total_bytes(), blob_usage.total_bytes());

        shard_metrics
            .usage_current_state_batches_bytes
            .set(ret.current_state_batches_bytes);
        shard_metrics
            .usage_current_state_rollups_bytes
            .set(ret.current_state_rollups_bytes);
        shard_metrics
            .usage_referenced_not_current_state_bytes
            .set(ret.referenced_not_current_state_bytes);
        shard_metrics
            .usage_not_leaked_not_referenced_bytes
            .set(ret.not_leaked_not_referenced_bytes);
        shard_metrics.usage_leaked_bytes.set(ret.leaked_bytes);

        self.metrics
            .audit
            .step_math
            .inc_by(start.elapsed().as_secs_f64());
        ret
    }

    /// Returns the size (in bytes) of a subset of blobs specified by
    /// [BlobKeyPrefix]
    ///
    /// Can be safely called within retry_external to ensure it succeeds
    #[cfg(test)]
    async fn size(
        &self,
        prefix: BlobKeyPrefix<'_>,
    ) -> Result<u64, mz_persist::location::ExternalError> {
        let mut total_size = 0;
        self.blob
            .list_keys_and_metadata(&prefix.to_string(), &mut |metadata| {
                total_size += metadata.size_in_bytes;
            })
            .await?;
        Ok(total_size)
    }
}

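/// The raw inputs to a [ShardUsageAudit] computation.
///
/// "Maybe racy" because the blob listing and the consensus states are read at
/// different times, so the two views may not be mutually consistent.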
#[derive(Debug)]
struct ShardUsageCumulativeMaybeRacy<'a> {
    current_state_batches_bytes: u64,
    current_state_bytes: u64,
    referenced_other_bytes: u64,
    referenced_batches_bytes: &'a BTreeMap<WriterKey, u64>,
    minimum_key: WriterKey,
    blob_usage: &'a ShardBlobUsage,
}

impl From<ShardUsageCumulativeMaybeRacy<'_>> for ShardUsageAudit {
    fn from(x: ShardUsageCumulativeMaybeRacy<'_>) -> Self {
        let mut not_leaked_bytes = 0;
        let mut total_bytes = 0;
        for (writer_key, bytes) in x.blob_usage.by_writer.iter() {
            total_bytes += *bytes;
            let writer_key_is_live = *writer_key >= x.minimum_key;
            if writer_key_is_live {
                not_leaked_bytes += *bytes;
            } else {
                // This writer is no longer live, so it can never again link
                // anything into state. As a result, we know that anything it
                // hasn't linked into state is now leaked and eligible for
                // reclamation by a (future) leaked blob detector.
                let writer_referenced =
                    x.referenced_batches_bytes.get(writer_key).map_or(0, |x| *x);
                // It's possible, due to races, that a writer has more
                // referenced batches in state than we saw for that writer in
                // blob. Cap it at the number of bytes we saw in blob, otherwise
                // we could hit the "blob inputs should be cumulative" panic
                // below.
                not_leaked_bytes += std::cmp::min(*bytes, writer_referenced);
            }
        }
        // For now, assume rollups aren't leaked. We could compute which rollups
        // are leaked by plumbing things more precisely, if that's necessary.
        total_bytes += x.blob_usage.rollup_bytes;
        not_leaked_bytes += x.blob_usage.rollup_bytes;

        let leaked_bytes = total_bytes
            .checked_sub(not_leaked_bytes)
            .expect("blob inputs should be cumulative");
        let referenced_batches_bytes = x.referenced_batches_bytes.values().sum::<u64>();
        let referenced_bytes = referenced_batches_bytes + x.referenced_other_bytes;
        let mut referenced_not_current_state_bytes = referenced_bytes
            .checked_sub(x.current_state_bytes)
            .expect("state inputs should be cumulative");
        let mut current_state_rollups_bytes = x
            .current_state_bytes
            .checked_sub(x.current_state_batches_bytes)
            .expect("state inputs should be cumulative");
        let mut current_state_batches_bytes = x.current_state_batches_bytes;

        // If we could transactionally read both blob and consensus, the
        // cumulative numbers would all line up. We can't, so we have to adjust
        // them up a bit to account for the race condition. We read blob first,
        // and then consensus, but the race could go either way: a blob that is
        // currently in state could be deleted from both in between the reads,
        // OR a blob could be written and linked into state in between the
        // reads. We could do a blob-state-blob sandwich, and then use
        // differences between the two blob reads to reason about what
        // specifically happens in a race, but this: (a) takes memory
        // proportional to `O(blobs)` and (b) is overkill. Instead, we adjust by
        // category.
        //
        // In the event of a discrepancy, we ensure that numbers will only get
        // smaller (by policy, we prefer to under-count for billing).
        // Concretely:
        // - If referenced_bytes (which comes from state) is > not_leaked_bytes
        //   (which is a subset of what we read from blob), then we've
        //   definitely hit the race and the funnel doesn't make sense (some of
        //   the things that are supposed to be smaller are actually bigger).
        //   Figure out how much we have to fix up the numbers and call it
        //   "possible_over_count".
        // - Then go "down" ("up"?) the funnel category by category (each of
        //   which is represented here by a diff from the previous category)
        //   reducing them until we've adjusted them collectively down by
        //   "possible_over_count".
        // - First is not_leaked_not_referenced_bytes (the diff from
        //   referenced_bytes to not_leaked_bytes).
        // - Then, if necessary, carry the adjustment to
        //   referenced_not_current_state_bytes (the diff from
        //   current_state_bytes to referenced_bytes).
        // - And so on.
        // - Note that the largest possible value for possible_over_count is
        //   referenced_bytes (e.g. if we read nothing from blob). Because all
        //   the diffs add up to referenced_bytes, we're guaranteed that
        //   "possible_over_count" will have reached 0 by the time we've
        //   finished adjusting all the categories.
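        //
        // A small worked example (illustrative numbers): suppose state claims
        // referenced_bytes=5 and current_state_bytes=4 (2 of it in batches),
        // but blob only showed not_leaked_bytes=3. Then possible_over_count=2;
        // not_leaked_not_referenced_bytes is already 0, so 1 comes out of
        // referenced_not_current_state_bytes (5-4=1, reduced to 0) and the
        // remaining 1 out of current_state_rollups_bytes (4-2=2, reduced to
        // 1), leaving current_state_batches_bytes at 2. The `usage_races`
        // tests below walk through these cases end to end.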
        let mut not_leaked_not_referenced_bytes = not_leaked_bytes.saturating_sub(referenced_bytes);
        let mut possible_over_count = referenced_bytes.saturating_sub(not_leaked_bytes);
        fn adjust(adjustment: &mut u64, val: &mut u64) {
            let x = std::cmp::min(*adjustment, *val);
            *adjustment -= x;
            *val -= x;
        }
        adjust(
            &mut possible_over_count,
            &mut not_leaked_not_referenced_bytes,
        );
        adjust(
            &mut possible_over_count,
            &mut referenced_not_current_state_bytes,
        );
        adjust(&mut possible_over_count, &mut current_state_rollups_bytes);
        adjust(&mut possible_over_count, &mut current_state_batches_bytes);
        assert_eq!(possible_over_count, 0);

        let ret = ShardUsageAudit {
            current_state_batches_bytes,
            current_state_rollups_bytes,
            referenced_not_current_state_bytes,
            not_leaked_not_referenced_bytes,
            leaked_bytes,
        };

        // These ones are guaranteed to be equal.
        debug_assert_eq!(ret.total_bytes(), total_bytes);
        debug_assert_eq!(ret.not_leaked_bytes(), not_leaked_bytes);
        // The rest might have been reduced because of the race condition.
        debug_assert!(ret.referenced_bytes() <= referenced_bytes);
        debug_assert!(ret.current_state_bytes() <= x.current_state_bytes);
        debug_assert!(ret.current_state_batches_bytes <= x.current_state_batches_bytes);
        ret
    }
}

impl std::fmt::Display for ShardUsageAudit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            concat!(
                "total s3 contents:                  {}\n",
                "  leaked:                           {}\n",
                "  not leaked:                       {}\n",
                "    not leaked not referenced:      {}\n",
                "    referenced:                     {}\n",
                "      referenced not current state: {}\n",
                "      current state:                {}\n",
                "        current rollups:            {}\n",
                "        current batches:            {}",
            ),
            HumanBytes(self.total_bytes()),
            HumanBytes(self.leaked_bytes),
            HumanBytes(self.not_leaked_bytes()),
            HumanBytes(self.not_leaked_not_referenced_bytes),
            HumanBytes(self.referenced_bytes()),
            HumanBytes(self.referenced_not_current_state_bytes),
            HumanBytes(self.current_state_bytes()),
            HumanBytes(self.current_state_rollups_bytes),
            HumanBytes(self.current_state_batches_bytes),
        )
    }
}

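/// A wrapper that `Display`s a byte count in human-readable units (B, KiB,
/// MiB, GiB, TiB), e.g. `HumanBytes(2 << 20)` renders as "2.0MiB".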
pub(crate) struct HumanBytes(pub u64);

impl std::fmt::Display for HumanBytes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.0 < 1_240u64 {
            return write!(f, "{}B", self.0);
        }
        #[allow(clippy::as_conversions)]
        let mut bytes = self.0 as f64 / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}KiB", bytes);
        }
        bytes = bytes / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}MiB", bytes);
        }
        bytes = bytes / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}GiB", bytes);
        }
        bytes = bytes / 1_024f64;
        write!(f, "{:.1}TiB", bytes)
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist::location::SeqNo;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::ShardId;
    use crate::batch::{
        BLOB_TARGET_SIZE, BatchBuilderConfig, INLINE_WRITES_SINGLE_MAX_BYTES,
        INLINE_WRITES_TOTAL_MAX_BYTES,
    };
    use crate::internal::paths::{PartialRollupKey, RollupId};
    use crate::tests::new_test_client;

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn size(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let client = new_test_client(&dyncfgs).await;
        let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0;
        let build_version = client.cfg.build_version.clone();
        let shard_id_one = ShardId::new();
        let shard_id_two = ShardId::new();

        // write one row into shard 1
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_one)
            .await;
        write.expect_append(&data[..1], vec![0], vec![2]).await;

        // write two rows into shard 2 from writer 1
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_two)
            .await;
        write.expect_append(&data[1..3], vec![0], vec![4]).await;
        let writer_one = WriterKey::Id(write.writer_id.clone());

        // write one row into shard 2 from writer 2
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_two)
            .await;
        write.expect_append(&data[3..], vec![0], vec![5]).await;
        let writer_two = WriterKey::Id(write.writer_id.clone());

        let usage = StorageUsageClient::open(client);

        let shard_one_size = usage
            .size(BlobKeyPrefix::Shard(&shard_id_one))
            .await
            .expect("must have shard size");
        let shard_two_size = usage
            .size(BlobKeyPrefix::Shard(&shard_id_two))
            .await
            .expect("must have shard size");
        let writer_one_size = usage
            .size(BlobKeyPrefix::Writer(&shard_id_two, &writer_one))
            .await
            .expect("must have shard size");
        let writer_two_size = usage
            .size(BlobKeyPrefix::Writer(&shard_id_two, &writer_two))
            .await
            .expect("must have shard size");
        let versioned_size = usage
            .size(BlobKeyPrefix::Writer(
                &shard_id_two,
                &WriterKey::for_version(&build_version),
            ))
            .await
            .expect("must have shard size");
        let rollups_size = usage
            .size(BlobKeyPrefix::Rollups(&shard_id_two))
            .await
            .expect("must have shard size");
        let all_size = usage
            .size(BlobKeyPrefix::All)
            .await
            .expect("must have shard size");

        assert!(shard_one_size > 0);
        assert!(shard_two_size > 0);
        if inline_writes_enabled {
            // Allow equality, but only if inline writes are enabled.
            assert!(shard_one_size <= shard_two_size);
        } else {
            assert!(shard_one_size < shard_two_size);
        }
        assert_eq!(
            shard_two_size,
            writer_one_size + writer_two_size + versioned_size + rollups_size
        );
        assert_eq!(all_size, shard_one_size + shard_two_size);

        assert_eq!(
            usage.shard_usage_audit(shard_id_one).await.total_bytes(),
            shard_one_size
        );
        assert_eq!(
            usage.shard_usage_audit(shard_id_two).await.total_bytes(),
            shard_two_size
        );

        let shards_usage = usage.shards_usage_audit().await;
        assert_eq!(shards_usage.by_shard.len(), 2);
        assert_eq!(
            shards_usage
                .by_shard
                .get(&shard_id_one)
                .map(|x| x.total_bytes()),
            Some(shard_one_size)
        );
        assert_eq!(
            shards_usage
                .by_shard
                .get(&shard_id_two)
                .map(|x| x.total_bytes()),
            Some(shard_two_size)
        );
    }

    /// This is just a sanity check for the overall flow of computing ShardUsage.
    /// The edge cases are exercised in separate tests.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn usage_sanity(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let shard_id = ShardId::new();
        let mut client = new_test_client(&dyncfgs).await;
        let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0;

        let (mut write0, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;
        // Successfully link in a batch from a writer that stays registered.
        write0.expect_compare_and_append(&data[..2], 0, 3).await;
        // Leak a batch from a writer that stays registered.
        let batch = write0
            .batch(&data[..2], Antichain::from_elem(0), Antichain::from_elem(3))
            .await
            .unwrap();
        std::mem::forget(batch);

        let (mut write1, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Successfully link in a batch from a writer that gets expired.
        write1.expect_compare_and_append(&data[2..], 3, 5).await;
        // Leak a batch from a writer that gets expired.
        let batch = write1
            .batch(&data[2..], Antichain::from_elem(3), Antichain::from_elem(5))
            .await
            .unwrap();
        std::mem::forget(batch);
        write1.expire().await;

        // Write a rollup that has an encoded size (the initial rollup has size 0).
        let maintenance = write0.machine.add_rollup_for_current_seqno().await;
        maintenance.perform(&write0.machine, &write0.gc).await;

        client.cfg.build_version.minor += 1;
        let usage = StorageUsageClient::open(client);
        let shard_usage_audit = usage.shard_usage_audit(shard_id).await;
        let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await;
        if !inline_writes_enabled {
            // We've written data.
            assert!(shard_usage_audit.current_state_batches_bytes > 0);
            assert!(shard_usage_referenced.batches_bytes > 0);
        }
        // There's always at least one rollup.
        assert!(shard_usage_audit.current_state_rollups_bytes > 0);
        assert!(shard_usage_referenced.rollup_bytes > 0);
        // Sadly, it's tricky (and brittle) to ensure that there is data
        // referenced by some live state, but no longer referenced by the
        // current one, so no asserts on referenced_not_current_state_bytes for
        // now.
        //
        // write0 wrote a batch, but never linked it in, but is still active.
        assert!(shard_usage_audit.not_leaked_not_referenced_bytes > 0);
        if !inline_writes_enabled {
            // write1 wrote a batch, but never linked it in, and is now expired.
            assert!(shard_usage_audit.leaked_bytes > 0);
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn usage_referenced(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let shard_id = ShardId::new();
        let mut client = new_test_client(&dyncfgs).await;
        // make our bookkeeping simple by skipping compaction blob writes
        client.cfg.compaction_enabled = false;
        // make things interesting and create multiple parts per batch
        client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        // Inline write backpressure will change the encoded size, but the CaAB
        // call consumes the Batch, so we don't have any way of getting the new
        // one. So, sniff out whether backpressure would flush out the part and
        // do it before we get the sizes.
        let backpressure_would_flush = INLINE_WRITES_TOTAL_MAX_BYTES.get(&client.cfg) == 0;

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut b1 = write.expect_batch(&data[..2], 0, 3).await;
        let mut b2 = write.expect_batch(&data[2..], 2, 5).await;
        if backpressure_would_flush {
            let cfg = BatchBuilderConfig::new(&client.cfg, shard_id);
            b1.flush_to_blob(
                &cfg,
                &client.metrics.user,
                &client.isolated_runtime,
                &write.write_schemas,
            )
            .await;
            b2.flush_to_blob(
                &cfg,
                &client.metrics.user,
                &client.isolated_runtime,
                &write.write_schemas,
            )
            .await;
        }

        let batches_size =
            u64::cast_from(b1.batch.encoded_size_bytes() + b2.batch.encoded_size_bytes());

        write
            .expect_compare_and_append_batch(&mut [&mut b1], 0, 3)
            .await;
        write
            .expect_compare_and_append_batch(&mut [&mut b2], 3, 5)
            .await;

        let usage = StorageUsageClient::open(client);
        let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await;

        // with compaction disabled, we can do an exact match on batch part byte size
        assert_eq!(shard_usage_referenced.batches_bytes, batches_size);
    }

    struct TestCase {
        current_state_batches_bytes: u64,
        current_state_bytes: u64,
        referenced_other_bytes: u64,
        referenced_batches_bytes: Vec<(WriterKey, u64)>,
        min_writer_key: WriterKey,
        blob_usage_by_writer: Vec<(WriterKey, u64)>,
        blob_usage_rollups: u64,
    }

    impl TestCase {
        #[track_caller]
        fn run(&self, expected: &str) {
            let referenced_batches_bytes = self
                .referenced_batches_bytes
                .iter()
                .map(|(id, b)| (id.clone(), *b))
                .collect();
            let blob_usage = ShardBlobUsage {
                by_writer: self
                    .blob_usage_by_writer
                    .iter()
                    .map(|(id, b)| (id.clone(), *b))
                    .collect(),
                rollup_bytes: self.blob_usage_rollups,
            };
            let input = ShardUsageCumulativeMaybeRacy {
                current_state_batches_bytes: self.current_state_batches_bytes,
                current_state_bytes: self.current_state_bytes,
                referenced_other_bytes: self.referenced_other_bytes,
                referenced_batches_bytes: &referenced_batches_bytes,
                minimum_key: self.min_writer_key.clone(),
                blob_usage: &blob_usage,
            };
            let usage = ShardUsageAudit::from(input);
            let actual = format!(
                "{} {}/{} {}/{} {}/{} {}/{}",
                usage.total_bytes(),
                usage.leaked_bytes,
                usage.not_leaked_bytes(),
                usage.not_leaked_not_referenced_bytes,
                usage.referenced_bytes(),
                usage.referenced_not_current_state_bytes,
                usage.current_state_bytes(),
                usage.current_state_rollups_bytes,
                usage.current_state_batches_bytes
            );
            assert_eq!(actual, expected);
        }
    }

    fn version(minor: u64) -> WriterKey {
        WriterKey::for_version(&Version::new(0, minor, 0))
    }

    #[mz_ore::test]
    fn usage_kitchen_sink() {
        TestCase {
            // - Some data in current batches
            current_state_batches_bytes: 1,
            // - Some data in current rollups: this - current_state_batches_bytes
            current_state_bytes: 2,
            // - Some data in a key we couldn't parse: this-(rollup)
            //   - This one is unexpected in prod, but it seemed nicer than a
            //     panic, ymmv
            referenced_other_bytes: 3,
            // - Some data written by a still active writer: (a, 4)
            // - Some data written by a now-expired writer: (b, 5)
            referenced_batches_bytes: vec![(version(3), 4), (version(2), 5)],
            min_writer_key: version(3),
            // - Some data leaked by a still active writer: (v3, 7) - (a, 4)
            // - Some data leaked by a now-expired writer: (v2, 8) - (b, 5)
            blob_usage_by_writer: vec![(version(3), 7), (version(2), 8)],
            // - Some data in rollups
            blob_usage_rollups: 6,
        }
        .run("21 3/18 6/12 10/2 1/1");
    }

    #[mz_ore::test]
    fn usage_funnel() {
        // All data in current_state_batches_bytes
        TestCase {
            current_state_batches_bytes: 1,
            current_state_bytes: 1,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 0/1");

        // All data in current_state_rollups_bytes
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 1,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 1/0");

        // All data in referenced_not_current_state_bytes
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 1/0 0/0");

        // All data in not_leaked_not_referenced_bytes
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 1/0 0/0 0/0");

        // All data in leaked_bytes
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(2), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 1/0 0/0 0/0 0/0");

        // No data
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![],
            blob_usage_rollups: 0,
        }
        .run("0 0/0 0/0 0/0 0/0");
    }

    #[mz_ore::test]
    fn usage_races() {
        // We took a snapshot of blob, and then before getting our states, a
        // bunch of interesting things happened to persist state. We adjust to
        // account for the race down the funnel.

        // Base case: no race
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 8), (version(2), 2)],
            blob_usage_rollups: 0,
        }
        .run("10 2/8 2/6 2/4 2/2");

        // Race was enough to affect into leaked
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 8), (version(2), 1)],
            blob_usage_rollups: 0,
        }
        .run("9 1/8 2/6 2/4 2/2");

        // Race was enough to affect into not_leaked_not_referenced_bytes
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 7)],
            blob_usage_rollups: 0,
        }
        .run("7 0/7 1/6 2/4 2/2");

        // Race was enough to affect into referenced_not_current_state_bytes
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 5)],
            blob_usage_rollups: 0,
        }
        .run("5 0/5 0/5 1/4 2/2");

        // Race was enough to affect into current_state_rollups_bytes
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 3)],
            blob_usage_rollups: 0,
        }
        .run("3 0/3 0/3 0/3 1/2");

        // Race was enough to affect into current_state_batches_bytes
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 0/1");
    }

    /// A regression test for (part of) database-issues#5170, which led to
    /// seeing the "blob inputs should be cumulative" panic in staging/canary.
    #[mz_ore::test]
    fn usage_regression_referenced_greater_than_blob() {
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 5)],
            min_writer_key: version(10),
            blob_usage_by_writer: vec![(version(3), 3)],
            blob_usage_rollups: 0,
        }
        .run("3 0/3 0/3 3/0 0/0");
    }

    /// Regression test for (part of) database-issues#5170, where an interrupted
    /// `bin/environmentd --reset` resulted in a panic in the persist usage code.
    ///
    /// This also tests a (hypothesized) race that's possible in prod where an
    /// initial rollup is written for a shard, but the initial CaS hasn't yet
    /// succeeded.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn usage_regression_shard_in_blob_not_consensus(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        // Somewhat unsatisfying, we manually construct a rollup blob key.
        let key = PartialRollupKey::new(SeqNo(1), &RollupId::new());
        let key = key.complete(&shard_id);
        let () = client
            .blob
            .set(&key, Bytes::from(vec![0, 1, 2]))
            .await
            .unwrap();
        let usage = StorageUsageClient::open(client);
        let shards_usage = usage.shards_usage_audit().await;
        assert_eq!(shards_usage.by_shard.len(), 1);
        assert_eq!(
            shards_usage.by_shard.get(&shard_id).unwrap().leaked_bytes,
            3
        );
    }
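
    // A few spot-checks of HumanBytes formatting; the expected strings follow
    // directly from the thresholds in the Display impl above.
    #[mz_ore::test]
    fn human_bytes_formatting() {
        assert_eq!(HumanBytes(0).to_string(), "0B");
        assert_eq!(HumanBytes(1_239).to_string(), "1239B");
        assert_eq!(HumanBytes(2 << 20).to_string(), "2.0MiB");
        assert_eq!(HumanBytes(3 * (1 << 30)).to_string(), "3.0GiB");
    }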
1261}