mz_persist_client/cli/
bench.rs
1use futures_util::stream::StreamExt;
13use futures_util::{TryStreamExt, stream};
14use std::sync::Arc;
15use std::time::Instant;
16
17use mz_persist::indexed::encoding::BlobTraceBatchPart;
18
19use crate::cli::args::StateArgs;
20use crate::internal::state::BatchPart;
21
22#[derive(Debug, clap::Args)]
24pub struct BenchArgs {
25 #[clap(subcommand)]
26 command: Command,
27}
28
29#[derive(Debug, clap::Subcommand)]
31pub(crate) enum Command {
32 S3Fetch(S3FetchArgs),
35}
36
37#[derive(Debug, Clone, clap::Parser)]
40pub struct S3FetchArgs {
41 #[clap(flatten)]
42 shard: StateArgs,
43
44 #[clap(long, default_value_t = 1)]
45 iters: usize,
46
47 #[clap(long)]
48 parse: bool,
49}
50
51pub async fn run(command: BenchArgs) -> Result<(), anyhow::Error> {
53 match command.command {
54 Command::S3Fetch(args) => bench_s3(&args).await?,
55 }
56
57 Ok(())
58}
59
60async fn bench_s3(args: &S3FetchArgs) -> Result<(), anyhow::Error> {
61 let parse = args.parse;
62 let shard_id = args.shard.shard_id();
63 let state_versions = args.shard.open().await?;
64 let versions = state_versions
65 .fetch_recent_live_diffs::<u64>(&shard_id)
66 .await;
67 let state = state_versions
68 .fetch_current_state::<u64>(&shard_id, versions.0)
69 .await;
70 let state = state.check_ts_codec(&shard_id)?;
71 let snap = state
72 .snapshot(state.since())
73 .expect("since should be available for reads");
74
75 let batch_parts: Vec<_> = stream::iter(&snap)
76 .flat_map(|batch| {
77 batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics)
78 })
79 .try_collect()
80 .await?;
81
82 println!("iter,key,size_bytes,fetch_secs,parse_secs");
83 for iter in 0..args.iters {
84 let start = Instant::now();
85 let mut fetches = Vec::new();
86 for part in &batch_parts {
87 let key = match &**part {
88 BatchPart::Hollow(x) => x.key.complete(&shard_id),
89 _ => continue,
90 };
91 let blob = Arc::clone(&state_versions.blob);
92 let metrics = Arc::clone(&state_versions.metrics);
93 let fetch = mz_ore::task::spawn(|| "", async move {
94 let buf = blob.get(&key).await.unwrap().unwrap();
95 let fetch_elapsed = start.elapsed();
96 let buf_len = buf.len();
97 let parse_elapsed = mz_ore::task::spawn_blocking(
98 || "",
99 move || {
100 let start = Instant::now();
101 if parse {
102 BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).unwrap();
103 }
104 start.elapsed()
105 },
106 )
107 .await
108 .unwrap();
109 (
110 key,
111 buf_len,
112 fetch_elapsed.as_secs_f64(),
113 parse_elapsed.as_secs_f64(),
114 )
115 });
116 fetches.push(fetch);
117 }
118 for fetch in fetches {
119 let (key, size_bytes, fetch_secs, parse_secs) = fetch.await.unwrap();
120 println!(
121 "{},{},{},{},{}",
122 iter, key, size_bytes, fetch_secs, parse_secs
123 );
124 }
125 }
126
127 Ok(())
128}