1#![allow(clippy::cast_precision_loss)]
11
12use std::fs::File;
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::bail;
20use mz_ore::cast::CastFrom;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_ore::task::JoinHandle;
24use mz_ore::url::SensitiveUrl;
25use mz_persist::workload::DataGenerator;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::PersistConfig;
28use mz_persist_client::metrics::Metrics;
29use mz_persist_client::rpc::PubSubClientConnection;
30use mz_persist_client::{PersistLocation, ShardId};
31use prometheus::Encoder;
32use tokio::net::TcpListener;
33use tokio::sync::Barrier;
34use tokio::sync::mpsc::error::SendError;
35use tracing::{Instrument, debug, error, info, info_span, trace};
36
37use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};
38
/// Which workload the open-loop benchmark runs.
#[derive(clap::ValueEnum, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum BenchmarkType {
    /// Raw writers appending generated batches directly to a persist shard.
    RawWriter,
    /// Placeholder for a Materialize-source-shaped workload; selecting this
    /// currently panics (unimplemented).
    MzSourceModel,
}
48
49#[derive(Debug, clap::Parser)]
51pub struct Args {
52 #[clap(long, value_name = "W", default_value_t = 1)]
54 num_writers: usize,
55
56 #[clap(long, value_name = "R", default_value_t = 1)]
58 num_readers: usize,
59
60 #[clap(long, value_name = "CONSENSUS_URI")]
62 consensus_uri: SensitiveUrl,
63
64 #[clap(long, value_name = "BLOB_URI")]
66 blob_uri: SensitiveUrl,
67
68 #[clap(value_enum, long, default_value_t = BenchmarkType::RawWriter)]
70 benchmark_type: BenchmarkType,
71
72 #[clap(long, value_parser = humantime::parse_duration, value_name = "S", default_value = "60s")]
74 runtime: Duration,
75
76 #[clap(long, value_name = "R", default_value_t = 100)]
78 records_per_second: usize,
79
80 #[clap(long, value_name = "B", default_value_t = 64)]
82 record_size_bytes: usize,
83
84 #[clap(long, env = "", value_name = "R", default_value_t = 100)]
86 batch_size: usize,
87
88 #[clap(long, value_parser = humantime::parse_duration, value_name = "L", default_value = "1s")]
90 logging_granularity: Duration,
91
92 #[clap(short, long, value_name = "I")]
94 shard_id: Option<String>,
95
96 #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
98 internal_http_listen_addr: SocketAddr,
99
100 #[clap(long)]
102 metrics_file: Option<String>,
103}
104
105const MIB: u64 = 1024 * 1024;
106
107pub async fn run(args: Args) -> Result<(), anyhow::Error> {
108 let metrics_registry = MetricsRegistry::new();
109 {
110 let metrics_registry = metrics_registry.clone();
111 info!(
112 "serving internal HTTP server on http://{}/metrics",
113 args.internal_http_listen_addr
114 );
115 let listener = TcpListener::bind(&args.internal_http_listen_addr)
116 .await
117 .expect("can bind");
118 mz_ore::task::spawn(
119 || "http_server",
120 axum::serve(
121 listener,
122 axum::Router::new()
123 .route(
124 "/metrics",
125 axum::routing::get(move || async move {
126 mz_http_util::handle_prometheus(&metrics_registry).await
127 }),
128 )
129 .into_make_service(),
130 )
131 .into_future(),
132 );
133 }
134
135 let location = PersistLocation {
136 blob_uri: args.blob_uri.clone(),
137 consensus_uri: args.consensus_uri.clone(),
138 };
139 let persist = PersistClientCache::new(
140 PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone()),
141 &metrics_registry,
142 |_, _| PubSubClientConnection::noop(),
143 )
144 .open(location)
145 .await?;
146
147 let shard_id = match args.shard_id.clone() {
148 Some(shard_id) => ShardId::from_str(&shard_id).map_err(anyhow::Error::msg)?,
149 None => ShardId::new(),
150 };
151
152 let metrics = Arc::clone(persist.metrics());
153 let (writers, readers) = match args.benchmark_type.clone() {
154 BenchmarkType::RawWriter => {
155 raw_persist_benchmark::setup_raw_persist(
156 persist,
157 shard_id,
158 args.num_writers,
159 args.num_readers,
160 )
161 .await?
162 }
163 BenchmarkType::MzSourceModel => panic!("source model"),
164 };
165
166 run_benchmark(args, metrics_registry, metrics, writers, readers).await
167}
168
169async fn run_benchmark<W, R>(
170 args: Args,
171 metrics_registry: MetricsRegistry,
172 metrics: Arc<Metrics>,
173 writers: Vec<W>,
174 readers: Vec<R>,
175) -> Result<(), anyhow::Error>
176where
177 W: BenchmarkWriter + Send + Sync + 'static,
178 R: BenchmarkReader + Send + Sync + 'static,
179{
180 let num_records_total = args.records_per_second * usize::cast_from(args.runtime.as_secs());
181 let data_generator =
182 DataGenerator::new(num_records_total, args.record_size_bytes, args.batch_size);
183
184 let benchmark_description = format!(
185 "num-readers={} num-writers={} runtime={:?} num_records_total={} records-per-second={} record-size-bytes={} batch-size={}",
186 args.num_readers,
187 args.num_writers,
188 args.runtime,
189 num_records_total,
190 args.records_per_second,
191 args.record_size_bytes,
192 args.batch_size
193 );
194
195 info!("starting benchmark: {}", benchmark_description);
196 let mut generator_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
197 let mut write_handles: Vec<JoinHandle<Result<String, anyhow::Error>>> = vec![];
198 let mut read_handles: Vec<JoinHandle<Result<(String, R), anyhow::Error>>> = vec![];
199
200 let start = Instant::now();
203 let barrier = Arc::new(Barrier::new(2 * args.num_writers + args.num_readers));
206
207 #[allow(clippy::as_conversions)]
211 let time_per_batch = {
212 let records_per_second_f64 = args.records_per_second as f64;
213 let batch_size_f64 = args.batch_size as f64;
214
215 let batches_per_second = records_per_second_f64 / batch_size_f64;
216 Duration::from_secs(1).div_f64(batches_per_second)
217 };
218
219 for (idx, mut writer) in writers.into_iter().enumerate() {
220 let b = Arc::clone(&barrier);
221 let data_generator = data_generator.clone();
222 let start = start.clone();
223 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
224
225 let generator_span = info_span!("generator", idx);
227 let data_generator_handle = mz_ore::task::spawn(
228 || format!("data-generator-{}", idx),
229 async move {
230 trace!("data generator {} waiting for barrier", idx);
231 b.wait().await;
232 info!("starting data generator {}", idx);
233
234 let mut batch_idx = 0;
237 let mut prev_log = Duration::from_millis(0);
240 loop {
241 let mut data_generator = data_generator.clone();
245 let batch_span = info_span!("batch", batch_idx);
248 let batch = mz_ore::task::spawn_blocking(
249 || "data_generator-batch",
250 move || {
251 batch_span
252 .in_scope(|| data_generator.gen_batch(usize::cast_from(batch_idx)))
253 },
254 )
255 .await
256 .expect("task failed");
257 trace!("data generator {} made a batch", idx);
258 let batch = match batch {
259 Some(x) => x,
260 None => {
261 let records_sent = usize::cast_from(batch_idx) * args.batch_size;
262 let finished = format!(
263 "Data generator {} finished after {} ms and sent {} records",
264 idx,
265 start.elapsed().as_millis(),
266 records_sent
267 );
268 return Ok(finished);
269 }
270 };
271 batch_idx += 1;
272
273 let elapsed = start.elapsed();
276 let next_batch_time = time_per_batch * (batch_idx);
277 let sleep = next_batch_time.saturating_sub(elapsed);
278 if sleep > Duration::ZERO {
279 async {
280 debug!("Data generator ahead of schedule, sleeping for {:?}", sleep);
281 tokio::time::sleep(sleep).await
282 }
283 .instrument(info_span!("throttle"))
284 .await;
285 }
286
287 if let Err(SendError(_)) = tx.send(batch) {
289 bail!("receiver unexpectedly dropped");
290 }
291 trace!("data generator {} wrote a batch", idx);
292
293 if elapsed - prev_log > args.logging_granularity {
294 let records_sent = usize::cast_from(batch_idx) * args.batch_size;
295 debug!(
296 "After {} ms data generator {} has sent {} records.",
297 start.elapsed().as_millis(),
298 idx,
299 records_sent
300 );
301 prev_log = elapsed;
302 }
303 }
304 }
305 .instrument(generator_span),
306 );
307
308 generator_handles.push(data_generator_handle);
309 let b = Arc::clone(&barrier);
310
311 let writer_span = info_span!("writer", idx);
313 let writer_handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
314 trace!("writer {} waiting for barrier", idx);
315 b.wait().await;
316 info!("starting writer {}", idx);
317
318 let mut max_write_latency = Duration::from_millis(0);
320
321 let mut prev_log = Duration::from_millis(0);
324 let mut records_written = 0;
325
326 loop {
327 let batch = match rx.recv().await {
328 Some(batch) => batch,
329 None => break,
330 };
331
332 trace!("writer {} received a batch. writing", idx);
333 let write_start = Instant::now();
334 writer.write(batch).await?;
335
336 records_written += args.batch_size;
337 let write_latency = write_start.elapsed();
338 if write_latency > max_write_latency {
339 max_write_latency = write_latency;
340 }
341
342 let elapsed = start.elapsed();
343
344 if elapsed - prev_log > args.logging_granularity {
345 info!("After {} ms writer {} has written {} records. Max write latency {} ms most recent write latency {} ms.",
346 elapsed.as_millis(), idx, records_written, max_write_latency.as_millis(), write_latency.as_millis());
347 prev_log = elapsed;
348 }
349
350 if records_written >= num_records_total {
351 break;
352 }
353 }
354 let elapsed = start.elapsed();
355 let finished = format!(
356 "Writer {} finished after {} ms and wrote {} records. Max write latency {} ms.",
357 idx,
358 elapsed.as_millis(),
359 records_written,
360 max_write_latency.as_millis()
361 );
362
363 writer.finish().await.unwrap();
364
365 Ok(finished)
366 }.instrument(writer_span));
367
368 write_handles.push(writer_handle);
369 }
370
371 #[allow(clippy::as_conversions)]
373 for (idx, mut reader) in readers.into_iter().enumerate() {
374 let b = Arc::clone(&barrier);
375 let reader_span = info_span!("reader", idx);
377 let reader_handle = mz_ore::task::spawn(|| format!("reader-{}", idx), async move {
378 trace!("reader {} waiting for barrier", idx);
379 b.wait().await;
380 info!("starting reader {}", idx);
381
382 let mut max_read_latency = Duration::from_millis(0);
384
385 let mut max_lag = 0;
388
389 let mut prev_log = Duration::from_millis(0);
392 loop {
393 let elapsed = start.elapsed();
394 let expected_sent = elapsed.as_millis() as usize
395 / (time_per_batch.as_millis() as usize)
396 * args.batch_size;
397 let read_start = Instant::now();
398 let num_records_read = reader.num_records().await?;
399 let read_latency = read_start.elapsed();
400 let lag = if expected_sent > num_records_read {
401 expected_sent - num_records_read
402 } else {
403 0
404 };
405 if lag > max_lag {
406 max_lag = lag;
407 }
408
409 if read_latency > max_read_latency {
410 max_read_latency = read_latency;
411 }
412
413 if elapsed - prev_log > args.logging_granularity {
414 let elapsed_seconds = elapsed.as_secs();
415 let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
416 let throughput = mb_read / elapsed_seconds as f64;
417 info!("After {} ms reader {} has read {} records (throughput {:.3} MiB/s). Max read lag {} records, most recent read lag {} records. Max read latency {} ms, most recent read latency {} ms",
418 elapsed.as_millis(), idx, num_records_read, throughput, max_lag, lag, max_read_latency.as_millis(), read_latency.as_millis());
419 prev_log = elapsed;
420 }
421 if num_records_read == num_records_total {
422 let elapsed_seconds = elapsed.as_secs();
423 let mb_read = (num_records_read * args.record_size_bytes) as f64 / MIB as f64;
424 let throughput = mb_read / elapsed_seconds as f64;
425 let finished = format!("Reader {} finished after {} ms and read {} records (throughput {:.3} MiB/s). Max read lag {} records. Max read latency {} ms.",
426 idx, elapsed.as_millis(), num_records_read, throughput, max_lag, max_read_latency.as_millis());
427 return Ok((finished, reader));
428 }
429 }
430 }.instrument(reader_span));
431 read_handles.push(reader_handle);
432 }
433
434 for handle in generator_handles {
435 match handle.await? {
436 Ok(finished) => info!("{}", finished),
437 Err(e) => error!("error: {:?}", e),
438 }
439 }
440 for handle in write_handles {
441 match handle.await? {
442 Ok(finished) => info!("{}", finished),
443 Err(e) => error!("error: {:?}", e),
444 }
445 }
446 for handle in read_handles {
447 match handle.await? {
448 Ok((finished, _)) => info!("{}", finished),
449 Err(e) => error!("error: {:?}", e),
450 }
451 }
452
453 if let Some(metrics_file) = args.metrics_file {
454 let mut file = File::create(metrics_file)?;
455 let encoder = prometheus::TextEncoder::new();
456 encoder.encode(&metrics_registry.gather(), &mut file)?;
457 file.sync_all()?;
458 }
459 eprintln!("write amp: {}", metrics.write_amplification());
460
461 Ok(())
462}
463
/// The abstract interface the open-loop benchmark drives.
mod api {
    use async_trait::async_trait;
    use mz_persist::indexed::columnar::ColumnarRecords;

