mz_persist_client/cli/bench.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI benchmarking tools for persist
11
12use futures_util::stream::StreamExt;
13use futures_util::{TryStreamExt, stream};
14use std::sync::Arc;
15use std::time::Instant;
16
17use mz_persist::indexed::encoding::BlobTraceBatchPart;
18
19use crate::cli::args::StateArgs;
20use crate::internal::state::BatchPart;
21
22/// Commands for read-only inspection of persist state
23#[derive(Debug, clap::Args)]
24pub struct BenchArgs {
25    #[clap(subcommand)]
26    command: Command,
27}
28
/// Individual subcommands of bench
// NOTE: clap's derive uses the `///` doc comments on the variants below as the
// subcommands' help text, so treat them as user-facing strings.
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Fetch the blobs in a shard as quickly as possible, repeated some number
    /// of times.
    // Dispatched by `run` to `bench_s3`.
    S3Fetch(S3FetchArgs),
}
36
37/// Fetch the blobs in a shard as quickly as possible, repeated some number of
38/// times.
39#[derive(Debug, Clone, clap::Parser)]
40pub struct S3FetchArgs {
41    #[clap(flatten)]
42    shard: StateArgs,
43
44    #[clap(long, default_value_t = 1)]
45    iters: usize,
46
47    #[clap(long)]
48    parse: bool,
49}
50
51/// Runs the given bench command.
52pub async fn run(command: BenchArgs) -> Result<(), anyhow::Error> {
53    match command.command {
54        Command::S3Fetch(args) => bench_s3(&args).await?,
55    }
56
57    Ok(())
58}
59
60async fn bench_s3(args: &S3FetchArgs) -> Result<(), anyhow::Error> {
61    let parse = args.parse;
62    let shard_id = args.shard.shard_id();
63    let state_versions = args.shard.open().await?;
64    let versions = state_versions
65        .fetch_recent_live_diffs::<u64>(&shard_id)
66        .await;
67    let state = state_versions
68        .fetch_current_state::<u64>(&shard_id, versions.0)
69        .await;
70    let state = state.check_ts_codec(&shard_id)?;
71    let snap = state
72        .snapshot(state.since())
73        .expect("since should be available for reads");
74
75    let batch_parts: Vec<_> = stream::iter(&snap)
76        .flat_map(|batch| {
77            batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics)
78        })
79        .try_collect()
80        .await?;
81
82    println!("iter,key,size_bytes,fetch_secs,parse_secs");
83    for iter in 0..args.iters {
84        let start = Instant::now();
85        let mut fetches = Vec::new();
86        for part in &batch_parts {
87            let key = match &**part {
88                BatchPart::Hollow(x) => x.key.complete(&shard_id),
89                _ => continue,
90            };
91            let blob = Arc::clone(&state_versions.blob);
92            let metrics = Arc::clone(&state_versions.metrics);
93            let fetch = mz_ore::task::spawn(|| "", async move {
94                let buf = blob.get(&key).await.unwrap().unwrap();
95                let fetch_elapsed = start.elapsed();
96                let buf_len = buf.len();
97                let parse_elapsed = mz_ore::task::spawn_blocking(
98                    || "",
99                    move || {
100                        let start = Instant::now();
101                        if parse {
102                            BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).unwrap();
103                        }
104                        start.elapsed()
105                    },
106                )
107                .await
108                .unwrap();
109                (
110                    key,
111                    buf_len,
112                    fetch_elapsed.as_secs_f64(),
113                    parse_elapsed.as_secs_f64(),
114                )
115            });
116            fetches.push(fetch);
117        }
118        for fetch in fetches {
119            let (key, size_bytes, fetch_secs, parse_secs) = fetch.await.unwrap();
120            println!(
121                "{},{},{},{},{}",
122                iter, key, size_bytes, fetch_secs, parse_secs
123            );
124        }
125    }
126
127    Ok(())
128}