// mz_persist/workload.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A configurable data generator for benchmarking.
11
12use std::cmp;
13use std::env::{self, VarError};
14use std::io::Write;
15use std::mem::size_of;
16
17use mz_ore::cast::CastFrom;
18use mz_persist_types::Codec64;
19
20use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsBuilder};
21use crate::metrics::ColumnarMetrics;
22
/// A configurable data generator for benchmarking.
#[derive(Clone, Debug)]
pub struct DataGenerator {
    /// The total number of records to produce.
    pub record_count: usize,
    /// The number of "goodput" bytes to make each record.
    pub record_size_bytes: usize,
    /// The maximum number of records included in a generated batch of records.
    pub batch_max_count: usize,
    // TODO: unique: bool,
    // Scratch buffer reused across `gen_record` calls to hold the encoded key.
    key_buf: Vec<u8>,
    // Scratch buffer reused across `gen_record` calls to hold the encoded
    // value (always left empty by `gen_record`).
    val_buf: Vec<u8>,
}
36
// Environment variables that override the default (full-size) generator
// configuration; read by `DataGenerator::default`.
const RECORD_SIZE_BYTES_KEY: &str = "MZ_PERSIST_RECORD_SIZE_BYTES";
const RECORD_COUNT_KEY: &str = "MZ_PERSIST_RECORD_COUNT";
const BATCH_MAX_COUNT_KEY: &str = "MZ_PERSIST_BATCH_MAX_COUNT";

// Environment variables that override the small-volume configuration; read by
// `DataGenerator::small`.
const RECORD_SIZE_BYTES_KEY_SMALL: &str = "MZ_PERSIST_RECORD_SIZE_BYTES_SMALL";
const RECORD_COUNT_KEY_SMALL: &str = "MZ_PERSIST_RECORD_COUNT_SMALL";
const BATCH_MAX_COUNT_KEY_SMALL: &str = "MZ_PERSIST_BATCH_MAX_COUNT_SMALL";

// Selected arbitrarily as representative of production record sizes.
const DEFAULT_RECORD_SIZE_BYTES: usize = 64;
// Selected arbitrarily to make ~8MiB batches.
const DEFAULT_BATCH_MAX_COUNT: usize = (8 * 1024 * 1024) / DEFAULT_RECORD_SIZE_BYTES;
// Manually tuned once to be the smallest value such that (1) we hit max
// throughput on the end_to_end benchmark and (2) goodput_pretty produces a
// round-ish number.
const DEFAULT_RECORD_COUNT: usize = 819_200;

// Selected arbitrarily as representative of production record sizes.
const DEFAULT_RECORD_SIZE_BYTES_SMALL: usize = 1024;
// Selected to produce a small number of batches with default settings. (NB:
// this refers to DEFAULT_RECORD_COUNT_SMALL, declared just below; const item
// order is irrelevant in Rust.)
const DEFAULT_BATCH_MAX_COUNT_SMALL: usize = DEFAULT_RECORD_COUNT_SMALL / 8;
// Selected arbitrarily to make ~64KiB of data.
const DEFAULT_RECORD_COUNT_SMALL: usize = (64 * 1024) / DEFAULT_RECORD_SIZE_BYTES_SMALL;

// The "goodput" bytes each record spends on its encoded timestamp (u64) and
// diff (i64): 8 + 8 = 16 bytes.
const TS_DIFF_GOODPUT_SIZE: usize = size_of::<u64>() + size_of::<i64>();
62
/// Reads a `usize` configuration value from the environment variable `key`,
/// returning `default` when the variable is unset.
///
/// # Panics
///
/// Panics if the variable is set but is not valid unicode or does not parse
/// as a `usize`.
fn read_env_usize(key: &str, default: usize) -> usize {
    match env::var(key) {
        // `unwrap_or_else` + `panic!` emits the message directly, rather than
        // the Debug-quoted form `map_err(format!).unwrap()` would produce.
        Ok(x) => x
            .parse()
            .unwrap_or_else(|err| panic!("invalid value for {}: {}", key, err)),
        Err(VarError::NotPresent) => default,
        // Set but not valid unicode.
        Err(err) => panic!("invalid value for {}: {}", key, err),
    }
}
73
74impl Default for DataGenerator {
75    fn default() -> Self {
76        let record_count = read_env_usize(RECORD_COUNT_KEY, DEFAULT_RECORD_COUNT);
77        let record_size_bytes = read_env_usize(RECORD_SIZE_BYTES_KEY, DEFAULT_RECORD_SIZE_BYTES);
78        let batch_max_count = read_env_usize(BATCH_MAX_COUNT_KEY, DEFAULT_BATCH_MAX_COUNT);
79
80        eprintln!(
81            "{}={} {}={} {}={}",
82            RECORD_COUNT_KEY,
83            record_count,
84            RECORD_SIZE_BYTES_KEY,
85            record_size_bytes,
86            BATCH_MAX_COUNT_KEY,
87            batch_max_count,
88        );
89        DataGenerator::new(record_count, record_size_bytes, batch_max_count)
90    }
91}
92
impl DataGenerator {
    /// Returns a new [DataGenerator].
    ///
    /// # Panics
    ///
    /// Panics if `record_size_bytes` is not strictly greater than the 16
    /// bytes consumed by the encoded timestamp and diff, or if
    /// `batch_max_count` is zero.
    pub fn new(record_count: usize, record_size_bytes: usize, batch_max_count: usize) -> Self {
        // NB: Strict greater so we have at least one byte for key.
        assert!(record_size_bytes > TS_DIFF_GOODPUT_SIZE);
        assert!(batch_max_count > 0);
        DataGenerator {
            record_count,
            record_size_bytes,
            batch_max_count,
            key_buf: Vec::new(),
            val_buf: Vec::new(),
        }
    }

