1#![allow(clippy::cast_precision_loss)]
11
12use std::fs::File;
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::bail;
20use mz_ore::cast::CastFrom;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_ore::task::JoinHandle;
24use mz_ore::url::SensitiveUrl;
25use mz_persist::workload::DataGenerator;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::PersistConfig;
28use mz_persist_client::metrics::Metrics;
29use mz_persist_client::rpc::PubSubClientConnection;
30use mz_persist_client::{PersistLocation, ShardId};
31use prometheus::Encoder;
32use tokio::net::TcpListener;
33use tokio::sync::Barrier;
34use tokio::sync::mpsc::error::SendError;
35use tracing::{Instrument, debug, error, info, info_span, trace};
36
37use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};
38
/// Which flavor of benchmark to run.
#[derive(clap::ValueEnum, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum BenchmarkType {
    /// Write batches directly into a raw persist shard (see
    /// `raw_persist_benchmark::setup_raw_persist`).
    RawWriter,
    /// Write through a model of a materialize source.
    /// NOTE(review): not implemented — selecting this panics in `run`.
    MzSourceModel,
}
48
49#[derive(Debug, clap::Parser)]
51pub struct Args {
52 #[clap(long, value_name = "W", default_value_t = 1)]
54 num_writers: usize,
55
56 #[clap(long, value_name = "R", default_value_t = 1)]
58 num_readers: usize,
59
60 #[clap(long, value_name = "CONSENSUS_URI")]
62 consensus_uri: SensitiveUrl,
63
64 #[clap(long, value_name = "BLOB_URI")]
66 blob_uri: SensitiveUrl,
67
68 #[clap(value_enum, long, default_value_t = BenchmarkType::RawWriter)]
70 benchmark_type: BenchmarkType,
71
72 #[clap(long, value_parser = humantime::parse_duration, value_name = "S", default_value = "60s")]
74 runtime: Duration,
75
76 #[clap(long, value_name = "R", default_value_t = 100)]
78 records_per_second: usize,
79
80 #[clap(long, value_name = "B", default_value_t = 64)]
82 record_size_bytes: usize,
83
84 #[clap(long, env = "", value_name = "R", default_value_t = 100)]
86 batch_size: usize,
87
88 #[clap(long, value_parser = humantime::parse_duration, value_name = "L", default_value = "1s")]
90 logging_granularity: Duration,
91
92 #[clap(short, long, value_name = "I")]
94 shard_id: Option<String>,
95
96 #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
98 internal_http_listen_addr: SocketAddr,
99
100 #[clap(long)]
102 metrics_file: Option<String>,
103}
104
/// One mebibyte in bytes; used to report read throughput in MiB/s.
const MIB: u64 = 1024 * 1024;
106
107pub async fn run(args: Args) -> Result<(), anyhow::Error> {
108 let metrics_registry = MetricsRegistry::new();
109 {
110 let metrics_registry = metrics_registry.clone();
111 info!(
112 "serving internal HTTP server on http://{}/metrics",
113 args.internal_http_listen_addr
114 );
115 let listener = TcpListener::bind(&args.internal_http_listen_addr)
116 .await
117 .expect("can bind");
118 mz_ore::task::spawn(
119 || "http_server",
120 axum::serve(
121 listener,
122 axum::Router::new()
123 .route(
124 "/metrics",
125 axum::routing::get(move || async move {
126 mz_http_util::handle_prometheus(&metrics_registry).await
127 }),
128 )
129 .into_make_service(),
130 )
131 .into_future(),
132 );
133 }
134
135 let location = PersistLocation {
136 blob_uri: args.blob_uri.clone(),
137 consensus_uri: args.consensus_uri.clone(),
138 };
139 let persist = PersistClientCache::new(
140 PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone()),
141 &metrics_registry,
142 |_, _| PubSubClientConnection::noop(),
143 )
144 .open(location)
145 .await?;
146
147 let shard_id = match args.shard_id.clone() {
148 Some(shard_id) => ShardId::from_str(&shard_id).map_err(anyhow::Error::msg)?,
149 None => ShardId::new(),
150 };
151
152 let metrics = Arc::clone(persist.metrics());
153 let (writers, readers) = match args.benchmark_type.clone() {
154 BenchmarkType::RawWriter => {
155 raw_persist_benchmark::setup_raw_persist(
156 persist,
157 shard_id,
158 args.num_writers,
159 args.num_readers,
160 )
161 .await?
162 }
163 BenchmarkType::MzSourceModel => panic!("source model"),
164 };
165
166 run_benchmark(args, metrics_registry, metrics, writers, readers).await
167}
168
169async fn run_benchmark<W, R>(
170 args: Args,
171 metrics_registry: MetricsRegistry,
172 metrics: Arc<Metrics>,
173 writers: Vec<W>,
174 readers: Vec<R>,
175) -> Result<(), anyhow::Error>
176where
177 W: BenchmarkWriter + Send + Sync + 'static,
178 R: BenchmarkReader + Send + Sync + 'static,
179{
180 let num_records_total = args.records_per_second * usize::cast_from(args.runtime.as_secs());
181 let data_generator =
182 DataGenerator::new(num_records_total, args.record_size_bytes, args.batch_size);
183
184 let benchmark_description = format!(
185 "num-readers={} num-writers={} runtime={:?} num_records_total={} records-per-second={} record-size-bytes={} batch-size={}",
186 args.num_readers,
187 args.num_writers,
188 args.runtime,
189 num_records_total,
190 args.records_per_second,
191 args.record_size_bytes,
192 args.batch_size
193 );
194
195 info!("starting benchmark: {}", benchmark_description);
196 let mut generator_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
197 let mut write_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
198 let mut read_handles: Vec<JoinHandle<Result<(String, R), anyhow::Error>>> = vec![];
199
200 let start = Instant::now();
203 let barrier = Arc::new(Barrier::new(2 * args.num_writers + args.num_readers));
206
207 #[allow(clippy::as_conversions)]
211 let time_per_batch = {
212 let records_per_second_f64 = args.records_per_second as f64;
213 let batch_size_f64 = args.batch_size as f64;
214
215 let batches_per_second = records_per_second_f64 / batch_size_f64;
216 Duration::from_secs(1).div_f64(batches_per_second)
217 };
218
219 for (idx, mut writer) in writers.into_iter().enumerate() {
220 let b = Arc::clone(&barrier);
221 let data_generator = data_generator.clone();
222 let start = start.clone();
223 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
224
225 let generator_span = info_span!("generator", idx);
227 let data_generator_handle = mz_ore::task::spawn(
228 || format!("data-generator-{}", idx),
229 async move {
230 trace!("data generator {} waiting for barrier", idx);
231 b.wait().await;
232 info!("starting data generator {}", idx);
233
234 let mut batch_idx = 0;
237 let mut prev_log = Duration::from_millis(0);
240 loop {
241 let mut data_generator = data_generator.clone();
245 let batch_span = info_span!("batch", batch_idx);
248 let batch = mz_ore::task::spawn_blocking(
249 || "data_generator-batch",
250 move || {
251 batch_span
252 .in_scope(|| data_generator.gen_batch(usize::cast_from(batch_idx)))
253 },
254 )
255 .await;
256 trace!("data generator {} made a batch", idx);
257 let batch = match batch {
258 Some(x) => x,
259 None => {
260 let records_sent = usize::cast_from(batch_idx) * args.batch_size;
261 let finished = format!(
262 "Data generator {} finished after {} ms and sent {} records",
263 idx,
264 start.elapsed().as_millis(),
265 records_sent
266 );
267 return Ok(finished);
268 }
269 };
270 batch_idx += 1;
271
272 let elapsed = start.elapsed();
275 let next_batch_time = time_per_batch * (batch_idx);
276 let sleep = next_batch_time.saturating_sub(elapsed);
277 if sleep > Duration::ZERO {
278 async {
279 debug!("Data generator ahead of schedule, sleeping for {:?}", sleep);
280 tokio::time::sleep(sleep).await
281 }
282 .instrument(info_span!("throttle"))
283 .await;
284 }
285
286 if let Err(SendError(_)) = tx.send(batch) {
288 bail!("receiver unexpectedly dropped");
289 }
290 trace!("data generator {} wrote a batch", idx);
291
292 if elapsed - prev_log > args.logging_granularity {
293 let records_sent = usize::cast_from(batch_idx) * args.batch_size;
294 debug!(
295 "After {} ms data generator {} has sent {} records.",
296 start.elapsed().as_millis(),
297 idx,
298 records_sent
299 );
300 prev_log = elapsed;
301 }
302 }
303 }
304 .instrument(generator_span),
305 );
306
307 generator_handles.push(data_generator_handle);
308 let b = Arc::clone(&barrier);
309
310 let writer_span = info_span!("writer", idx);
312 let writer_handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
313 trace!("writer {} waiting for barrier", idx);
314 b.wait().await;
315 info!("starting writer {}", idx);
316
317 let mut max_write_latency = Duration::from_millis(0);
319
320 let mut prev_log = Duration::from_millis(0);
323 let mut records_written = 0;
324
325 loop {
326 let batch = match rx.recv().await {
327 Some(batch) => batch,
328 None => break,
329 };
330
331 trace!("writer {} received a batch. writing", idx);
332 let write_start = Instant::now();
333 writer.write(batch).await?;
334
335 records_written += args.batch_size;
336 let write_latency = write_start.elapsed();
337 if write_latency > max_write_latency {
338 max_write_latency = write_latency;
339 }
340
341 let elapsed = start.elapsed();
342
343 if elapsed - prev_log > args.logging_granularity {
344 info!("After {} ms writer {} has written {} records. Max write latency {} ms most recent write latency {} ms.",
345 elapsed.as_millis(), idx, records_written, max_write_latency.as_millis(), write_latency.as_millis());
346 prev_log = elapsed;
347 }
348
349 if records_written >= num_records_total {
350 break;
351 }
352 }
353 let elapsed = start.elapsed();
354 let finished = format!(
355 "Writer {} finished after {} ms and wrote {} records. Max write latency {} ms.",
356 idx,
357 elapsed.as_millis(),
358 records_written,
359 max_write_latency.as_millis()
360 );
361
362 writer.finish().await.unwrap();
363
364 Ok(finished)
365 }.instrument(writer_span));
366
367 write_handles.push(writer_handle);
368 }
369
370 #[allow(clippy::as_conversions)]
372 for (idx, mut reader) in readers.into_iter().enumerate() {
373 let b = Arc::clone(&barrier);
374 let reader_span = info_span!("reader", idx);
376 let reader_handle = mz_ore::task::spawn(|| format!("reader-{}", idx), async move {
377 trace!("reader {} waiting for barrier", idx);
378 b.wait().await;
379 info!("starting reader {}", idx);
380
381 let mut max_read_latency = Duration::from_millis(0);
383
384 let mut max_lag = 0;
387
388 let mut prev_log = Duration::from_millis(0);
391 loop {
392 let elapsed = start.elapsed();
393 let expected_sent = elapsed.as_millis() as usize
394 / (time_per_batch.as_millis() as usize)
395 * args.batch_size;
396 let read_start = Instant::now();
397 let num_records_read = reader.num_records().await?;
398 let read_latency = read_start.elapsed();
399 let lag = if expected_sent > num_records_read {
400 expected_sent - num_records_read
401 } else {
402 0
403 };
404 if lag > max_lag {
405 max_lag = lag;
406 }
407
408 if read_latency > max_read_latency {
409 max_read_latency = read_latency;
410 }
411
412 if elapsed - prev_log > args.logging_granularity {
413 let elapsed_seconds = elapsed.as_secs();
414 let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
415 let throughput = mb_read / elapsed_seconds as f64;
416 info!("After {} ms reader {} has read {} records (throughput {:.3} MiB/s). Max read lag {} records, most recent read lag {} records. Max read latency {} ms, most recent read latency {} ms",
417 elapsed.as_millis(), idx, num_records_read, throughput, max_lag, lag, max_read_latency.as_millis(), read_latency.as_millis());
418 prev_log = elapsed;
419 }
420 if num_records_read == num_records_total {
421 let elapsed_seconds = elapsed.as_secs();
422 let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
423 let throughput = mb_read / elapsed_seconds as f64;
424 let finished = format!("Reader {} finished after {} ms and read {} records (throughput {:.3} MiB/s). Max read lag {} records. Max read latency {} ms.",
425 idx, elapsed.as_millis(), num_records_read, throughput, max_lag, max_read_latency.as_millis());
426 return Ok((finished, reader));
427 }
428 }
429 }.instrument(reader_span));
430 read_handles.push(reader_handle);
431 }
432
433 for handle in generator_handles {
434 match handle.await {
435 Ok(finished) => info!("{}", finished),
436 Err(e) => error!("error: {:?}", e),
437 }
438 }
439 for handle in write_handles {
440 match handle.await {
441 Ok(finished) => info!("{}", finished),
442 Err(e) => error!("error: {:?}", e),
443 }
444 }
445 for handle in read_handles {
446 match handle.await {
447 Ok((finished, _)) => info!("{}", finished),
448 Err(e) => error!("error: {:?}", e),
449 }
450 }
451
452 if let Some(metrics_file) = args.metrics_file {
453 let mut file = File::create(metrics_file)?;
454 let encoder = prometheus::TextEncoder::new();
455 encoder.encode(&metrics_registry.gather(), &mut file)?;
456 file.sync_all()?;
457 }
458 eprintln!("write amp: {}", metrics.write_amplification());
459
460 Ok(())
461}
462
mod api {
    use async_trait::async_trait;
    use mz_persist::indexed::columnar::ColumnarRecords;

