mz_persist_client/cli/inspect.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI introspection tools for persist

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::pin::pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
use bytes::{BufMut, Bytes};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::{StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::indexed::encoding::BlobTraceBatchPart;
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::RustType;
use prost::Message;
use serde_json::json;

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cli::args::{NO_COMMIT, READ_ALL_BUILD_INFO, StateArgs, make_blob, make_consensus};
use crate::error::CodecConcreteType;
use crate::fetch::{EncodedPart, FetchConfig};
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::paths::{
    BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
};
use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State};
use crate::rpc::NoopPubSubSender;
use crate::usage::{HumanBytes, StorageUsageClient};
use crate::{Metrics, PersistClient, PersistConfig, ShardId};

/// Commands for read-only inspection of persist state
#[derive(Debug, clap::Args)]
pub struct InspectArgs {
    #[clap(subcommand)]
    command: Command,
}

/// Individual subcommands of inspect
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Prints latest consensus state as JSON
    State(StateArgs),

    /// Prints latest consensus rollup state as JSON
    StateRollup(StateRollupArgs),

    /// Prints consensus rollup state of all known rollups as JSON
    StateRollups(StateArgs),

    /// Prints the count and size of blobs in an environment
    BlobCount(BlobArgs),

    /// Prints blob batch part contents
    BlobBatchPart(BlobBatchPartArgs),

    /// Prints consolidated and unconsolidated size, in bytes and update count
    ConsolidatedSize(StateArgs),

    /// Prints the unreferenced blobs across all shards
    UnreferencedBlobs(StateArgs),

    /// Prints various statistics about the latest rollups for all the shards in an environment
    ShardStats(BlobArgs),

    /// Prints information about blob usage for a shard
    BlobUsage(StateArgs),

    /// Prints each consensus state change as JSON. Output includes the full
    /// consensus state before and after each state transition:
    ///
    /// ```text
    /// {
    ///     "previous": previous_consensus_state,
    ///     "new": new_consensus_state,
    /// }
    /// ```
    ///
    /// This is most helpfully consumed using a JSON diff tool like `jd`. A useful incantation
    /// to show only the changed fields between state transitions:
    ///
    /// ```text
    /// persistcli inspect state-diff --shard-id <shard> --consensus-uri <consensus_uri> |
    ///     while read diff; do
    ///         echo $diff | jq '.new' > temp_new
    ///         echo $diff | jq '.previous' > temp_previous
    ///         echo $diff | jq '.new.seqno'
    ///         jd -color -set temp_previous temp_new
    ///     done
    /// ```
    ///
    #[clap(verbatim_doc_comment)]
    StateDiff(StateArgs),
}
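
// Example invocations (a sketch; the subcommand and flag names follow from
// the clap derives above, and the shard id and URIs are placeholders):
//
//     persistcli inspect state --shard-id <shard> --consensus-uri <consensus_uri>
//     persistcli inspect blob-count --blob-uri <blob_uri>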

/// Runs the given read-only inspect command.
pub async fn run(command: InspectArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::State(args) => {
            let state = fetch_latest_state(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state).expect("unserializable state")
            );
        }
        Command::StateRollup(args) => {
            let state_rollup = fetch_state_rollup(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state_rollup).expect("unserializable state")
            );
        }
        Command::StateRollups(args) => {
            let state_rollups = fetch_state_rollups(&args).await?;
            println!(
                "{}",
                serde_json::to_string_pretty(&state_rollups).expect("unserializable state")
            );
        }
        Command::StateDiff(args) => {
            let states = fetch_state_diffs(&args).await?;
            for window in states.windows(2) {
                println!(
                    "{}",
                    json!({
                        "previous": window[0],
                        "new": window[1]
                    })
                );
            }
        }
        Command::BlobCount(args) => {
            let blob_counts = blob_counts(&args.blob_uri).await?;
            println!("{}", json!(blob_counts));
        }
        Command::BlobBatchPart(args) => {
            let shard_id = ShardId::from_str(&args.shard_id).expect("invalid shard id");
            let updates = blob_batch_part(&args.blob_uri, shard_id, args.key, args.limit).await?;
            println!("{}", json!(updates));
        }
        Command::ConsolidatedSize(args) => {
            let () = consolidated_size(&args).await?;
        }
        Command::UnreferencedBlobs(args) => {
            let unreferenced_blobs = unreferenced_blobs(&args).await?;
            println!("{}", json!(unreferenced_blobs));
        }
        Command::BlobUsage(args) => {
            let () = blob_usage(&args).await?;
        }
        Command::ShardStats(args) => {
            shard_stats(&args.blob_uri).await?;
        }
    }

    Ok(())
}

/// Arguments for viewing the state rollup of a shard
#[derive(Debug, Clone, clap::Parser)]
pub struct StateRollupArgs {
    #[clap(flatten)]
    pub(crate) state: StateArgs,

    /// Inspect the state rollup with the given ID, if available.
    #[clap(long)]
    pub(crate) rollup_key: Option<String>,
}

/// Fetches the current state of a given shard
pub async fn fetch_latest_state(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;
    let state = state_versions
        .fetch_current_state::<u64>(&shard_id, versions.0.clone())
        .await;
    Ok(Rollup::from_untyped_state_without_diffs(state).into_proto())
}

/// Fetches a state rollup of a given shard. If a rollup key is not provided,
/// discovers the latest rollup by inspecting the most recent consensus state.
pub async fn fetch_state_rollup(
    args: &StateRollupArgs,
) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.state.shard_id();
    let state_versions = args.state.open().await?;

    let rollup_key = if let Some(rollup_key) = &args.rollup_key {
        PartialRollupKey(rollup_key.to_owned())
    } else {
        let latest_state = state_versions.consensus.head(&shard_id.to_string()).await?;
        let diff_buf = latest_state.ok_or_else(|| anyhow!("unknown shard"))?;
        let diff = ProtoStateDiff::decode(diff_buf.data).expect("invalid encoded diff");
        PartialRollupKey(diff.latest_rollup_key)
    };
    let rollup_buf = state_versions
        .blob
        .get(&rollup_key.complete(&shard_id))
        .await?
        .expect("fetching the specified state rollup");
    let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
    Ok(proto)
}

/// Fetches the state from all known rollups of a given shard
pub async fn fetch_state_rollups(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut rollup_keys = BTreeSet::new();
    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;
    while let Some(v) = state_iter.next(|_| {}) {
        for rollup in v.collections.rollups.values() {
            rollup_keys.insert(rollup.key.clone());
        }
    }

    if rollup_keys.is_empty() {
        return Err(anyhow!("unknown shard"));
    }

    let mut rollup_states = BTreeMap::new();
    for key in rollup_keys {
        let rollup_buf = state_versions
            .blob
            .get(&key.complete(&shard_id))
            .await
            .unwrap();
        if let Some(rollup_buf) = rollup_buf {
            let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
            rollup_states.insert(key.to_string(), proto);
        }
    }

    Ok(rollup_states)
}

/// Fetches each state in a shard
pub async fn fetch_state_diffs(
    args: &StateArgs,
) -> Result<Vec<impl serde::Serialize>, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut live_states = vec![];
    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;
    while state_iter.next(|_| {}).is_some() {
        live_states.push(state_iter.into_rollup_proto_without_diffs());
    }

    Ok(live_states)
}

/// Arguments for viewing contents of a batch part
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobBatchPartArgs {
    /// Shard to view
    #[clap(long)]
    shard_id: String,

    /// Blob key (without shard)
    #[clap(long)]
    key: String,

    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be
    /// in place: e.g., for S3, sign in to SSO, set AWS_PROFILE and AWS_REGION appropriately, and
    /// use a blob URI scoped to the environment's bucket prefix.
    #[clap(long)]
    blob_uri: SensitiveUrl,

    /// Number of updates to output. Defaults to `u64::MAX`, i.e. effectively unbounded.
    #[clap(long, default_value = "18446744073709551615")]
    limit: usize,
}

#[derive(Debug, serde::Serialize)]
struct BatchPartOutput {
    desc: Description<u64>,
    updates: Vec<BatchPartUpdate>,
}

#[derive(Debug, serde::Serialize)]
struct BatchPartUpdate {
    k: String,
    v: String,
    t: u64,
    d: i64,
}

/// Fetches the updates in a blob batch part
pub async fn blob_batch_part(
    blob_uri: &SensitiveUrl,
    shard_id: ShardId,
    partial_key: String,
    limit: usize,
) -> Result<impl serde::Serialize, anyhow::Error> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;

    let key = PartialBatchKey(partial_key);
    let buf = blob
        .get(&*key.complete(&shard_id))
        .await
        .expect("blob exists")
        .expect("part exists");
    let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).expect("decodable");
    let desc = parsed.desc.clone();

    let encoded_part = EncodedPart::new(
        &FetchConfig::from_persist_config(&cfg),
        metrics.read.snapshot.clone(),
        parsed.desc.clone(),
        &key.0,
        None,
        parsed,
    );
    let mut out = BatchPartOutput {
        desc,
        updates: Vec::new(),
    };
    let records = encoded_part
        .updates()
        .as_part()
        .ok_or_else(|| anyhow!("expected structured data"))?
        .as_ord();
    for (k, v, t, d) in records.iter() {
        if out.updates.len() >= limit {
            break;
        }
        out.updates.push(BatchPartUpdate {
            k: k.to_string(),
            v: v.to_string(),
            t: u64::from_le_bytes(t),
            d: i64::from_le_bytes(d),
        });
    }

    Ok(out)
}

async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;
    let cfg = &state_versions.cfg;
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;
    let state = state_versions
        .fetch_current_state::<u64>(&shard_id, versions.0.clone())
        .await;
    let state = state.check_ts_codec(&shard_id)?;
    let shard_metrics = state_versions.metrics.shards.shard(&shard_id, "unknown");
    // This is odd, but advance by the upper to get maximal consolidation.
    let as_of = state.upper().borrow();
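    // (Everything in the trace is below the upper, so advancing each update's
    // timestamp by it lands them all at the same time, letting consolidation
    // cancel matching records across the entire shard.)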

    let mut parts = Vec::new();
    for batch in state.collections.trace.batches() {
        let mut part_stream =
            pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
        while let Some(part) = part_stream.try_next().await? {
            tracing::info!("fetching {}", part.printable_name());
            let encoded_part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(cfg),
                &shard_id,
                &*state_versions.blob,
                &state_versions.metrics,
                &shard_metrics,
                &state_versions.metrics.read.snapshot,
                &batch.desc,
                &part,
            )
            .await
            .expect("part exists");
            let part = encoded_part.updates();
            let part = part
                .as_part()
                .ok_or_else(|| anyhow!("expected structured data"))?
                .as_ord();
            parts.push(part);
        }
    }

    let mut updates = vec![];
    for part in &parts {
        for (k, v, t, d) in part.iter() {
            let mut t = <u64 as Codec64>::decode(t);
            t.advance_by(as_of);
            let d = <i64 as Codec64>::decode(d);
            updates.push(((k, v), t, d));
        }
    }

    let bytes: usize = updates
        .iter()
        .map(|((k, v), _, _)| k.goodbytes() + v.goodbytes())
        .sum();
    println!("before: {} updates {} bytes", updates.len(), bytes);
    differential_dataflow::consolidation::consolidate_updates(&mut updates);
    let bytes: usize = updates
        .iter()
        .map(|((k, v), _, _)| k.goodbytes() + v.goodbytes())
        .sum();
    println!("after : {} updates {} bytes", updates.len(), bytes);

    Ok(())
}

/// Arguments for commands that run only against the blob store.
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobArgs {
    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be
    /// in place: e.g., for S3, sign in to SSO, set AWS_PROFILE and AWS_REGION appropriately, and
    /// use a blob URI scoped to the environment's bucket prefix.
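    ///
    /// For example (a sketch; profile, region, and bucket names are placeholders):
    ///
    /// ```text
    /// AWS_PROFILE=<profile> AWS_REGION=<region> persistcli inspect blob-count \
    ///     --blob-uri 's3://<bucket>/<environment-prefix>'
    /// ```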
    #[clap(long)]
    blob_uri: SensitiveUrl,
}

#[derive(Debug, Default, serde::Serialize)]
struct BlobCounts {
    batch_part_count: usize,
    batch_part_bytes: usize,
    rollup_count: usize,
    rollup_bytes: usize,
}

/// Fetches the per-shard blob counts for the given blob URI
pub async fn blob_counts(blob_uri: &SensitiveUrl) -> Result<impl serde::Serialize, anyhow::Error> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;

    let mut blob_counts = BTreeMap::new();
    let () = blob
        .list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
            match BlobKey::parse_ids(metadata.key) {
                Ok((shard, PartialBlobKey::Batch(_, _))) => {
                    let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
                    blob_count.batch_part_count += 1;
                    blob_count.batch_part_bytes += usize::cast_from(metadata.size_in_bytes);
                }
                Ok((shard, PartialBlobKey::Rollup(_, _))) => {
                    let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
                    blob_count.rollup_count += 1;
                    blob_count.rollup_bytes += usize::cast_from(metadata.size_in_bytes);
                }
                Err(err) => {
                    eprintln!("error parsing blob: {}", err);
                }
            }
        })
        .await?;

    Ok(blob_counts)
}

/// Rummages through S3 to find the latest rollup for each shard, then calculates summary stats.
pub async fn shard_stats(blob_uri: &SensitiveUrl) -> anyhow::Result<()> {
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
    let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;

    // Collect the latest rollup for every shard with the given blob_uri
    let mut rollup_keys = BTreeMap::new();
    blob.list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
        if let Ok((shard, PartialBlobKey::Rollup(seqno, rollup_id))) =
            BlobKey::parse_ids(metadata.key)
        {
            let key = (seqno, rollup_id);
            match rollup_keys.entry(shard) {
                Entry::Vacant(v) => {
                    v.insert(key);
                }
                Entry::Occupied(o) => {
                    if key.0 > o.get().0 {
                        *o.into_mut() = key;
                    }
                }
            };
        }
    })
    .await?;

    println!(
        "shard,bytes,parts,runs,batches,empty_batches,longest_run,byte_width,leased_readers,critical_readers,writers"
    );
    for (shard, (seqno, rollup)) in rollup_keys {
        let rollup_key = PartialRollupKey::new(seqno, &rollup).complete(&shard);
        // Basic stats about the trace.
        let mut bytes = 0;
        let mut parts = 0;
        let mut runs = 0;
        let mut batches = 0;
        let mut empty_batches = 0;
        let mut longest_run = 0;
        // The sum of the largest part in every run, measured in bytes.
        // A rough proxy for the worst-case amount of data we'd need to fetch to consolidate
        // down a single key-value pair.
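        // (For example, a batch with two runs whose largest parts are 10 MiB
        // and 4 MiB contributes 14 MiB: consolidating a single key may need
        // the largest part of every run resident at once.)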
        let mut byte_width = 0;

        let Some(rollup) = blob.get(&rollup_key).await? else {
            // Deleted between listing and now?
            continue;
        };

        let state: State<u64> =
            UntypedState::decode(&cfg.build_version, rollup).check_ts_codec(&shard)?;

        let leased_readers = state.collections.leased_readers.len();
        let critical_readers = state.collections.critical_readers.len();
        let writers = state.collections.writers.len();

        state.collections.trace.map_batches(|b| {
            bytes += b.encoded_size_bytes();
            parts += b.part_count();
            batches += 1;
            if b.is_empty() {
                empty_batches += 1;
            }
            for (_meta, run) in b.runs() {
                let largest_part = run.iter().map(|p| p.max_part_bytes()).max().unwrap_or(0);
                runs += 1;
                longest_run = longest_run.max(run.len());
                byte_width += largest_part;
            }
        });
        println!(
            "{shard},{bytes},{parts},{runs},{batches},{empty_batches},{longest_run},{byte_width},{leased_readers},{critical_readers},{writers}"
        );
    }

    Ok(())
}

#[derive(Debug, Default, serde::Serialize)]
struct UnreferencedBlobs {
    batch_parts: BTreeSet<PartialBatchKey>,
    rollups: BTreeSet<PartialRollupKey>,
}

/// Fetches the unreferenced blobs for the given shard
pub async fn unreferenced_blobs(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
    let shard_id = args.shard_id();
    let state_versions = args.open().await?;

    let mut all_parts = vec![];
    let mut all_rollups = vec![];
    let () = state_versions
        .blob
        .list_keys_and_metadata(
            &BlobKeyPrefix::Shard(&shard_id).to_string(),
            &mut |metadata| match BlobKey::parse_ids(metadata.key) {
                Ok((_, PartialBlobKey::Batch(writer, part))) => {
                    all_parts.push((PartialBatchKey::new(&writer, &part), writer.clone()));
                }
                Ok((_, PartialBlobKey::Rollup(seqno, rollup))) => {
                    all_rollups.push(PartialRollupKey::new(seqno, &rollup));
                }
                Err(_) => {}
            },
        )
        .await?;

    let mut state_iter = state_versions
        .fetch_all_live_states::<u64>(shard_id)
        .await
        .expect("requested shard should exist")
        .check_ts_codec()?;

    let mut known_parts = BTreeSet::new();
    let mut known_rollups = BTreeSet::new();
    let mut known_writers = BTreeSet::new();
    while let Some(v) = state_iter.next(|_| {}) {
        for writer_id in v.collections.writers.keys() {
            known_writers.insert(writer_id.clone());
        }
        for batch in v.collections.trace.batches() {
            // TODO: this may end up refetching externally-stored runs once per batch...
            // but if we have enough parts for this to be a problem, we may need to track a more
            // efficient state representation.
            let mut parts =
                pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
            while let Some(batch_part) = parts.next().await {
                match &*batch_part? {
                    BatchPart::Hollow(x) => known_parts.insert(x.key.clone()),
                    BatchPart::Inline { .. } => continue,
                };
            }
        }
        for rollup in v.collections.rollups.values() {
            known_rollups.insert(rollup.key.clone());
        }
    }

    let mut unreferenced_blobs = UnreferencedBlobs::default();
    // In the future, this is likely to include a "grace period" so recent but non-current
    // versions are also considered live
    let minimum_version = WriterKey::for_version(&state_versions.cfg.build_version);
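    // (A part whose writer key is at or above the current build's version may
    // belong to a write that's still in flight and not yet linked into state,
    // so only parts from strictly older writers are eligible to be flagged.)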
    for (part, writer) in all_parts {
        let is_unreferenced = writer < minimum_version;
        if is_unreferenced && !known_parts.contains(&part) {
            unreferenced_blobs.batch_parts.insert(part);
        }
    }
    for rollup in all_rollups {
        if !known_rollups.contains(&rollup) {
            unreferenced_blobs.rollups.insert(rollup);
        }
    }

    Ok(unreferenced_blobs)
}

/// Returns information about blob usage for a shard
pub async fn blob_usage(args: &StateArgs) -> Result<(), anyhow::Error> {
    let shard_id = if args.shard_id.is_empty() {
        None
    } else {
        Some(args.shard_id())
    };
    let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
    let metrics_registry = MetricsRegistry::new();
    let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
    let consensus =
        make_consensus(&cfg, &args.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, &args.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
    let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
    let state_cache = Arc::new(StateCache::new(
        &cfg,
        Arc::clone(&metrics),
        Arc::new(NoopPubSubSender),
    ));
    let usage = StorageUsageClient::open(PersistClient::new(
        cfg,
        blob,
        consensus,
        metrics,
        isolated_runtime,
        state_cache,
        Arc::new(NoopPubSubSender),
    )?);

    if let Some(shard_id) = shard_id {
        let usage = usage.shard_usage_audit(shard_id).await;
        println!("{}\n{}", shard_id, usage);
    } else {
        let usage = usage.shards_usage_audit().await;
        let mut by_shard = usage.by_shard.iter().collect::<Vec<_>>();
        by_shard.sort_by_key(|(_, x)| x.total_bytes());
        by_shard.reverse();
        for (shard_id, usage) in by_shard {
            println!("{}\n{}\n", shard_id, usage);
        }
        println!("unattributable: {}", HumanBytes(usage.unattributable_bytes));
    }

    Ok(())
}

/// The following is a very terrible hack that no one should draw inspiration from. Currently State
/// is generic over <K, V, T, D>, with KVD being represented as phantom data for type safety and to
/// detect persisted codec mismatches. However, reading persisted States does not require actually
/// decoding KVD, so we only need their codec _names_ to match, not the full types. For the purposes
/// of `persistcli inspect`, which only wants to read the persistent data, we create new types that
/// return static Codec names, and rebind the names if/when we get a CodecMismatch, so we can convince
/// the type system and our safety checks that we really can read the data.
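//
// A sketch of the rebinding (the codec names below are hypothetical; the real
// ones come from the `CodecMismatch` raised on a first read attempt):
//
//     *KVTD_CODECS.lock().expect("lockable") = (
//         "String".to_string(), // K's actual codec name
//         "String".to_string(), // V's actual codec name
//         "u64".to_string(),    // T's actual codec name
//         "i64".to_string(),    // D's actual codec name
//         None,
//     );
//
// After this, `K::codec_name()` and friends report the shard's real codec
// names and the safety checks pass without decoding any actual data.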

#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct K;
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct V;
#[derive(Default, Debug, PartialEq, Eq)]
struct T;

pub(crate) static KVTD_CODECS: Mutex<(String, String, String, String, Option<CodecConcreteType>)> =
    Mutex::new((
        String::new(),
        String::new(),
        String::new(),
        String::new(),
        None,
    ));

impl Codec for K {
    type Storage = ();
    type Schema = TodoSchema<K>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").0.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<K>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

impl Codec for V {
    type Storage = ();
    type Schema = TodoSchema<V>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").1.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<V>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

impl Codec for T {
    type Storage = ();
    type Schema = TodoSchema<T>;

    fn codec_name() -> String {
        KVTD_CODECS.lock().expect("lockable").2.clone()
    }

    fn encode<B>(&self, _buf: &mut B)
    where
        B: BufMut,
    {
    }

    fn decode(_buf: &[u8], _schema: &TodoSchema<T>) -> Result<Self, String> {
        Ok(Self)
    }

    fn encode_schema(_schema: &Self::Schema) -> Bytes {
        Bytes::new()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        assert_eq!(*buf, Bytes::new());
        TodoSchema::default()
    }
}

pub(crate) static FAKE_OPAQUE_CODEC: Mutex<String> = Mutex::new(String::new());

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct O([u8; 8]);

impl Codec64 for O {
    fn codec_name() -> String {
        FAKE_OPAQUE_CODEC.lock().expect("lockable").clone()
    }

    fn encode(&self) -> [u8; 8] {
        self.0
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(buf)
    }
}

impl Opaque for O {
    fn initial() -> Self {
        Self([0; 8])
    }
}