mz_persist_client/cli/inspect.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI introspection tools for persist

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::pin::pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
use bytes::{BufMut, Bytes};
use differential_dataflow::difference::{IsZero, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::{StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::indexed::encoding::BlobTraceBatchPart;
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::RustType;
use prost::Message;
use serde_json::json;

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cli::args::{NO_COMMIT, READ_ALL_BUILD_INFO, StateArgs, make_blob, make_consensus};
use crate::error::CodecConcreteType;
use crate::fetch::EncodedPart;
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::paths::{
    BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
};
use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State};
use crate::rpc::NoopPubSubSender;
use crate::usage::{HumanBytes, StorageUsageClient};
use crate::{Metrics, PersistClient, PersistConfig, ShardId};

/// Commands for read-only inspection of persist state
#[derive(Debug, clap::Args)]
pub struct InspectArgs {
    #[clap(subcommand)]
    command: Command,
}

/// Individual subcommands of inspect
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Prints latest consensus state as JSON
    State(StateArgs),

    /// Prints latest consensus rollup state as JSON
    StateRollup(StateRollupArgs),

    /// Prints consensus rollup state of all known rollups as JSON
    StateRollups(StateArgs),

    /// Prints the count and size of blobs in an environment
    BlobCount(BlobArgs),

    /// Prints blob batch part contents
    BlobBatchPart(BlobBatchPartArgs),

    /// Prints consolidated and unconsolidated size, in bytes and update count
    ConsolidatedSize(StateArgs),

    /// Prints the unreferenced blobs for a shard
    UnreferencedBlobs(StateArgs),

    /// Prints various statistics about the latest rollups for all the shards in an environment
    ShardStats(BlobArgs),

    /// Prints information about blob usage for a shard
    BlobUsage(StateArgs),

    /// Prints each consensus state change as JSON. Output includes the full consensus state
    /// before and after each state transition:
    ///
    /// ```text
    /// {
    ///     "previous": previous_consensus_state,
    ///     "new": new_consensus_state,
    /// }
    /// ```
    ///
    /// This is most helpfully consumed using a JSON diff tool like `jd`. A useful incantation
    /// to show only the changed fields between state transitions:
    ///
    /// ```text
    /// persistcli inspect state-diff --shard-id <shard> --consensus-uri <consensus_uri> |
    ///     while read diff; do
    ///         echo $diff | jq '.new' > temp_new
    ///         echo $diff | jq '.previous' > temp_previous
    ///         echo $diff | jq '.new.seqno'
    ///         jd -color -set temp_previous temp_new
    ///     done
    /// ```
    ///
    #[clap(verbatim_doc_comment)]
    StateDiff(StateArgs),
}

/// Runs the given read-only inspect command.
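///
/// A hypothetical invocation via the CLI (flag names as in the `state-diff`
/// example above; the exact binary name is assumed):
///
/// ```text
/// persistcli inspect state --shard-id <shard> --consensus-uri <consensus_uri> --blob-uri <blob_uri>
/// ```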
pub async fn run(command: InspectArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::State(args) => {
            let state = fetch_latest_state(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state).expect("unserializable state")
            );
        }
        Command::StateRollup(args) => {
            let state_rollup = fetch_state_rollup(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state_rollup).expect("unserializable state")
            );
        }
        Command::StateRollups(args) => {
            let state_rollups = fetch_state_rollups(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state_rollups).expect("unserializable state")
            );
        }
        Command::StateDiff(args) => {
            let states = fetch_state_diffs(&args).await?;
            for window in states.windows(2) {
                println!(
                    "{}",
                    json!({
                        "previous": window[0],
                        "new": window[1]
                    })
                );
            }
        }
        Command::BlobCount(args) => {
            let blob_counts = blob_counts(&args.blob_uri).await?;
            println!("{}", json!(blob_counts));
        }
        Command::BlobBatchPart(args) => {
            let shard_id = ShardId::from_str(&args.shard_id).expect("invalid shard id");
            let updates = blob_batch_part(&args.blob_uri, shard_id, args.key, args.limit).await?;
            println!("{}", json!(updates));
        }
        Command::ConsolidatedSize(args) => {
            let () = consolidated_size(&args).await?;
        }
        Command::UnreferencedBlobs(args) => {
            let unreferenced_blobs = unreferenced_blobs(&args).await?;
            println!("{}", json!(unreferenced_blobs));
        }
        Command::BlobUsage(args) => {
            let () = blob_usage(&args).await?;
        }
        Command::ShardStats(args) => {
            shard_stats(&args.blob_uri).await?;
        }
    }

    Ok(())
}

/// Arguments for viewing the state rollup of a shard
#[derive(Debug, Clone, clap::Parser)]
pub struct StateRollupArgs {
    #[clap(flatten)]
    pub(crate) state: StateArgs,

    /// Inspect the state rollup with the given key, if available.
    #[clap(long)]
    pub(crate) rollup_key: Option<String>,
}

/// Fetches the current state of a given shard
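///
/// A sketch of hypothetical usage (not compiled; `args` is assumed to be a
/// populated [StateArgs]), mirroring what `run` does for `inspect state`:
///
/// ```ignore
/// let state = fetch_latest_state(&args).await?;
/// println!("{}", serde_json::to_string_pretty(&state)?);
/// ```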
pub async fn fetch_latest_state(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;
    let state = state_versions
        .fetch_current_state::<u64>(&shard_id, versions.0.clone())
        .await;
    Ok(Rollup::from_untyped_state_without_diffs(state).into_proto())
}

/// Fetches a state rollup of a given shard. If a rollup key is not provided,
/// discovers the latest rollup by inspecting consensus state.
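///
/// A sketch of hypothetical usage (not compiled), fetching a specific rollup
/// by key:
///
/// ```ignore
/// let args = StateRollupArgs { state, rollup_key: Some(key) };
/// let rollup = fetch_state_rollup(&args).await?;
/// ```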
pub async fn fetch_state_rollup(
    args: &StateRollupArgs,
) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.state.shard_id();
    let state_versions = args.state.open().await?;

    let rollup_key = if let Some(rollup_key) = &args.rollup_key {
        PartialRollupKey(rollup_key.to_owned())
    } else {
        let latest_state = state_versions.consensus.head(&shard_id.to_string()).await?;
        let diff_buf = latest_state.ok_or_else(|| anyhow!("unknown shard"))?;
        let diff = ProtoStateDiff::decode(diff_buf.data).expect("invalid encoded diff");
        PartialRollupKey(diff.latest_rollup_key)
    };
    let rollup_buf = state_versions
        .blob
        .get(&rollup_key.complete(&shard_id))
        .await?
        .expect("fetching the specified state rollup");
    let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
    Ok(proto)
}

/// Fetches the state from all known rollups of a given shard
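///
/// A hypothetical CLI invocation that reaches this function (flag names as in
/// the `state-diff` example above):
///
/// ```text
/// persistcli inspect state-rollups --shard-id <shard> --consensus-uri <consensus_uri> --blob-uri <blob_uri>
/// ```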
pub async fn fetch_state_rollups(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut rollup_keys = BTreeSet::new();
    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;
    while let Some(v) = state_iter.next(|_| {}) {
        for rollup in v.collections.rollups.values() {
            rollup_keys.insert(rollup.key.clone());
        }
    }

    if rollup_keys.is_empty() {
        return Err(anyhow!("unknown shard"));
    }

    let mut rollup_states = BTreeMap::new();
    for key in rollup_keys {
        let rollup_buf = state_versions
            .blob
            .get(&key.complete(&shard_id))
            .await
            .unwrap();
        if let Some(rollup_buf) = rollup_buf {
            let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
            rollup_states.insert(key.to_string(), proto);
        }
    }

    Ok(rollup_states)
}

/// Fetches each state in a shard
pub async fn fetch_state_diffs(
    args: &StateArgs,
) -> Result<Vec<impl serde::Serialize>, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut live_states = vec![];
    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;
    while state_iter.next(|_| {}).is_some() {
        live_states.push(state_iter.into_rollup_proto_without_diffs());
    }

    Ok(live_states)
}

/// Arguments for viewing contents of a batch part
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobBatchPartArgs {
    /// Shard to view
    #[clap(long)]
    shard_id: String,

    /// Blob key (without shard)
    #[clap(long)]
    key: String,

    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be
    /// in place: e.g., for S3, sign in to SSO, set AWS_PROFILE and AWS_REGION appropriately, and
    /// use a blob URI scoped to the environment's bucket prefix.
    #[clap(long)]
    blob_uri: SensitiveUrl,

    /// Number of updates to output. Default is unbounded.
    #[clap(long, default_value = "18446744073709551615")]
    limit: usize,
}

#[derive(Debug, serde::Serialize)]
struct BatchPartOutput {
    desc: Description<u64>,
    updates: Vec<BatchPartUpdate>,
}

#[derive(Debug, serde::Serialize)]
struct BatchPartUpdate {
    k: String,
    v: String,
    t: u64,
    d: i64,
}

#[derive(PartialOrd, Ord, PartialEq, Eq)]
struct PrettyBytes<'a>(&'a [u8]);

impl fmt::Debug for PrettyBytes<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match std::str::from_utf8(self.0) {
            Ok(x) => fmt::Debug::fmt(x, f),
            Err(_) => fmt::Debug::fmt(self.0, f),
        }
    }
}

/// Fetches the updates in a blob batch part
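///
/// A hypothetical CLI invocation (subcommand name derived from clap's usual
/// kebab-casing of `BlobBatchPart`):
///
/// ```text
/// persistcli inspect blob-batch-part --shard-id <shard> --key <key> --blob-uri <blob_uri> --limit 100
/// ```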
pub async fn blob_batch_part(
    blob_uri: &SensitiveUrl,
    shard_id: ShardId,
    partial_key: String,
    limit: usize,
) -> Result<impl serde::Serialize, anyhow::Error> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;

    let key = PartialBatchKey(partial_key);
    let buf = blob
        .get(&*key.complete(&shard_id))
        .await
        .expect("blob exists")
        .expect("part exists");
    let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).expect("decodable");
    let desc = parsed.desc.clone();

    let encoded_part = EncodedPart::new(
        metrics.read.snapshot.clone(),
        parsed.desc.clone(),
        &key.0,
        None,
        parsed,
    );
    let mut out = BatchPartOutput {
        desc,
        updates: Vec::new(),
    };
    let records = encoded_part.normalize(&metrics.columnar);
    for ((k, v), t, d) in records
        .records()
        .expect("only implemented for records")
        .iter()
    {
        // Stop once we've emitted exactly `limit` updates (`>` here would
        // output one more update than requested).
        if out.updates.len() >= limit {
            break;
        }
        out.updates.push(BatchPartUpdate {
            k: format!("{:?}", PrettyBytes(k)),
            v: format!("{:?}", PrettyBytes(v)),
            t: u64::from_le_bytes(t),
            d: i64::from_le_bytes(d),
        });
    }

    Ok(out)
}

async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;
    let state = state_versions
        .fetch_current_state::<u64>(&shard_id, versions.0.clone())
        .await;
    let state = state.check_ts_codec(&shard_id)?;
    let shard_metrics = state_versions.metrics.shards.shard(&shard_id, "unknown");
    // This is odd, but advance by the upper to get maximal consolidation.
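    // (Advancing every timestamp to a common frontier lets updates that differ
    // only by time coalesce during consolidation, so this measures a best case.)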
    let as_of = state.upper().borrow();

    let mut updates = Vec::new();
    for batch in state.collections.trace.batches() {
        let mut part_stream =
            pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
        while let Some(part) = part_stream.try_next().await? {
            tracing::info!("fetching {}", part.printable_name());
            let encoded_part = EncodedPart::fetch(
                &shard_id,
                &*state_versions.blob,
                &state_versions.metrics,
                &shard_metrics,
                &state_versions.metrics.read.snapshot,
                &batch.desc,
                &part,
            )
            .await
            .expect("part exists");
            let part = encoded_part.normalize(&state_versions.metrics.columnar);
            for ((k, v), t, d) in part.records().expect("codec records").iter() {
                let mut t = <u64 as Codec64>::decode(t);
                t.advance_by(as_of);
                let d = <i64 as Codec64>::decode(d);
                updates.push(((k.to_owned(), v.to_owned()), t, d));
            }
        }
    }

    let bytes: usize = updates.iter().map(|((k, v), _, _)| k.len() + v.len()).sum();
    println!("before: {} updates {} bytes", updates.len(), bytes);
    differential_dataflow::consolidation::consolidate_updates(&mut updates);
    let bytes: usize = updates.iter().map(|((k, v), _, _)| k.len() + v.len()).sum();
    println!("after : {} updates {} bytes", updates.len(), bytes);

    Ok(())
}

/// Arguments for commands that run only against the blob store.
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobArgs {
    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be
    /// in place: e.g., for S3, sign in to SSO, set AWS_PROFILE and AWS_REGION appropriately, and
    /// use a blob URI scoped to the environment's bucket prefix.
    #[clap(long)]
    blob_uri: SensitiveUrl,
}

#[derive(Debug, Default, serde::Serialize)]
struct BlobCounts {
    batch_part_count: usize,
    batch_part_bytes: usize,
    rollup_count: usize,
    rollup_bytes: usize,
}

/// Fetches the per-shard blob counts for the given blob store
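///
/// A hypothetical CLI invocation (subcommand name derived from clap's usual
/// kebab-casing of `BlobCount`):
///
/// ```text
/// persistcli inspect blob-count --blob-uri <blob_uri>
/// ```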
pub async fn blob_counts(blob_uri: &SensitiveUrl) -> Result<impl serde::Serialize, anyhow::Error> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;

    let mut blob_counts = BTreeMap::new();
    let () = blob
        .list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
            match BlobKey::parse_ids(metadata.key) {
                Ok((shard, PartialBlobKey::Batch(_, _))) => {
                    let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
                    blob_count.batch_part_count += 1;
                    blob_count.batch_part_bytes += usize::cast_from(metadata.size_in_bytes);
                }
                Ok((shard, PartialBlobKey::Rollup(_, _))) => {
                    let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
                    blob_count.rollup_count += 1;
                    blob_count.rollup_bytes += usize::cast_from(metadata.size_in_bytes);
                }
                Err(err) => {
                    eprintln!("error parsing blob: {}", err);
                }
            }
        })
        .await?;

    Ok(blob_counts)
}

/// Rummages through the blob store to find the latest rollup for each shard, then calculates
/// summary stats.
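///
/// A hypothetical CLI invocation; the output is CSV with the header printed
/// below in this function:
///
/// ```text
/// persistcli inspect shard-stats --blob-uri <blob_uri>
/// ```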
pub async fn shard_stats(blob_uri: &SensitiveUrl) -> anyhow::Result<()> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;

    // Collect the latest rollup for every shard with the given blob_uri
    let mut rollup_keys = BTreeMap::new();
    blob.list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
        if let Ok((shard, PartialBlobKey::Rollup(seqno, rollup_id))) =
            BlobKey::parse_ids(metadata.key)
        {
            let key = (seqno, rollup_id);
            match rollup_keys.entry(shard) {
                Entry::Vacant(v) => {
                    v.insert(key);
                }
                Entry::Occupied(o) => {
                    if key.0 > o.get().0 {
                        *o.into_mut() = key;
                    }
                }
            };
        }
    })
    .await?;

    println!(
        "shard,bytes,parts,runs,batches,empty_batches,longest_run,byte_width,leased_readers,critical_readers,writers"
    );
    for (shard, (seqno, rollup)) in rollup_keys {
        let rollup_key = PartialRollupKey::new(seqno, &rollup).complete(&shard);
        // Basic stats about the trace.
        let mut bytes = 0;
        let mut parts = 0;
        let mut runs = 0;
        let mut batches = 0;
        let mut empty_batches = 0;
        let mut longest_run = 0;
        // The sum of the largest part in every run, measured in bytes.
        // A rough proxy for the worst-case amount of data we'd need to fetch to consolidate
        // down a single key-value pair.
        let mut byte_width = 0;

        let Some(rollup) = blob.get(&rollup_key).await? else {
            // Deleted between listing and now?
            continue;
        };

        let state: State<u64> =
            UntypedState::decode(&cfg.build_version, rollup).check_ts_codec(&shard)?;

        let leased_readers = state.collections.leased_readers.len();
        let critical_readers = state.collections.critical_readers.len();
        let writers = state.collections.writers.len();

        state.collections.trace.map_batches(|b| {
            bytes += b.encoded_size_bytes();
            parts += b.part_count();
            batches += 1;
            if b.is_empty() {
                empty_batches += 1;
            }
            for (_meta, run) in b.runs() {
                let largest_part = run.iter().map(|p| p.max_part_bytes()).max().unwrap_or(0);
                runs += 1;
                longest_run = longest_run.max(run.len());
                byte_width += largest_part;
            }
        });
        println!(
            "{shard},{bytes},{parts},{runs},{batches},{empty_batches},{longest_run},{byte_width},{leased_readers},{critical_readers},{writers}"
        );
    }

    Ok(())
}

#[derive(Debug, Default, serde::Serialize)]
struct UnreferencedBlobs {
    batch_parts: BTreeSet<PartialBatchKey>,
    rollups: BTreeSet<PartialRollupKey>,
}

/// Fetches the unreferenced blobs for the given shard
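///
/// A hypothetical CLI invocation (subcommand name derived from clap's usual
/// kebab-casing of `UnreferencedBlobs`):
///
/// ```text
/// persistcli inspect unreferenced-blobs --shard-id <shard> --consensus-uri <consensus_uri> --blob-uri <blob_uri>
/// ```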
pub async fn unreferenced_blobs(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut all_parts = vec![];
    let mut all_rollups = vec![];
    let () = state_versions
        .blob
        .list_keys_and_metadata(
            &BlobKeyPrefix::Shard(&shard_id).to_string(),
            &mut |metadata| match BlobKey::parse_ids(metadata.key) {
                Ok((_, PartialBlobKey::Batch(writer, part))) => {
                    all_parts.push((PartialBatchKey::new(&writer, &part), writer.clone()));
                }
                Ok((_, PartialBlobKey::Rollup(seqno, rollup))) => {
                    all_rollups.push(PartialRollupKey::new(seqno, &rollup));
                }
                Err(_) => {}
            },
        )
        .await?;

    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;

    let mut known_parts = BTreeSet::new();
    let mut known_rollups = BTreeSet::new();
    let mut known_writers = BTreeSet::new();
    while let Some(v) = state_iter.next(|_| {}) {
        for writer_id in v.collections.writers.keys() {
            known_writers.insert(writer_id.clone());
        }
        for batch in v.collections.trace.batches() {
            // TODO: this may end up refetching externally-stored runs once per batch...
            // but if we have enough parts for this to be a problem, we may need to track a more
            // efficient state representation.
            let mut parts =
                pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
            while let Some(batch_part) = parts.next().await {
                match &*batch_part? {
                    BatchPart::Hollow(x) => known_parts.insert(x.key.clone()),
                    BatchPart::Inline { .. } => continue,
                };
            }
        }
        for rollup in v.collections.rollups.values() {
            known_rollups.insert(rollup.key.clone());
        }
    }

    let mut unreferenced_blobs = UnreferencedBlobs::default();
    // In the future, this is likely to include a "grace period" so recent but non-current
    // versions are also considered live
    let minimum_version = WriterKey::for_version(&state_versions.cfg.build_version);
    for (part, writer) in all_parts {
        let is_unreferenced = writer < minimum_version;
        if is_unreferenced && !known_parts.contains(&part) {
            unreferenced_blobs.batch_parts.insert(part);
        }
    }
    for rollup in all_rollups {
        if !known_rollups.contains(&rollup) {
            unreferenced_blobs.rollups.insert(rollup);
        }
    }

    Ok(unreferenced_blobs)
}

/// Prints information about blob usage for a shard, or for all shards if no
/// shard is specified
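///
/// A hypothetical CLI invocation; `--shard-id` can be omitted to audit every
/// shard (assuming [StateArgs] permits an empty shard id in that mode, as the
/// `is_empty` check below suggests):
///
/// ```text
/// persistcli inspect blob-usage --shard-id <shard> --consensus-uri <consensus_uri> --blob-uri <blob_uri>
/// ```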
pub async fn blob_usage(args: &StateArgs) -> Result<(), anyhow::Error> {
    let shard_id = if args.shard_id.is_empty() {
        None
    } else {
        Some(args.shard_id())
    };
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics_registry = MetricsRegistry::new();
    let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
    let consensus =
        make_consensus(&cfg, &args.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, &args.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
    let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
    let state_cache = Arc::new(StateCache::new(
        &cfg,
        Arc::clone(&metrics),
        Arc::new(NoopPubSubSender),
    ));
    let usage = StorageUsageClient::open(PersistClient::new(
        cfg,
        blob,
        consensus,
        metrics,
        isolated_runtime,
        state_cache,
        Arc::new(NoopPubSubSender),
    )?);

    if let Some(shard_id) = shard_id {
        let usage = usage.shard_usage_audit(shard_id).await;
        println!("{}\n{}", shard_id, usage);
    } else {
        let usage = usage.shards_usage_audit().await;
        let mut by_shard = usage.by_shard.iter().collect::<Vec<_>>();
        by_shard.sort_by_key(|(_, x)| x.total_bytes());
        by_shard.reverse();
        for (shard_id, usage) in by_shard {
            println!("{}\n{}\n", shard_id, usage);
        }
        println!("unattributable: {}", HumanBytes(usage.unattributable_bytes));
    }

    Ok(())
}

/// The following is a very terrible hack that no one should draw inspiration from. Currently State
/// is generic over <K, V, T, D>, with KVD being represented as phantom data for type safety and to
/// detect persisted codec mismatches. However, reading persisted States does not require actually
/// decoding KVD, so we only need their codec _names_ to match, not the full types. For the purposes
/// of `persistcli inspect`, which only wants to read the persistent data, we create new types that
/// return static Codec names, and rebind the names if/when we get a CodecMismatch, so we can convince
/// the type system and our safety checks that we really can read the data.
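///
/// A minimal sketch (hypothetical; the real rebinding happens in the code that
/// fetches state and catches the mismatch) of how the names get rebound:
///
/// ```ignore
/// // On a CodecMismatch, copy the actual codec names reported by the error
/// // into KVTD_CODECS; afterwards K/V/T/D report matching names and the
/// // codec check passes on retry.
/// *KVTD_CODECS.lock().expect("lockable") = mismatch.actual;
/// ```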

#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct K;
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct V;
#[derive(Default, Debug, PartialEq, Eq)]
struct T;
#[derive(Default, Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct D(i64);

pub(crate) static KVTD_CODECS: Mutex<(String, String, String, String, Option<CodecConcreteType>)> =
    Mutex::new((
        String::new(),
        String::new(),
        String::new(),
        String::new(),
        None,
    ));

impl Codec for K {
    type Storage = ();
    type Schema = TodoSchema<K>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").0.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<K>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

impl Codec for V {
    type Storage = ();
    type Schema = TodoSchema<V>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").1.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<V>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

impl Codec for T {
    type Storage = ();
    type Schema = TodoSchema<T>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").2.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<T>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

impl Codec64 for D {
    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").3.clone()
    }

    fn encode(&self) -> [u8; 8] {
        [0; 8]
    }

    fn decode(_buf: [u8; 8]) -> Self {
        Self(0)
    }
}

impl Semigroup for D {
    fn plus_equals(&mut self, _rhs: &Self) {}
}

impl IsZero for D {
    fn is_zero(&self) -> bool {
        false
    }
}

pub(crate) static FAKE_OPAQUE_CODEC: Mutex<String> = Mutex::new(String::new());

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct O([u8; 8]);

impl Codec64 for O {
    fn codec_name() -> String {
        FAKE_OPAQUE_CODEC.lock().expect("lockable").clone()
    }

    fn encode(&self) -> [u8; 8] {
        self.0
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(buf)
    }
}

impl Opaque for O {
    fn initial() -> Self {
        Self([0; 8])
    }
}