    /// A sink that the benchmark writes generated batches into.
    #[async_trait]
    pub trait BenchmarkWriter {
        /// Writes one batch of records; returns once the implementation has
        /// accepted it (durability semantics are implementation-defined).
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error>;

        /// Signals that no more batches will be written, consuming the
        /// writer. Implementations may block until outstanding work drains.
        async fn finish(self) -> Result<(), anyhow::Error>;
    }

    /// A source of progress information for the benchmark's readers.
    #[async_trait]
    pub trait BenchmarkReader {
        /// Returns the number of records this reader has observed so far.
        async fn num_records(&mut self) -> Result<usize, anyhow::Error>;
    }
}
486
487mod raw_persist_benchmark {
488 use std::sync::Arc;
489
490 use async_trait::async_trait;
491 use mz_ore::cast::CastFrom;
492 use mz_ore::task::JoinHandle;
493 use mz_persist::indexed::columnar::ColumnarRecords;
494 use mz_persist_client::read::{Listen, ListenEvent};
495 use mz_persist_client::{Diagnostics, PersistClient, ShardId};
496 use mz_persist_types::Codec64;
497 use mz_persist_types::codec_impls::VecU8Schema;
498 use timely::progress::Antichain;
499 use tokio::sync::mpsc::Sender;
500 use tracing::{Instrument, info_span};
501
502 use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};
503
504 pub async fn setup_raw_persist(
505 persist: PersistClient,
506 id: ShardId,
507 num_writers: usize,
508 num_readers: usize,
509 ) -> Result<
510 (
511 Vec<RawBenchmarkWriter>,
512 Vec<Listen<Vec<u8>, Vec<u8>, u64, i64>>,
513 ),
514 anyhow::Error,
515 > {
516 let mut writers = vec![];
517 for idx in 0..num_writers {
518 let writer = RawBenchmarkWriter::new(&persist, id, idx).await?;
519
520 writers.push(writer);
521 }
522
523 let mut readers = vec![];
524 for _ in 0..num_readers {
525 let reader = persist
526 .open_leased_reader::<Vec<u8>, Vec<u8>, u64, i64>(
527 id,
528 Arc::new(VecU8Schema),
529 Arc::new(VecU8Schema),
530 Diagnostics::from_purpose("open loop"),
531 true,
532 )
533 .await?;
534
535 let listen = reader
536 .listen(Antichain::from_elem(0))
537 .await
538 .expect("cannot serve requested as_of");
539 readers.push(listen);
540 }
541
542 Ok((writers, readers))
543 }
544
545 pub struct RawBenchmarkWriter {
546 tx: Option<Sender<ColumnarRecords>>,
547 #[allow(dead_code)]
548 handles: Vec<JoinHandle<()>>,
549 }
550
551 impl RawBenchmarkWriter {
552 async fn new(
553 persist: &PersistClient,
554 id: ShardId,
555 idx: usize,
556 ) -> Result<Self, anyhow::Error> {
557 let mut handles = Vec::<JoinHandle<()>>::new();
558 let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<ColumnarRecords>(2);
559 let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(10);
560
561 let write = persist
562 .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
563 id,
564 Arc::new(VecU8Schema),
565 Arc::new(VecU8Schema),
566 Diagnostics::from_purpose("open loop"),
567 )
568 .await?;
569
570 let batch_writer_span = info_span!("batch-writer", idx);
572 let handle = mz_ore::task::spawn(
573 || format!("batch-writer-{}", idx),
574 async move {
575 let mut current_upper = timely::progress::Timestamp::minimum();
576 while let Some(records) = records_rx.recv().await {
577 let mut max_ts = 0;
578 let current_upper_chain = Antichain::from_elem(current_upper);
579
580 let mut builder = write.builder(current_upper_chain);
581
582 for ((k, v), t, d) in records.iter() {
583 builder
584 .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
585 .await
586 .expect("invalid usage");
587
588 max_ts = std::cmp::max(max_ts, u64::decode(t));
589 }
590
591 max_ts = max_ts + 1;
592 let new_upper_chain = Antichain::from_elem(max_ts);
593 current_upper = max_ts;
594
595 let batch = builder
596 .finish(new_upper_chain)
597 .await
598 .expect("invalid usage");
599
600 match batch_tx.send(batch).await {
601 Ok(_) => (),
602 Err(e) => panic!("send error: {}", e),
603 }
604 }
605 }
606 .instrument(batch_writer_span),
607 );
608 handles.push(handle);
609
610 let mut write = persist
611 .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
612 id,
613 Arc::new(VecU8Schema),
614 Arc::new(VecU8Schema),
615 Diagnostics::from_purpose("open loop"),
616 )
617 .await?;
618
619 let appender_span = info_span!("appender", idx);
621 let handle = mz_ore::task::spawn(
622 || format!("appender-{}", idx),
623 async move {
624 while let Some(batch) = batch_rx.recv().await {
625 let lower = batch.lower().clone();
626 let upper = batch.upper().clone();
627 write
628 .append_batch(batch, lower, upper)
629 .await
630 .expect("invalid usage")
631 .expect("unexpected upper");
632 }
633 }
634 .instrument(appender_span),
635 );
636 handles.push(handle);
637
638 let writer = RawBenchmarkWriter {
639 tx: Some(records_tx),
640 handles,
641 };
642
643 Ok(writer)
644 }
645 }
646
647 #[async_trait]
648 impl BenchmarkWriter for RawBenchmarkWriter {
649 async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error> {
650 self.tx
651 .as_mut()
652 .expect("writer was already finished")
653 .send(batch)
654 .await
655 .expect("writer send error");
656 Ok(())
657 }
658
659 async fn finish(mut self) -> Result<(), anyhow::Error> {
660 self.tx.take().expect("already finished");
661
662 for handle in self.handles.drain(..) {
663 let () = handle.await;
664 }
665
666 Ok(())
667 }
668 }
669
670 #[async_trait]
671 impl BenchmarkReader for Listen<Vec<u8>, Vec<u8>, u64, i64> {
672 async fn num_records(&mut self) -> Result<usize, anyhow::Error> {
673 let mut count = 0;
677 let events = self.fetch_next().await;
678
679 for event in events {
680 if let ListenEvent::Progress(t) = event {
681 count = usize::cast_from(t.elements()[0]);
682 }
683 }
684
685 Ok(count)
686 }
687 }
688}