persistcli/open_loop.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(clippy::cast_precision_loss)]
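
//! An open-loop benchmark for persist: data-generator tasks feed writer tasks
//! at a configured rate while reader tasks report how far ingestion lags
//! behind, with metrics exposed over an internal HTTP endpoint.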

use std::fs::File;
use std::future::IntoFuture;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::bail;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::task::JoinHandle;
use mz_ore::url::SensitiveUrl;
use mz_persist::workload::DataGenerator;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistLocation, ShardId};
use prometheus::Encoder;
use tokio::net::TcpListener;
use tokio::sync::Barrier;
use tokio::sync::mpsc::error::SendError;
use tracing::{Instrument, debug, error, info, info_span, trace};

use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};

/// Different benchmark configurations.
#[derive(clap::ValueEnum, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum BenchmarkType {
    /// Data is written straight into persistence from the data generator.
    RawWriter,
    /// A simulated Materialize source pipeline where data is first timestamped using a
    /// consensus persist shard and then written to persistence.
    MzSourceModel,
}

/// Open-loop benchmark for persistence.
#[derive(Debug, clap::Parser)]
pub struct Args {
    /// Number of writer instances.
    #[clap(long, value_name = "W", default_value_t = 1)]
    num_writers: usize,

    /// Number of reader instances.
    #[clap(long, value_name = "R", default_value_t = 1)]
    num_readers: usize,

    /// Handle to the persist consensus system.
    #[clap(long, value_name = "CONSENSUS_URI")]
    consensus_uri: SensitiveUrl,

    /// Handle to the persist blob storage.
    #[clap(long, value_name = "BLOB_URI")]
    blob_uri: SensitiveUrl,

    /// The type of benchmark to run.
    #[clap(value_enum, long, default_value_t = BenchmarkType::RawWriter)]
    benchmark_type: BenchmarkType,

    /// Runtime in a whole number of seconds.
    #[clap(long, value_parser = humantime::parse_duration, value_name = "S", default_value = "60s")]
    runtime: Duration,

    /// How many records writers should emit per second.
    #[clap(long, value_name = "R", default_value_t = 100)]
    records_per_second: usize,

    /// Size of records (goodbytes) in bytes.
    #[clap(long, value_name = "B", default_value_t = 64)]
    record_size_bytes: usize,

    /// Batch size in number of records (if applicable).
    #[clap(long, value_name = "R", default_value_t = 100)]
    batch_size: usize,

    /// Duration between subsequent informational log outputs.
    #[clap(long, value_parser = humantime::parse_duration, value_name = "L", default_value = "1s")]
    logging_granularity: Duration,

    /// Id of the persist shard (for use in multi-process runs).
    #[clap(short, long, value_name = "I")]
    shard_id: Option<String>,

    /// The address of the internal HTTP server.
    #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
    internal_http_listen_addr: SocketAddr,

    /// Path of a file to write metrics at the end of the run.
    #[clap(long)]
    metrics_file: Option<String>,
}
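
// Example invocation (the `open-loop` subcommand name is an assumption based on
// this module's name; the long flags follow from the `Args` fields above):
//
//     persistcli open-loop \
//         --consensus-uri=<CONSENSUS_URI> --blob-uri=<BLOB_URI> \
//         --num-writers=1 --num-readers=1 \
//         --records-per-second=100 --record-size-bytes=64 --batch-size=100 \
//         --runtime=60s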

/// One mebibyte in bytes; used when reporting read throughput in MiB/s.
const MIB: u64 = 1024 * 1024;

pub async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    {
        let metrics_registry = metrics_registry.clone();
        info!(
            "serving internal HTTP server on http://{}/metrics",
            args.internal_http_listen_addr
        );
        let listener = TcpListener::bind(&args.internal_http_listen_addr)
            .await
            .expect("can bind");
        mz_ore::task::spawn(
            || "http_server",
            axum::serve(
                listener,
                axum::Router::new()
                    .route(
                        "/metrics",
                        axum::routing::get(move || async move {
                            mz_http_util::handle_prometheus(&metrics_registry).await
                        }),
                    )
                    .into_make_service(),
            )
            .into_future(),
        );
    }

    let location = PersistLocation {
        blob_uri: args.blob_uri.clone(),
        consensus_uri: args.consensus_uri.clone(),
    };
    let persist = PersistClientCache::new(
        PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone()),
        &metrics_registry,
        |_, _| PubSubClientConnection::noop(),
    )
    .open(location)
    .await?;

    let shard_id = match args.shard_id.clone() {
        Some(shard_id) => ShardId::from_str(&shard_id).map_err(anyhow::Error::msg)?,
        None => ShardId::new(),
    };

    let metrics = Arc::clone(persist.metrics());
    let (writers, readers) = match args.benchmark_type.clone() {
        BenchmarkType::RawWriter => {
            raw_persist_benchmark::setup_raw_persist(
                persist,
                shard_id,
                args.num_writers,
                args.num_readers,
            )
            .await?
        }
        BenchmarkType::MzSourceModel => panic!("source model"),
    };

    run_benchmark(args, metrics_registry, metrics, writers, readers).await
}

async fn run_benchmark<W, R>(
    args: Args,
    metrics_registry: MetricsRegistry,
    metrics: Arc<Metrics>,
    writers: Vec<W>,
    readers: Vec<R>,
) -> Result<(), anyhow::Error>
where
    W: BenchmarkWriter + Send + Sync + 'static,
    R: BenchmarkReader + Send + Sync + 'static,
{
    let num_records_total = args.records_per_second * usize::cast_from(args.runtime.as_secs());
    let data_generator =
        DataGenerator::new(num_records_total, args.record_size_bytes, args.batch_size);

    let benchmark_description = format!(
        "num-readers={} num-writers={} runtime={:?} num_records_total={} records-per-second={} record-size-bytes={} batch-size={}",
        args.num_readers,
        args.num_writers,
        args.runtime,
        num_records_total,
        args.records_per_second,
        args.record_size_bytes,
        args.batch_size
    );

    info!("starting benchmark: {}", benchmark_description);
    let mut generator_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
    let mut write_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
    let mut read_handles: Vec<JoinHandle<Result<(String, R), anyhow::Error>>> = vec![];

    // All workers should have the same starting time (so they can consistently
    // track progress and reason about lag independently).
    let start = Instant::now();
    // Use a barrier to start all threads at the same time. We need 2x the number of
    // writers because we start 2 distinct tasks per writer.
    let barrier = Arc::new(Barrier::new(2 * args.num_writers + args.num_readers));

    // The batch interarrival time. We'll use this quantity to rate limit the
    // data generation.
    // There is no lossless conversion from `usize` to `f64`, so allow `as` here.
    #[allow(clippy::as_conversions)]
    let time_per_batch = {
        let records_per_second_f64 = args.records_per_second as f64;
        let batch_size_f64 = args.batch_size as f64;

        let batches_per_second = records_per_second_f64 / batch_size_f64;
        Duration::from_secs(1).div_f64(batches_per_second)
    };
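    // For example, at the defaults of 100 records per second and a batch size
    // of 100 records, `time_per_batch` works out to one batch per second.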

    for (idx, mut writer) in writers.into_iter().enumerate() {
        let b = Arc::clone(&barrier);
        let data_generator = data_generator.clone();
        let start = start.clone();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        // Intentionally create the span outside the task to set the parent.
        let generator_span = info_span!("generator", idx);
        let data_generator_handle = mz_ore::task::spawn(
            || format!("data-generator-{}", idx),
            async move {
                trace!("data generator {} waiting for barrier", idx);
                b.wait().await;
                info!("starting data generator {}", idx);

                // The number of batches this data generator has sent over to the
                // corresponding writer task.
                let mut batch_idx = 0;
                // The last time we emitted progress information to stdout, expressed
                // as a relative duration from start.
                let mut prev_log = Duration::from_millis(0);
                loop {
                    // Data generation can be CPU expensive, so generate it
                    // in a spawn_blocking to play nicely with the rest of
                    // the async code.
                    let mut data_generator = data_generator.clone();
                    // Intentionally create the span outside the task to set the
                    // parent.
                    let batch_span = info_span!("batch", batch_idx);
                    let batch = mz_ore::task::spawn_blocking(
                        || "data_generator-batch",
                        move || {
                            batch_span
                                .in_scope(|| data_generator.gen_batch(usize::cast_from(batch_idx)))
                        },
                    )
                    .await
                    .expect("task failed");
                    trace!("data generator {} made a batch", idx);
                    let batch = match batch {
                        Some(x) => x,
                        None => {
                            let records_sent = usize::cast_from(batch_idx) * args.batch_size;
                            let finished = format!(
                                "Data generator {} finished after {} ms and sent {} records",
                                idx,
                                start.elapsed().as_millis(),
                                records_sent
                            );
                            return Ok(finished);
                        }
                    };
                    batch_idx += 1;

                    // Sleep so this doesn't busy wait if it's ahead of
                    // schedule.
                    let elapsed = start.elapsed();
                    let next_batch_time = time_per_batch * batch_idx;
                    let sleep = next_batch_time.saturating_sub(elapsed);
                    if sleep > Duration::ZERO {
                        async {
                            debug!("Data generator ahead of schedule, sleeping for {:?}", sleep);
                            tokio::time::sleep(sleep).await
                        }
                        .instrument(info_span!("throttle"))
                        .await;
                    }

                    // send will only error if the matching receiver has been dropped.
                    if let Err(SendError(_)) = tx.send(batch) {
                        bail!("receiver unexpectedly dropped");
                    }
                    trace!("data generator {} wrote a batch", idx);

                    if elapsed - prev_log > args.logging_granularity {
                        let records_sent = usize::cast_from(batch_idx) * args.batch_size;
                        debug!(
                            "After {} ms data generator {} has sent {} records.",
                            start.elapsed().as_millis(),
                            idx,
                            records_sent
                        );
                        prev_log = elapsed;
                    }
                }
            }
            .instrument(generator_span),
        );

        generator_handles.push(data_generator_handle);
        let b = Arc::clone(&barrier);

        // Intentionally create the span outside the task to set the parent.
        let writer_span = info_span!("writer", idx);
        let writer_handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
            trace!("writer {} waiting for barrier", idx);
            b.wait().await;
            info!("starting writer {}", idx);

            // Max observed latency for BenchmarkWriter::write.
            let mut max_write_latency = Duration::from_millis(0);

            // The last time we emitted progress information to stdout, expressed
            // as a relative duration from start.
            let mut prev_log = Duration::from_millis(0);
            let mut records_written = 0;

            loop {
                let batch = match rx.recv().await {
                    Some(batch) => batch,
                    None => break,
                };

                trace!("writer {} received a batch. writing", idx);
                let write_start = Instant::now();
                writer.write(batch).await?;

                records_written += args.batch_size;
                let write_latency = write_start.elapsed();
                if write_latency > max_write_latency {
                    max_write_latency = write_latency;
                }

                let elapsed = start.elapsed();

                if elapsed - prev_log > args.logging_granularity {
                    info!("After {} ms writer {} has written {} records. Max write latency {} ms, most recent write latency {} ms.",
                          elapsed.as_millis(), idx, records_written, max_write_latency.as_millis(), write_latency.as_millis());
                    prev_log = elapsed;
                }

                if records_written >= num_records_total {
                    break;
                }
            }
            let elapsed = start.elapsed();
            let finished = format!(
                "Writer {} finished after {} ms and wrote {} records. Max write latency {} ms.",
                idx,
                elapsed.as_millis(),
                records_written,
                max_write_latency.as_millis()
            );

            writer.finish().await.unwrap();

            Ok(finished)
        }.instrument(writer_span));

        write_handles.push(writer_handle);
    }

    // TODO(benesch): rewrite to avoid dangerous `as` conversions.
    #[allow(clippy::as_conversions)]
    for (idx, mut reader) in readers.into_iter().enumerate() {
        let b = Arc::clone(&barrier);
        // Intentionally create the span outside the task to set the parent.
        let reader_span = info_span!("reader", idx);
        let reader_handle = mz_ore::task::spawn(|| format!("reader-{}", idx), async move {
            trace!("reader {} waiting for barrier", idx);
            b.wait().await;
            info!("starting reader {}", idx);

            // Max observed latency for BenchmarkReader::num_records.
            let mut max_read_latency = Duration::from_millis(0);

            // Max observed delay between the number of records expected to be read at any
            // point in time vs the number of records actually ingested by that point.
            let mut max_lag = 0;

            // The last time we emitted progress information to stdout, expressed
            // as a relative duration from start.
            let mut prev_log = Duration::from_millis(0);
            loop {
                let elapsed = start.elapsed();
                let expected_sent = elapsed.as_millis() as usize
                    / (time_per_batch.as_millis() as usize)
                    * args.batch_size;
                let read_start = Instant::now();
                let num_records_read = reader.num_records().await?;
                let read_latency = read_start.elapsed();
                let lag = if expected_sent > num_records_read {
                    expected_sent - num_records_read
                } else {
                    0
                };
                if lag > max_lag {
                    max_lag = lag;
                }

                if read_latency > max_read_latency {
                    max_read_latency = read_latency;
                }

                if elapsed - prev_log > args.logging_granularity {
                    let elapsed_seconds = elapsed.as_secs();
                    let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
                    let throughput = mb_read / elapsed_seconds as f64;
                    info!("After {} ms reader {} has read {} records (throughput {:.3} MiB/s). Max read lag {} records, most recent read lag {} records. Max read latency {} ms, most recent read latency {} ms",
                          elapsed.as_millis(), idx, num_records_read, throughput, max_lag, lag, max_read_latency.as_millis(), read_latency.as_millis());
                    prev_log = elapsed;
                }
                if num_records_read == num_records_total {
                    let elapsed_seconds = elapsed.as_secs();
                    let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
                    let throughput = mb_read / elapsed_seconds as f64;
                    let finished = format!("Reader {} finished after {} ms and read {} records (throughput {:.3} MiB/s). Max read lag {} records. Max read latency {} ms.",
                          idx, elapsed.as_millis(), num_records_read, throughput, max_lag, max_read_latency.as_millis());
                    return Ok((finished, reader));
                }
            }
        }.instrument(reader_span));
        read_handles.push(reader_handle);
    }

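    // Wait for every generator, writer, and reader task to finish and log each
    // task's summary line.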
    for handle in generator_handles {
        match handle.await? {
            Ok(finished) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }
    for handle in write_handles {
        match handle.await? {
            Ok(finished) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }
    for handle in read_handles {
        match handle.await? {
            Ok((finished, _)) => info!("{}", finished),
            Err(e) => error!("error: {:?}", e),
        }
    }

    if let Some(metrics_file) = args.metrics_file {
        let mut file = File::create(metrics_file)?;
        let encoder = prometheus::TextEncoder::new();
        encoder.encode(&metrics_registry.gather(), &mut file)?;
        file.sync_all()?;
    }
    eprintln!("write amp: {}", metrics.write_amplification());

    Ok(())
}

mod api {
    use async_trait::async_trait;
    use mz_persist::indexed::columnar::ColumnarRecords;

    /// An interface to write a batch of data into a persistent system.
    #[async_trait]
    pub trait BenchmarkWriter {
        /// Writes the given batch to this writer.
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error>;

        /// Signals that we are finished writing to this [BenchmarkWriter]. This
        /// will join any async tasks that might have been spawned for this
        /// [BenchmarkWriter].
        async fn finish(self) -> Result<(), anyhow::Error>;
    }

    /// An abstraction over a reader of data, which can report the number
    /// of distinct records it has read so far.
    #[async_trait]
    pub trait BenchmarkReader {
        /// Returns the number of distinct records this reader has read so far.
        async fn num_records(&mut self) -> Result<usize, anyhow::Error>;
    }
}

mod raw_persist_benchmark {
    use std::sync::Arc;

    use async_trait::async_trait;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::JoinHandle;
    use mz_persist::indexed::columnar::ColumnarRecords;
    use mz_persist_client::read::{Listen, ListenEvent};
    use mz_persist_client::{Diagnostics, PersistClient, ShardId};
    use mz_persist_types::Codec64;
    use mz_persist_types::codec_impls::VecU8Schema;
    use timely::progress::Antichain;
    use tokio::sync::mpsc::Sender;
    use tracing::{Instrument, info_span};

    use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};

    pub async fn setup_raw_persist(
        persist: PersistClient,
        id: ShardId,
        num_writers: usize,
        num_readers: usize,
    ) -> Result<
        (
            Vec<RawBenchmarkWriter>,
            Vec<Listen<Vec<u8>, Vec<u8>, u64, i64>>,
        ),
        anyhow::Error,
    > {
        let mut writers = vec![];
        for idx in 0..num_writers {
            let writer = RawBenchmarkWriter::new(&persist, id, idx).await?;

            writers.push(writer);
        }

        let mut readers = vec![];
        for _ in 0..num_readers {
            let reader = persist
                .open_leased_reader::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                    true,
                )
                .await?;

            let listen = reader
                .listen(Antichain::from_elem(0))
                .await
                .expect("cannot serve requested as_of");
            readers.push(listen);
        }

        Ok((writers, readers))
    }

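    /// A [BenchmarkWriter] that writes directly to a persist shard, pipelining
    /// batch creation and batch appending across two background tasks.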
    pub struct RawBenchmarkWriter {
        tx: Option<Sender<ColumnarRecords>>,
        #[allow(dead_code)]
        handles: Vec<JoinHandle<()>>,
    }

    impl RawBenchmarkWriter {
        async fn new(
            persist: &PersistClient,
            id: ShardId,
            idx: usize,
        ) -> Result<Self, anyhow::Error> {
            let mut handles = Vec::<JoinHandle<()>>::new();
            let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<ColumnarRecords>(2);
            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(10);

            let write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            // Intentionally create the span outside the task to set the parent.
            let batch_writer_span = info_span!("batch-writer", idx);
            let handle = mz_ore::task::spawn(
                || format!("batch-writer-{}", idx),
                async move {
                    let mut current_upper = timely::progress::Timestamp::minimum();
                    while let Some(records) = records_rx.recv().await {
                        let mut max_ts = 0;
                        let current_upper_chain = Antichain::from_elem(current_upper);

                        let mut builder = write.builder(current_upper_chain);

                        for ((k, v), t, d) in records.iter() {
                            builder
                                .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                                .await
                                .expect("invalid usage");

                            max_ts = std::cmp::max(max_ts, u64::decode(t));
                        }

                        max_ts += 1;
                        let new_upper_chain = Antichain::from_elem(max_ts);
                        current_upper = max_ts;

                        let batch = builder
                            .finish(new_upper_chain)
                            .await
                            .expect("invalid usage");

                        match batch_tx.send(batch).await {
                            Ok(_) => (),
                            Err(e) => panic!("send error: {}", e),
                        }
                    }
                }
                .instrument(batch_writer_span),
            );
            handles.push(handle);

            let mut write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            // Intentionally create the span outside the task to set the parent.
            let appender_span = info_span!("appender", idx);
            let handle = mz_ore::task::spawn(
                || format!("appender-{}", idx),
                async move {
                    while let Some(batch) = batch_rx.recv().await {
                        let lower = batch.lower().clone();
                        let upper = batch.upper().clone();
                        write
                            .append_batch(batch, lower, upper)
                            .await
                            .expect("invalid usage")
                            .expect("unexpected upper");
                    }
                }
                .instrument(appender_span),
            );
            handles.push(handle);

            let writer = RawBenchmarkWriter {
                tx: Some(records_tx),
                handles,
            };

            Ok(writer)
        }
    }

    #[async_trait]
    impl BenchmarkWriter for RawBenchmarkWriter {
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error> {
            self.tx
                .as_mut()
                .expect("writer was already finished")
                .send(batch)
                .await
                .expect("writer send error");
            Ok(())
        }

        async fn finish(mut self) -> Result<(), anyhow::Error> {
            self.tx.take().expect("already finished");

            for handle in self.handles.drain(..) {
                let () = handle.await?;
            }

            Ok(())
        }
    }

    #[async_trait]
    impl BenchmarkReader for Listen<Vec<u8>, Vec<u8>, u64, i64> {
        async fn num_records(&mut self) -> Result<usize, anyhow::Error> {
            // This impl abuses the fact that DataGenerator timestamps each
            // record with the record count to avoid having to actually count
            // the number of records.
            let mut count = 0;
            let events = self.fetch_next().await;

            for event in events {
                if let ListenEvent::Progress(t) = event {
                    count = usize::cast_from(t.elements()[0]);
                }
            }

            Ok(count)
        }
    }
}