    /// A sink the benchmark feeds generated batches into.
    #[async_trait]
    pub trait BenchmarkWriter {
        /// Writes one batch of generated records.
        async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error>;

        /// Finishes the writer, consuming it so it cannot be reused.
        async fn finish(self) -> Result<(), anyhow::Error>;
    }

    /// A source the benchmark polls for read progress.
    #[async_trait]
    pub trait BenchmarkReader {
        /// Returns the number of records currently visible to this reader.
        async fn num_records(&mut self) -> Result<usize, anyhow::Error>;
    }
}
487
488mod raw_persist_benchmark {
489 use std::sync::Arc;
490
491 use async_trait::async_trait;
492 use mz_ore::cast::CastFrom;
493 use mz_ore::task::JoinHandle;
494 use mz_persist::indexed::columnar::ColumnarRecords;
495 use mz_persist_client::read::{Listen, ListenEvent};
496 use mz_persist_client::{Diagnostics, PersistClient, ShardId};
497 use mz_persist_types::Codec64;
498 use mz_persist_types::codec_impls::VecU8Schema;
499 use timely::progress::Antichain;
500 use tokio::sync::mpsc::Sender;
501 use tracing::{Instrument, info_span};
502
503 use crate::open_loop::api::{BenchmarkReader, BenchmarkWriter};
504
505 pub async fn setup_raw_persist(
506 persist: PersistClient,
507 id: ShardId,
508 num_writers: usize,
509 num_readers: usize,
510 ) -> Result<
511 (
512 Vec<RawBenchmarkWriter>,
513 Vec<Listen<Vec<u8>, Vec<u8>, u64, i64>>,
514 ),
515 anyhow::Error,
516 > {
517 let mut writers = vec![];
518 for idx in 0..num_writers {
519 let writer = RawBenchmarkWriter::new(&persist, id, idx).await?;
520
521 writers.push(writer);
522 }
523
524 let mut readers = vec![];
525 for _ in 0..num_readers {
526 let reader = persist
527 .open_leased_reader::<Vec<u8>, Vec<u8>, u64, i64>(
528 id,
529 Arc::new(VecU8Schema),
530 Arc::new(VecU8Schema),
531 Diagnostics::from_purpose("open loop"),
532 true,
533 )
534 .await?;
535
536 let listen = reader
537 .listen(Antichain::from_elem(0))
538 .await
539 .expect("cannot serve requested as_of");
540 readers.push(listen);
541 }
542
543 Ok((writers, readers))
544 }
545
    /// A [`BenchmarkWriter`] that appends batches to a persist shard via two
    /// background tasks (see `RawBenchmarkWriter::new`).
    pub struct RawBenchmarkWriter {
        // Input side of the pipeline; `None` once `finish` has taken it,
        // which closes the channel and lets the background tasks drain.
        tx: Option<Sender<ColumnarRecords>>,
        #[allow(dead_code)]
        handles: Vec<JoinHandle<()>>,
    }
551
    impl RawBenchmarkWriter {
        /// Creates a writer for shard `id` backed by two spawned tasks: a
        /// "batch-writer" that turns incoming `ColumnarRecords` into persist
        /// batches, and an "appender" that appends those batches to the
        /// shard. Records enter via the returned writer's `tx` channel.
        async fn new(
            persist: &PersistClient,
            id: ShardId,
            idx: usize,
        ) -> Result<Self, anyhow::Error> {
            let mut handles = Vec::<JoinHandle<()>>::new();
            // Stage 1 input: raw records from `BenchmarkWriter::write`.
            let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<ColumnarRecords>(2);
            // Stage 2 input: finished batches awaiting append.
            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(10);

            let write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            let batch_writer_span = info_span!("batch-writer", idx);
            let handle = mz_ore::task::spawn(
                || format!("batch-writer-{}", idx),
                async move {
                    // Tracks the upper frontier across batches so each new
                    // batch's lower starts where the previous one ended.
                    let mut current_upper = timely::progress::Timestamp::minimum();
                    while let Some(records) = records_rx.recv().await {
                        let mut max_ts = 0;
                        let current_upper_chain = Antichain::from_elem(current_upper);

                        let mut builder = write.builder(current_upper_chain);

                        for ((k, v), t, d) in records.iter() {
                            builder
                                .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                                .await
                                .expect("invalid usage");

                            max_ts = std::cmp::max(max_ts, u64::decode(t));
                        }

                        // The new upper must be strictly greater than every
                        // timestamp included in the batch.
                        max_ts = max_ts + 1;
                        let new_upper_chain = Antichain::from_elem(max_ts);
                        current_upper = max_ts;

                        let batch = builder
                            .finish(new_upper_chain)
                            .await
                            .expect("invalid usage");

                        match batch_tx.send(batch).await {
                            Ok(_) => (),
                            Err(e) => panic!("send error: {}", e),
                        }
                    }
                }
                .instrument(batch_writer_span),
            );
            handles.push(handle);

            // A second writer handle, owned by the appender task.
            let mut write = persist
                .open_writer::<Vec<u8>, Vec<u8>, u64, i64>(
                    id,
                    Arc::new(VecU8Schema),
                    Arc::new(VecU8Schema),
                    Diagnostics::from_purpose("open loop"),
                )
                .await?;

            let appender_span = info_span!("appender", idx);
            let handle = mz_ore::task::spawn(
                || format!("appender-{}", idx),
                async move {
                    while let Some(batch) = batch_rx.recv().await {
                        let lower = batch.lower().clone();
                        let upper = batch.upper().clone();
                        write
                            .append_batch(batch, lower, upper)
                            .await
                            .expect("invalid usage")
                            .expect("unexpected upper");
                    }
                }
                .instrument(appender_span),
            );
            handles.push(handle);

            let writer = RawBenchmarkWriter {
                tx: Some(records_tx),
                handles,
            };

            Ok(writer)
        }
    }
647
648 #[async_trait]
649 impl BenchmarkWriter for RawBenchmarkWriter {
650 async fn write(&mut self, batch: ColumnarRecords) -> Result<(), anyhow::Error> {
651 self.tx
652 .as_mut()
653 .expect("writer was already finished")
654 .send(batch)
655 .await
656 .expect("writer send error");
657 Ok(())
658 }
659
660 async fn finish(mut self) -> Result<(), anyhow::Error> {
661 self.tx.take().expect("already finished");
662
663 for handle in self.handles.drain(..) {
664 let () = handle.await?;
665 }
666
667 Ok(())
668 }
669 }
670
671 #[async_trait]
672 impl BenchmarkReader for Listen<Vec<u8>, Vec<u8>, u64, i64> {
673 async fn num_records(&mut self) -> Result<usize, anyhow::Error> {
674 let mut count = 0;
678 let events = self.fetch_next().await;
679
680 for event in events {
681 if let ListenEvent::Progress(t) = event {
682 count = usize::cast_from(t.elements()[0]);
683 }
684 }
685
686 Ok(count)
687 }
688 }
689}