persistcli/open_loop.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(clippy::cast_precision_loss)]

use std::fs::File;
use std::future::IntoFuture;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::bail;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::task::JoinHandle;
use mz_ore::url::SensitiveUrl;
use mz_persist::workload::DataGenerator;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistLocation, ShardId};
use prometheus::Encoder;
use tokio::net::TcpListener;
use tokio::sync::Barrier;
use tokio::sync::mpsc::error::SendError;
use tracing::{Instrument, debug, error, info, info_span, trace};

use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};

/// Different benchmark configurations.
#[derive(clap::ValueEnum, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum BenchmarkType {
    /// Data is written straight into persistence from the data generator.
    RawWriter,
    /// A simulated Materialize source pipeline where data is first timestamped using a consensus
    /// persist shard and then written to persistence.
    MzSourceModel,
}

/// Open-loop benchmark for persistence.
#[derive(Debug, clap::Parser)]
pub struct Args {
    /// Number of writer instances.
    #[clap(long, value_name = "W", default_value_t = 1)]
    num_writers: usize,

    /// Number of reader instances.
    #[clap(long, value_name = "R", default_value_t = 1)]
    num_readers: usize,

    /// Handle to the persist consensus system.
    #[clap(long, value_name = "CONSENSUS_URI")]
    consensus_uri: SensitiveUrl,

    /// Handle to the persist blob storage.
    #[clap(long, value_name = "BLOB_URI")]
    blob_uri: SensitiveUrl,

    /// The type of benchmark to run.
    #[clap(value_enum, long, default_value_t = BenchmarkType::RawWriter)]
    benchmark_type: BenchmarkType,

    /// Runtime in a whole number of seconds.
    #[clap(long, value_parser = humantime::parse_duration, value_name = "S", default_value = "60s")]
    runtime: Duration,

    /// How many records writers should emit per second.
    #[clap(long, value_name = "R", default_value_t = 100)]
    records_per_second: usize,

    /// Size of records (goodbytes) in bytes.
    #[clap(long, value_name = "B", default_value_t = 64)]
    record_size_bytes: usize,

    /// Batch size in number of records (if applicable).
    #[clap(long, env = "", value_name = "R", default_value_t = 100)]
    batch_size: usize,

    /// Duration between subsequent informational log outputs.
    #[clap(long, value_parser = humantime::parse_duration, value_name = "L", default_value = "1s")]
    logging_granularity: Duration,

    /// Id of the persist shard (for use in multi-process runs).
    #[clap(short, long, value_name = "I")]
    shard_id: Option<String>,

    /// The address of the internal HTTP server.
    #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
    internal_http_listen_addr: SocketAddr,

    /// Path of a file to write metrics at the end of the run.
    #[clap(long)]
    metrics_file: Option<String>,
}
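
// An illustrative invocation of this benchmark (a sketch only: the `open-loop`
// subcommand name and the URIs are assumptions, while the flag names follow the
// clap definitions above):
//
//     persistcli open-loop \
//         --consensus-uri=postgres://... \
//         --blob-uri=file:///path/to/blob \
//         --num-writers=1 --num-readers=1 \
//         --records-per-second=100 --record-size-bytes=64 --batch-size=100 \
//         --runtime=60s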

const MIB: u64 = 1024 * 1024;

pub async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    {
        let metrics_registry = metrics_registry.clone();
        info!(
            "serving internal HTTP server on http://{}/metrics",
            args.internal_http_listen_addr
        );
        let listener = TcpListener::bind(&args.internal_http_listen_addr)
            .await
            .expect("can bind");
        mz_ore::task::spawn(
            || "http_server",
            axum::serve(
                listener,
                axum::Router::new()
                    .route(
                        "/metrics",
                        axum::routing::get(move || async move {
                            mz_http_util::handle_prometheus(&metrics_registry).await
                        }),
                    )
                    .into_make_service(),
            )
            .into_future(),
        );
    }

    let location = PersistLocation {
        blob_uri: args.blob_uri.clone(),
        consensus_uri: args.consensus_uri.clone(),
    };
    let persist = PersistClientCache::new(
        PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone()),
        &metrics_registry,
        |_, _| PubSubClientConnection::noop(),
    )
    .open(location)
    .await?;

    let shard_id = match args.shard_id.clone() {
        Some(shard_id) => ShardId::from_str(&shard_id).map_err(anyhow::Error::msg)?,
        None => ShardId::new(),
    };

    let metrics = Arc::clone(persist.metrics());
    let (writers, readers) = match args.benchmark_type.clone() {
        BenchmarkType::RawWriter => {
            raw_persist_benchmark::setup_raw_persist(
                persist,
                shard_id,
                args.num_writers,
                args.num_readers,
            )
            .await?
        }
        BenchmarkType::MzSourceModel => panic!("source model"),
    };

    run_benchmark(args, metrics_registry, metrics, writers, readers).await
}

async fn run_benchmark<W, R>(
    args: Args,
    metrics_registry: MetricsRegistry,
    metrics: Arc<Metrics>,
    writers: Vec<W>,
    readers: Vec<R>,
) -> Result<(), anyhow::Error>
where
    W: BenchmarkWriter + Send + Sync + 'static,
    R: BenchmarkReader + Send + Sync + 'static,
{
    let num_records_total = args.records_per_second * usize::cast_from(args.runtime.as_secs());
    let data_generator =
        DataGenerator::new(num_records_total, args.record_size_bytes, args.batch_size);

    let benchmark_description = format!(
        "num-readers={} num-writers={} runtime={:?} num_records_total={} records-per-second={} record-size-bytes={} batch-size={}",
        args.num_readers,
        args.num_writers,
        args.runtime,
        num_records_total,
        args.records_per_second,
        args.record_size_bytes,
        args.batch_size
    );

    info!("starting benchmark: {}", benchmark_description);
    let mut generator_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
    let mut write_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
    let mut read_handles: Vec<JoinHandle<Result<(String, R), anyhow::Error>>> = vec![];

    // All workers should have the same starting time (so they can consistently track progress
    // and reason about lag independently).
    let start = Instant::now();
    // Use a barrier to start all threads at the same time. We need 2x the number of
    // writers because we start 2 distinct tasks per writer.
    let barrier = Arc::new(Barrier::new(2 * args.num_writers + args.num_readers));

    // The batch interarrival time. We'll use this quantity to rate limit the
    // data generation.
    // No other known way to convert `usize` to `f64`.
    #[allow(clippy::as_conversions)]
    let time_per_batch = {
        let records_per_second_f64 = args.records_per_second as f64;
        let batch_size_f64 = args.batch_size as f64;

        let batches_per_second = records_per_second_f64 / batch_size_f64;
        Duration::from_secs(1).div_f64(batches_per_second)
    };
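    // For example, with the defaults (records_per_second = 100, batch_size = 100)
    // this works out to batches_per_second = 1.0, i.e. one batch every second;
    // with batch_size = 10 it would instead be one batch every 100ms.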

    for (idx, mut writer) in writers.into_iter().enumerate() {
        let b = Arc::clone(&barrier);
        let data_generator = data_generator.clone();
        let start = start.clone();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        // Intentionally create the span outside the task to set the parent.
        let generator_span = info_span!("generator", idx);
        let data_generator_handle = mz_ore::task::spawn(
            || format!("data-generator-{}", idx),
            async move {
                trace!("data generator {} waiting for barrier", idx);
                b.wait().await;
                info!("starting data generator {}", idx);

                // The number of batches this data generator has sent over to the
                // corresponding writer task.
                let mut batch_idx = 0;
                // The last time we emitted progress information to stdout, expressed
                // as a relative duration from start.
                let mut prev_log = Duration::from_millis(0);
                loop {
                    // Data generation can be CPU expensive, so generate it
                    // in a spawn_blocking to play nicely with the rest of
                    // the async code.
                    let mut data_generator = data_generator.clone();
                    // Intentionally create the span outside the task to set the
                    // parent.
                    let batch_span = info_span!("batch", batch_idx);
                    let batch = mz_ore::task::spawn_blocking(
                        || "data_generator-batch",
                        move || {
                            batch_span
                                .in_scope(|| data_generator.gen_batch(usize::cast_from(batch_idx)))
                        },
                    )
                    .await;
                    trace!("data generator {} made a batch", idx);
                    let batch = match batch {
                        Some(x) => x,
                        None => {
                            let records_sent = usize::cast_from(batch_idx) * args.batch_size;
                            let finished = format!(
                                "Data generator {} finished after {} ms and sent {} records",
                                idx,
                                start.elapsed().as_millis(),
                                records_sent
                            );
                            return Ok(finished);
                        }
                    };
                    batch_idx += 1;

                    // Sleep so this doesn't busy wait if it's ahead of
                    // schedule.
                    let elapsed = start.elapsed();
                    let next_batch_time = time_per_batch * (batch_idx);
                    let sleep = next_batch_time.saturating_sub(elapsed);
                    if sleep > Duration::ZERO {
                        async {
                            debug!("Data generator ahead of schedule, sleeping for {:?}", sleep);
                            tokio::time::sleep(sleep).await
                        }
                        .instrument(info_span!("throttle"))
                        .await;
                    }

                    // send will only error if the matching receiver has been dropped.
                    if let Err(SendError(_)) = tx.send(batch) {
                        bail!("receiver unexpectedly dropped");
                    }
                    trace!("data generator {} wrote a batch", idx);

                    if elapsed - prev_log > args.logging_granularity {
                        let records_sent = usize::cast_from(batch_idx) * args.batch_size;
                        debug!(
                            "After {} ms data generator {} has sent {} records.",
                            start.elapsed().as_millis(),
                            idx,
                            records_sent
                        );
                        prev_log = elapsed;
                    }
                }
            }
            .instrument(generator_span),
        );

        generator_handles.push(data_generator_handle);
        let b = Arc::clone(&barrier);

        // Intentionally create the span outside the task to set the parent.
        let writer_span = info_span!("writer", idx);
        let writer_handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
            trace!("writer {} waiting for barrier", idx);
            b.wait().await;
            info!("starting writer {}", idx);

            // Max observed latency for BenchmarkWriter::write.
            let mut max_write_latency = Duration::from_millis(0);

            // The last time we emitted progress information to stdout, expressed
            // as a relative duration from start.
            let mut prev_log = Duration::from_millis(0);
            let mut records_written = 0;

            loop {
                let batch = match rx.recv().await {
                    Some(batch) => batch,
                    None => break,
                };

                trace!("writer {} received a batch. writing", idx);
                let write_start = Instant::now();
                writer.write(batch).await?;

                records_written += args.batch_size;
                let write_latency = write_start.elapsed();
                if write_latency > max_write_latency {
                    max_write_latency = write_latency;
                }

                let elapsed = start.elapsed();

                if elapsed - prev_log > args.logging_granularity {
                    info!("After {} ms writer {} has written {} records. Max write latency {} ms most recent write latency {} ms.",
                          elapsed.as_millis(), idx, records_written, max_write_latency.as_millis(), write_latency.as_millis());
                    prev_log = elapsed;
                }

                if records_written >= num_records_total {
                    break;
                }
            }
            let elapsed = start.elapsed();
            let finished = format!(
                "Writer {} finished after {} ms and wrote {} records. Max write latency {} ms.",
                idx,
                elapsed.as_millis(),
                records_written,
                max_write_latency.as_millis()
            );

            writer.finish().await.unwrap();

            Ok(finished)
        }.instrument(writer_span));

        write_handles.push(writer_handle);
    }

    // TODO(benesch): rewrite to avoid dangerous `as` conversions.
    #[allow(clippy::as_conversions)]
    for (idx, mut reader) in readers.into_iter().enumerate() {
        let b = Arc::clone(&barrier);
        // Intentionally create the span outside the task to set the parent.
        let reader_span = info_span!("reader", idx);
        let reader_handle = mz_ore::task::spawn(|| format!("reader-{}", idx), async move {
            trace!("reader {} waiting for barrier", idx);
            b.wait().await;
            info!("starting reader {}", idx);

            // Max observed latency for BenchmarkReader::num_records.
            let mut max_read_latency = Duration::from_millis(0);

            // Max observed delay between the number of records expected to be read at any
            // point in time vs the number of records actually ingested by that point.
            let mut max_lag = 0;

            // The last time we emitted progress information to stdout, expressed
            // as a relative duration from start.
            let mut prev_log = Duration::from_millis(0);
            loop {
                let elapsed = start.elapsed();
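                // How many records we would expect to have been generated by now,
                // assuming the generators stayed exactly on schedule: one batch of
                // `batch_size` records every `time_per_batch`.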
                let expected_sent = elapsed.as_millis() as usize
                    / (time_per_batch.as_millis() as usize)
                    * args.batch_size;
                let read_start = Instant::now();
                let num_records_read = reader.num_records().await?;
                let read_latency = read_start.elapsed();
                let lag = if expected_sent > num_records_read {
                    expected_sent - num_records_read
                } else {
                    0
                };
                if lag > max_lag {
                    max_lag = lag;
                }

                if read_latency > max_read_latency {
                    max_read_latency = read_latency;
                }

                if elapsed - prev_log > args.logging_granularity {
                    let elapsed_seconds = elapsed.as_secs();
                    let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
                    let throughput = mb_read / elapsed_seconds as f64;
                    info!("After {} ms reader {} has read {} records (throughput {:.3} MiB/s). Max read lag {} records, most recent read lag {} records. Max read latency {} ms, most recent read latency {} ms",
                          elapsed.as_millis(), idx, num_records_read, throughput, max_lag, lag, max_read_latency.as_millis(), read_latency.as_millis());
                    prev_log = elapsed;
                }
                if num_records_read == num_records_total {
                    let elapsed_seconds = elapsed.as_secs();
                    let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
                    let throughput = mb_read / elapsed_seconds as f64;
                    let finished = format!("Reader {} finished after {} ms and read {} records (throughput {:.3} MiB/s). Max read lag {} records. Max read latency {} ms.",
                          idx, elapsed.as_millis(), num_records_read, throughput, max_lag, max_read_latency.as_millis());
                    return Ok((finished, reader));
                }
            }
        }.instrument(reader_span));
        read_handles.push(reader_handle);
    }

    for handle in generator_handles {
        match handle.await {
            Ok(finished) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }
    for handle in write_handles {
        match handle.await {
            Ok(finished) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }
    for handle in read_handles {
        match handle.await {
            Ok((finished, _)) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }

    if let Some(metrics_file) = args.metrics_file {
        let mut file = File::create(metrics_file)?;
        let encoder = prometheus::TextEncoder::new();
        encoder.encode(&metrics_registry.gather(), &mut file)?;
        file.sync_all()?;
    }
    eprintln!("write amp: {}", metrics.write_amplification());

    Ok(())
}

mod api {
    use async_trait::async_trait;
    use mz_persist::indexed::columnar::ColumnarRecords;

    /// An interface to write a batch of data into a persistent system.
    #[async_trait]
    pub trait BenchmarkWriter {
        /// Writes the given batch to this writer.
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error>;

        /// Signals that we are finished writing to this [BenchmarkWriter]. This
        /// will join any async tasks that might have been spawned for this
        /// [BenchmarkWriter].
        async fn finish(self) -> Result<(), anyhow::Error>;
    }

    /// An abstraction over a reader of data, which can report the number
    /// of distinct records it has read so far.
    #[async_trait]
    pub trait BenchmarkReader {
        async fn num_records(&mut self) -> Result<usize, anyhow::Error>;
    }
}

mod raw_persist_benchmark {
    use std::sync::Arc;

    use async_trait::async_trait;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::JoinHandle;
    use mz_persist::indexed::columnar::ColumnarRecords;
    use mz_persist_client::read::{Listen, ListenEvent};
    use mz_persist_client::{Diagnostics, PersistClient, ShardId};
    use mz_persist_types::Codec64;
    use mz_persist_types::codec_impls::VecU8Schema;
    use timely::progress::Antichain;
    use tokio::sync::mpsc::Sender;
    use tracing::{Instrument, info_span};

    use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};

    pub async fn setup_raw_persist(
        persist: PersistClient,
        id: ShardId,
        num_writers: usize,
        num_readers: usize,
    ) -> Result<
        (
            Vec<RawBenchmarkWriter>,
            Vec<Listen<Vec<u8>, Vec<u8>, u64, i64>>,
        ),
        anyhow::Error,
    > {
        let mut writers = vec![];
        for idx in 0..num_writers {
            let writer = RawBenchmarkWriter::new(&persist, id, idx).await?;

            writers.push(writer);
        }

        let mut readers = vec![];
        for _ in 0..num_readers {
            let reader = persist
                .open_leased_reader::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                    true,
                )
                .await?;

            let listen = reader
                .listen(Antichain::from_elem(0))
                .await
                .expect("cannot serve requested as_of");
            readers.push(listen);
        }

        Ok((writers, readers))
    }

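    /// A [BenchmarkWriter] that writes directly to a persist shard.
    ///
    /// Internally, each writer spawns two tasks: a "batch-writer" task that stages
    /// incoming [ColumnarRecords] into persist batches, and an "appender" task that
    /// appends each finished batch to the shard. [BenchmarkWriter::write] only has
    /// to hand records to the first task over a channel.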
    pub struct RawBenchmarkWriter {
        tx: Option<Sender<ColumnarRecords>>,
        #[allow(dead_code)]
        handles: Vec<JoinHandle<()>>,
    }

    impl RawBenchmarkWriter {
        async fn new(
            persist: &PersistClient,
            id: ShardId,
            idx: usize,
        ) -> Result<Self, anyhow::Error> {
            let mut handles = Vec::<JoinHandle<()>>::new();
            let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<ColumnarRecords>(2);
            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(10);

            let write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            // Intentionally create the span outside the task to set the parent.
            let batch_writer_span = info_span!("batch-writer", idx);
            let handle = mz_ore::task::spawn(
                || format!("batch-writer-{}", idx),
                async move {
                    let mut current_upper = timely::progress::Timestamp::minimum();
                    while let Some(records) = records_rx.recv().await {
                        let mut max_ts = 0;
                        let current_upper_chain = Antichain::from_elem(current_upper);

                        let mut builder = write.builder(current_upper_chain);

                        for ((k, v), t, d) in records.iter() {
                            builder
                                .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                                .await
                                .expect("invalid usage");

                            max_ts = std::cmp::max(max_ts, u64::decode(t));
                        }

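                        // The DataGenerator timestamps each record with its index,
                        // so advancing the upper to (max observed timestamp + 1)
                        // makes every record in this batch visible to readers.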
                        max_ts = max_ts + 1;
                        let new_upper_chain = Antichain::from_elem(max_ts);
                        current_upper = max_ts;

                        let batch = builder
                            .finish(new_upper_chain)
                            .await
                            .expect("invalid usage");

                        match batch_tx.send(batch).await {
                            Ok(_) => (),
                            Err(e) => panic!("send error: {}", e),
                        }
                    }
                }
                .instrument(batch_writer_span),
            );
            handles.push(handle);

            let mut write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            // Intentionally create the span outside the task to set the parent.
            let appender_span = info_span!("appender", idx);
            let handle = mz_ore::task::spawn(
                || format!("appender-{}", idx),
                async move {
                    while let Some(batch) = batch_rx.recv().await {
                        let lower = batch.lower().clone();
                        let upper = batch.upper().clone();
                        write
                            .append_batch(batch, lower, upper)
                            .await
                            .expect("invalid usage")
                            .expect("unexpected upper");
                    }
                }
                .instrument(appender_span),
            );
            handles.push(handle);

            let writer = RawBenchmarkWriter {
                tx: Some(records_tx),
                handles,
            };

            Ok(writer)
        }
    }

    #[async_trait]
    impl BenchmarkWriter for RawBenchmarkWriter {
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error> {
            self.tx
                .as_mut()
                .expect("writer was already finished")
                .send(batch)
                .await
                .expect("writer send error");
            Ok(())
        }

        async fn finish(mut self) -> Result<(), anyhow::Error> {
            self.tx.take().expect("already finished");

            for handle in self.handles.drain(..) {
                let () = handle.await;
            }

            Ok(())
        }
    }

    #[async_trait]
    impl BenchmarkReader for Listen<Vec<u8>, Vec<u8>, u64, i64> {
        async fn num_records(&mut self) -> Result<usize, anyhow::Error> {
            // This impl abuses the fact that DataGenerator timestamps each
            // record with the record count to avoid having to actually count
            // the number of records.
            let mut count = 0;
            let events = self.fetch_next().await;

            for event in events {
                if let ListenEvent::Progress(t) = event {
                    count = usize::cast_from(t.elements()[0]);
                }
            }

            Ok(count)
        }
    }
}