1use std::cmp;
13use std::env::{self, VarError};
14use std::io::Write;
15use std::mem::size_of;
16
17use mz_ore::cast::CastFrom;
18use mz_persist_types::Codec64;
19
20use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsBuilder};
21use crate::metrics::ColumnarMetrics;
22
/// A configurable, deterministic generator of (key, value, time, diff) records
/// for benchmarks and tests.
#[derive(Clone, Debug)]
pub struct DataGenerator {
    /// Total number of records to generate across all batches.
    pub record_count: usize,
    /// Goodput bytes per record: key bytes plus the encoded timestamp and
    /// diff (see `TS_DIFF_GOODPUT_SIZE`).
    pub record_size_bytes: usize,
    /// Maximum number of records included in any one generated batch.
    pub batch_max_count: usize,
    // Scratch buffer for the key, reused across `gen_record` calls to avoid
    // a fresh allocation per record.
    key_buf: Vec<u8>,
    // Scratch buffer for the value; cleared on every record and currently
    // always left empty (values are zero-length).
    val_buf: Vec<u8>,
}
36
// Environment variables that override the default (large) generator knobs.
const RECORD_SIZE_BYTES_KEY: &str = "MZ_PERSIST_RECORD_SIZE_BYTES";
const RECORD_COUNT_KEY: &str = "MZ_PERSIST_RECORD_COUNT";
const BATCH_MAX_COUNT_KEY: &str = "MZ_PERSIST_BATCH_MAX_COUNT";

// Environment variables that override the `DataGenerator::small` knobs.
const RECORD_SIZE_BYTES_KEY_SMALL: &str = "MZ_PERSIST_RECORD_SIZE_BYTES_SMALL";
const RECORD_COUNT_KEY_SMALL: &str = "MZ_PERSIST_RECORD_COUNT_SMALL";
const BATCH_MAX_COUNT_KEY_SMALL: &str = "MZ_PERSIST_BATCH_MAX_COUNT_SMALL";

/// Default goodput size of each record for the large generator.
const DEFAULT_RECORD_SIZE_BYTES: usize = 64;
/// Default batch size, chosen so each batch is roughly 8MiB of goodput.
const DEFAULT_BATCH_MAX_COUNT: usize = (8 * 1024 * 1024) / DEFAULT_RECORD_SIZE_BYTES;
/// Default total record count for the large generator.
const DEFAULT_RECORD_COUNT: usize = 819_200;

/// Default goodput size of each record for `DataGenerator::small`.
const DEFAULT_RECORD_SIZE_BYTES_SMALL: usize = 1024;
/// Default small batch size: the small record count split into 8 batches.
const DEFAULT_BATCH_MAX_COUNT_SMALL: usize = DEFAULT_RECORD_COUNT_SMALL / 8;
/// Default small record count, chosen so total goodput is roughly 64KiB.
const DEFAULT_RECORD_COUNT_SMALL: usize = (64 * 1024) / DEFAULT_RECORD_SIZE_BYTES_SMALL;

/// Bytes of goodput contributed by the encoded timestamp (u64) and diff (i64)
/// of each record; the remainder of `record_size_bytes` goes to the key.
const TS_DIFF_GOODPUT_SIZE: usize = size_of::<u64>() + size_of::<i64>();
62
/// Reads a `usize` configuration knob from the environment.
///
/// Returns `default` when `key` is not set. Panics when the variable is set
/// but is not valid unicode or does not parse as a `usize`, since either
/// indicates a misconfigured benchmark invocation.
fn read_env_usize(key: &str, default: usize) -> usize {
    match env::var(key) {
        // Panic directly with a uniform message. The previous
        // `map_err(format!).unwrap()` went through `Result::unwrap`, which
        // Debug-formats the error and so wrapped the message in `Err("...")`
        // quoting, unlike the `panic!` in the VarError arm below.
        Ok(x) => x
            .parse()
            .unwrap_or_else(|err| panic!("invalid value for {}: {}", key, err)),
        Err(VarError::NotPresent) => default,
        Err(err) => panic!("invalid value for {}: {}", key, err),
    }
}
73
74impl Default for DataGenerator {
75 fn default() -> Self {
76 let record_count = read_env_usize(RECORD_COUNT_KEY, DEFAULT_RECORD_COUNT);
77 let record_size_bytes = read_env_usize(RECORD_SIZE_BYTES_KEY, DEFAULT_RECORD_SIZE_BYTES);
78 let batch_max_count = read_env_usize(BATCH_MAX_COUNT_KEY, DEFAULT_BATCH_MAX_COUNT);
79
80 eprintln!(
81 "{}={} {}={} {}={}",
82 RECORD_COUNT_KEY,
83 record_count,
84 RECORD_SIZE_BYTES_KEY,
85 record_size_bytes,
86 BATCH_MAX_COUNT_KEY,
87 batch_max_count,
88 );
89 DataGenerator::new(record_count, record_size_bytes, batch_max_count)
90 }
91}
92
impl DataGenerator {
    /// Returns a new [DataGenerator] with the given dimensions.
    ///
    /// Panics if `record_size_bytes` does not leave room for at least one
    /// key byte after the ts/diff overhead, or if `batch_max_count` is 0.
    pub fn new(record_count: usize, record_size_bytes: usize, batch_max_count: usize) -> Self {
        // Strict greater-than: the key gets whatever is left after the
        // encoded ts+diff, and that must be at least one byte.
        assert!(record_size_bytes > TS_DIFF_GOODPUT_SIZE);
        assert!(batch_max_count > 0);
        DataGenerator {
            record_count,
            record_size_bytes,
            batch_max_count,
            key_buf: Vec::new(),
            val_buf: Vec::new(),
        }
    }