    /// Returns a new [DataGenerator] specifically for testing small data volumes.
    ///
    /// Configuration comes from the `*_SMALL` environment variables when set,
    /// otherwise from the small compiled-in defaults; the effective settings
    /// are echoed to stderr.
    pub fn small() -> Self {
        let record_count_small = read_env_usize(RECORD_COUNT_KEY_SMALL, DEFAULT_RECORD_COUNT_SMALL);
        let record_size_bytes_small =
            read_env_usize(RECORD_SIZE_BYTES_KEY_SMALL, DEFAULT_RECORD_SIZE_BYTES_SMALL);
        let batch_max_count_small =
            read_env_usize(BATCH_MAX_COUNT_KEY_SMALL, DEFAULT_BATCH_MAX_COUNT_SMALL);

        eprintln!(
            "{}={} {}={} {}={}",
            RECORD_COUNT_KEY_SMALL,
            record_count_small,
            RECORD_SIZE_BYTES_KEY_SMALL,
            record_size_bytes_small,
            BATCH_MAX_COUNT_KEY_SMALL,
            batch_max_count_small,
        );
        DataGenerator::new(
            record_count_small,
            record_size_bytes_small,
            batch_max_count_small,
        )
    }

    /// Returns the number of "goodput" bytes represented by the entire dataset
    /// produced by this generator.
    ///
    /// Each record contributes exactly `record_size_bytes`: key bytes plus
    /// the 16 bytes of encoded timestamp and diff (see `gen_record`).
    pub fn goodput_bytes(&self) -> u64 {
        u64::cast_from(self.record_count * self.record_size_bytes)
    }

    /// Returns a more easily human readable version of [Self::goodput_bytes].
    ///
    /// Buckets into B/KiB/MiB/GiB via integer division; there is
    /// intentionally no TiB bucket.
    pub fn goodput_pretty(&self) -> String {
        let goodput_bytes = self.goodput_bytes();
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;
        const GIB: u64 = 1024 * MIB;
        // Switch units only once the value is >= 10 of them, so the printed
        // number always has at least two significant digits.
        if goodput_bytes >= 10 * GIB {
            format!("{}GiB", goodput_bytes / GIB)
        } else if goodput_bytes >= 10 * MIB {
            format!("{}MiB", goodput_bytes / MIB)
        } else if goodput_bytes >= 10 * KIB {
            format!("{}KiB", goodput_bytes / KIB)
        } else {
            format!("{}B", goodput_bytes)
        }
    }

    /// Generates the requested batch of records.
    ///
    /// Returns `None` when `batch_idx` is entirely past the configured
    /// `record_count` (i.e. iteration is done).
    pub fn gen_batch(&mut self, batch_idx: usize) -> Option<ColumnarRecords> {
        let batch_start = self.batch_max_count * batch_idx;
        // The final batch may be short of batch_max_count records.
        let batch_end = cmp::min(batch_start + self.batch_max_count, self.record_count);
        if batch_start >= batch_end {
            return None;
        }
        let items = batch_end - batch_start;
        // Capacity args are presumably (record count, total key bytes, total
        // val bytes): each key is `record_size_bytes - TS_DIFF_GOODPUT_SIZE`
        // long (see `gen_record`) and vals are always empty. TODO(review):
        // confirm against `ColumnarRecordsBuilder::with_capacity`.
        let mut batch = ColumnarRecordsBuilder::with_capacity(
            items,
            (self.record_size_bytes - TS_DIFF_GOODPUT_SIZE) * items,
            0,
        );
        for record_idx in batch_start..batch_end {
            let (kv, t, d) = self.gen_record(record_idx);
            // `push` returning false means the builder ran out of room, which
            // would be a sizing bug above.
            assert!(
                batch.push((kv, Codec64::encode(&t), Codec64::encode(&d))),
                "generator exceeded batch size; smaller batches needed"
            );
        }
        Some(batch.finish(&ColumnarMetrics::disconnected()))
    }