    /// Returns a small generator (~64KiB of goodput by default) suitable for
    /// quick tests, configured via the `*_SMALL` environment variables.
    pub fn small() -> Self {
        let record_count_small = read_env_usize(RECORD_COUNT_KEY_SMALL, DEFAULT_RECORD_COUNT_SMALL);
        let record_size_bytes_small =
            read_env_usize(RECORD_SIZE_BYTES_KEY_SMALL, DEFAULT_RECORD_SIZE_BYTES_SMALL);
        let batch_max_count_small =
            read_env_usize(BATCH_MAX_COUNT_KEY_SMALL, DEFAULT_BATCH_MAX_COUNT_SMALL);

        // Echo the effective configuration so test/benchmark output records it.
        eprintln!(
            "{}={} {}={} {}={}",
            RECORD_COUNT_KEY_SMALL,
            record_count_small,
            RECORD_SIZE_BYTES_KEY_SMALL,
            record_size_bytes_small,
            BATCH_MAX_COUNT_KEY_SMALL,
            batch_max_count_small,
        );
        DataGenerator::new(
            record_count_small,
            record_size_bytes_small,
            batch_max_count_small,
        )
    }

    /// Total goodput bytes this generator will produce: every record
    /// contributes exactly `record_size_bytes`.
    pub fn goodput_bytes(&self) -> u64 {
        u64::cast_from(self.record_count * self.record_size_bytes)
    }

    /// Renders [Self::goodput_bytes] in a human-readable unit, using the
    /// largest of B/KiB/MiB/GiB for which the value is at least 10 units.
    /// Values of 10GiB and above never roll over to TiB.
    pub fn goodput_pretty(&self) -> String {
        let goodput_bytes = self.goodput_bytes();
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;
        const GIB: u64 = 1024 * MIB;
        if goodput_bytes >= 10 * GIB {
            format!("{}GiB", goodput_bytes / GIB)
        } else if goodput_bytes >= 10 * MIB {
            format!("{}MiB", goodput_bytes / MIB)
        } else if goodput_bytes >= 10 * KIB {
            format!("{}KiB", goodput_bytes / KIB)
        } else {
            format!("{}B", goodput_bytes)
        }
    }

    /// Generates the batch of records with the given index, or `None` once
    /// `batch_idx` is past the configured record count.
    ///
    /// Batches are `batch_max_count` records each; the final batch may be
    /// short.
    pub fn gen_batch(&mut self, batch_idx: usize) -> Option<ColumnarRecords> {
        let batch_start = self.batch_max_count * batch_idx;
        let batch_end = cmp::min(batch_start + self.batch_max_count, self.record_count);
        if batch_start >= batch_end {
            return None;
        }
        let items = batch_end - batch_start;
        // Size the builder exactly: `items` records, each with a key of
        // (record_size_bytes - ts/diff overhead) bytes and an empty value.
        let mut batch = ColumnarRecordsBuilder::with_capacity(
            items,
            (self.record_size_bytes - TS_DIFF_GOODPUT_SIZE) * items,
            0,
        );
        for record_idx in batch_start..batch_end {
            let (kv, t, d) = self.gen_record(record_idx);
            // push returns false when the record would not fit; with the
            // exact capacity above, that indicates a sizing bug.
            assert!(
                batch.push((kv, Codec64::encode(&t), Codec64::encode(&d))),
                "generator exceeded batch size; smaller batches needed"
            );
        }
        Some(batch.finish(&ColumnarMetrics::disconnected()))
    }

    /// Generates the record with the given index, borrowing the key/value
    /// bytes from internal scratch buffers (valid until the next call).
    fn gen_record(&mut self, record_idx: usize) -> ((&[u8], &[u8]), u64, i64) {
        assert!(record_idx < self.record_count);
        assert!(self.record_size_bytes > TS_DIFF_GOODPUT_SIZE);

        self.key_buf.clear();
        let key_len = self.record_size_bytes - TS_DIFF_GOODPUT_SIZE;
        if self.key_buf.capacity() < key_len {
            self.key_buf.reserve(key_len);
        }
        // Key is `record_idx` as ASCII digits, zero-padded to `key_len`.
        // The padding width is a minimum, so the truncate below enforces an
        // exact length (chopping trailing digits if the index is ever wider
        // than key_len — record sizes/counts are assumed to avoid that).
        write!(&mut self.key_buf, "{:01$}", record_idx, key_len)
            .expect("write to Vec is infallible");
        self.key_buf.truncate(key_len);
        assert_eq!(self.key_buf.len(), key_len);

        // Values are always empty; all goodput is key + ts + diff.
        self.val_buf.clear();

        // Timestamp is the record index; diff is always 1 (an insert).
        let ts = u64::cast_from(record_idx);
        let diff = 1;
        ((&self.key_buf, &self.val_buf), ts, diff)
    }

    /// Returns an iterator of the batches this generator produces.
    /// Iteration works on a clone, so `self` is not mutated.
    pub fn batches(&self) -> DataGeneratorBatchIter {
        DataGeneratorBatchIter {
            config: self.clone(),
            batch_idx: 0,
        }
    }

    /// Returns an iterator of every record, with the key and value copied
    /// into owned `Vec`s (unlike [Self::gen_record], which borrows).
    pub fn records(&self) -> impl Iterator<Item = ((Vec<u8>, Vec<u8>), u64, i64)> {
        let mut config = self.clone();
        (0..self.record_count).map(move |record_idx| {
            let ((k, v), t, d) = config.gen_record(record_idx);
            ((k.to_vec(), v.to_vec()), t, d)
        })
    }
}
218
/// An [Iterator] of the batches produced by a [DataGenerator].
#[derive(Debug)]
pub struct DataGeneratorBatchIter {
    // Owned clone of the generator, so iterating does not mutate the
    // original's scratch buffers.
    config: DataGenerator,
    // Index of the next batch to produce.
    batch_idx: usize,
}
225
226impl Iterator for DataGeneratorBatchIter {
227 type Item = ColumnarRecords;
228
229 fn next(&mut self) -> Option<Self::Item> {
230 let batch_idx = self.batch_idx;
231 self.batch_idx += 1;
232 self.config.gen_batch(batch_idx)
233 }
234}
235
236pub fn flat_blob(data: &DataGenerator) -> Vec<u8> {
239 let mut buf = Vec::with_capacity(usize::cast_from(data.goodput_bytes()));
240 for batch in data.batches() {
241 for ((k, v), t, d) in batch.iter() {
242 buf.extend_from_slice(k);
243 buf.extend_from_slice(v);
244 buf.extend_from_slice(&t);
245 buf.extend_from_slice(&d);
246 }
247 }
248 assert_eq!(buf.len(), usize::cast_from(data.goodput_bytes()));
249 buf
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Every generated record contributes exactly `record_size_bytes` of
    /// goodput, and all `record_count` records are emitted across batches.
    #[mz_ore::test]
    fn size_invariants() {
        fn check(config: DataGenerator) {
            let mut seen = 0;
            let mut goodput = 0;
            for batch in config.batches() {
                for ((k, v), _, _) in batch.iter() {
                    seen += 1;
                    goodput += k.len() + v.len() + TS_DIFF_GOODPUT_SIZE;
                }
            }
            assert_eq!(seen, config.record_count);
            assert_eq!(goodput, usize::cast_from(config.goodput_bytes()));
        }

        check(DataGenerator::new(1, 32, 1));
        check(DataGenerator::new(1, 32, 100));
        check(DataGenerator::new(100, 32, 7));
        check(DataGenerator::new(1000, 32, 100));
    }

    #[mz_ore::test]
    fn goodput_pretty() {
        fn pretty(bytes: usize) -> String {
            DataGenerator::new(1, bytes, 1).goodput_pretty()
        }

        // Below 10KiB: raw byte count.
        assert_eq!(pretty(33), "33B");
        assert_eq!(pretty(10 * 1024 - 1), "10239B");
        // From 10KiB up to (but excluding) 10MiB: KiB.
        assert_eq!(pretty(10 * 1024), "10KiB");
        assert_eq!(pretty(10 * 1024 * 1024 - 1), "10239KiB");
        // From 10MiB up to (but excluding) 10GiB: MiB.
        assert_eq!(pretty(10 * 1024 * 1024), "10MiB");
        assert_eq!(pretty(10 * 1024 * 1024 * 1024 - 1), "10239MiB");
        // 10GiB and beyond: GiB, never rolling over to TiB.
        assert_eq!(pretty(10 * 1024 * 1024 * 1024), "10GiB");
        assert_eq!(pretty(10 * 1024 * 1024 * 1024 * 1024 - 1), "10239GiB");
        assert_eq!(pretty(10 * 1024 * 1024 * 1024 * 1024), "10240GiB");
    }
}