    /// Generates a single record: a zero-padded decimal key, an empty value,
    /// a timestamp equal to `record_idx`, and a diff of 1.
    ///
    /// The returned key and value borrow scratch buffers owned by `self` that
    /// are overwritten by the next call.
    fn gen_record(&mut self, record_idx: usize) -> ((&[u8], &[u8]), u64, i64) {
        assert!(record_idx < self.record_count);
        assert!(self.record_size_bytes > TS_DIFF_GOODPUT_SIZE);

        self.key_buf.clear();
        // Size the key so key + encoded ts + encoded diff add up to exactly
        // `record_size_bytes` of goodput.
        let key_len = self.record_size_bytes - TS_DIFF_GOODPUT_SIZE;
        if self.key_buf.capacity() < key_len {
            self.key_buf.reserve(key_len);
        }
        // This formats `record_idx` as an integer and, if necessary, left-pads
        // it with 0s to be `key_len` chars long.
        write!(&mut self.key_buf, "{:01$}", record_idx, key_len)
            .expect("write to Vec is infallible");
        // NOTE(review): if `record_idx` needs more than `key_len` digits, this
        // truncation silently drops trailing digits, so keys may repeat.
        self.key_buf.truncate(key_len);
        assert_eq!(self.key_buf.len(), key_len);

        // The value is always empty; all goodput lives in the key + ts + diff.
        self.val_buf.clear();

        let ts = u64::cast_from(record_idx);
        let diff = 1;
        ((&self.key_buf, &self.val_buf), ts, diff)
    }

    /// Returns an [Iterator] of all records in batches.
    ///
    /// The iterator owns a clone of this generator, so it is independent of
    /// further use of `self`.
    pub fn batches(&self) -> DataGeneratorBatchIter {
        DataGeneratorBatchIter {
            config: self.clone(),
            batch_idx: 0,
        }
    }

    /// Returns an [Iterator] of all records.
    ///
    /// Each record's key and value are copied out of the scratch buffers into
    /// freshly allocated `Vec`s.
    pub fn records(&self) -> impl Iterator<Item = ((Vec<u8>, Vec<u8>), u64, i64)> {
        let mut config = self.clone();
        (0..self.record_count).map(move |record_idx| {
            let ((k, v), t, d) = config.gen_record(record_idx);
            ((k.to_vec(), v.to_vec()), t, d)
        })
    }
}
218
/// An implementation of [Iterator] for [DataGenerator].
#[derive(Debug)]
pub struct DataGeneratorBatchIter {
    // A private clone of the generator whose batches are being iterated.
    config: DataGenerator,
    // The index of the next batch to hand out via `next`.
    batch_idx: usize,
}
225
226impl Iterator for DataGeneratorBatchIter {
227    type Item = ColumnarRecords;
228
229    fn next(&mut self) -> Option<Self::Item> {
230        let batch_idx = self.batch_idx;
231        self.batch_idx += 1;
232        self.config.gen_batch(batch_idx)
233    }
234}
235
236/// Encodes the given data into a flat buffer that is exactly
237/// `data.goodput_bytes()` long.
238pub fn flat_blob(data: &DataGenerator) -> Vec<u8> {
239    let mut buf = Vec::with_capacity(usize::cast_from(data.goodput_bytes()));
240    for batch in data.batches() {
241        for ((k, v), t, d) in batch.iter() {
242            buf.extend_from_slice(k);
243            buf.extend_from_slice(v);
244            buf.extend_from_slice(&t);
245            buf.extend_from_slice(&d);
246        }
247    }
248    assert_eq!(buf.len(), usize::cast_from(data.goodput_bytes()));
249    buf
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn size_invariants() {
        // Iterating all batches must yield exactly `record_count` records
        // totalling `goodput_bytes` bytes of goodput.
        fn testcase(g: DataGenerator) {
            let mut seen_records = 0;
            let mut seen_goodput = 0;
            for batch in g.batches() {
                for ((k, v), _, _) in batch.iter() {
                    seen_records += 1;
                    seen_goodput += k.len() + v.len() + TS_DIFF_GOODPUT_SIZE;
                }
            }
            assert_eq!(seen_records, g.record_count);
            assert_eq!(seen_goodput, usize::cast_from(g.goodput_bytes()));
        }

        // (record_count, record_size_bytes, batch_max_count)
        for (record_count, record_size_bytes, batch_max_count) in
            [(1, 32, 1), (1, 32, 100), (100, 32, 7), (1000, 32, 100)]
        {
            testcase(DataGenerator::new(record_count, record_size_bytes, batch_max_count));
        }
    }

    #[mz_ore::test]
    fn goodput_pretty() {
        fn testcase(bytes: usize) -> String {
            DataGenerator::new(1, bytes, 1).goodput_pretty()
        }

        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        const GIB: usize = 1024 * MIB;
        let cases = [
            (33, "33B"),
            (10 * KIB - 1, "10239B"),
            (10 * KIB, "10KiB"),
            (10 * MIB - 1, "10239KiB"),
            (10 * MIB, "10MiB"),
            (10 * GIB - 1, "10239MiB"),
            (10 * GIB, "10GiB"),
            (10 * 1024 * GIB - 1, "10239GiB"),
            (10 * 1024 * GIB, "10240GiB"), // no TiB bucket exists
        ];
        for (bytes, expected) in cases {
            assert_eq!(testcase(bytes), expected);
        }
    }
}