1use std::collections::VecDeque;
11use std::fmt::Display;
12use std::iter;
13use std::ops::RangeInclusive;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::NaiveDate;
18use dec::{Context as DecimalContext, OrderedDecimal};
19use mz_ore::now::NowFn;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
22use mz_repr::{Datum, Diff, Row};
23use mz_storage_types::sources::MzOffset;
24use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
25use rand::distributions::{Alphanumeric, DistString};
26use rand::rngs::StdRng;
27use rand::seq::SliceRandom;
28use rand::{Rng, SeedableRng};
29
/// Configuration for the TPC-H load generator.
///
/// Each `count_*` field sizes one table (or, for `count_clerk`, the pool of
/// clerk names that orders are assigned to). `tick` controls whether the
/// generator stops after the initial snapshot or keeps streaming order updates.
#[derive(Debug, Clone)]
pub struct Tpch {
    /// Number of `supplier` rows.
    pub count_supplier: i64,
    /// Number of `part` rows; each part also produces four `partsupp` rows.
    pub count_part: i64,
    /// Number of `customer` rows.
    pub count_customer: i64,
    /// Number of `orders` rows; each order also produces 1 to 7 `lineitem` rows.
    pub count_orders: i64,
    /// Clerk names are drawn from `Clerk#000000001` up to this number.
    pub count_clerk: i64,
    /// Only `is_zero()` is inspected by the generator: zero means "snapshot
    /// only", non-zero enables the endless stream of order updates.
    /// NOTE(review): presumably also the pacing interval used by the caller.
    pub tick: Duration,
}
39
40impl Generator for Tpch {
41 fn by_seed(
42 &self,
43 _: NowFn,
44 seed: Option<u64>,
45 _resume_offset: MzOffset,
46 ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
47 {
48 let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
49 let mut ctx = Context {
50 tpch: self.clone(),
51 decimal_one: Numeric::from(1),
52 decimal_neg_one: Numeric::from(-1),
53 cx: numeric::cx_datum(),
54 text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
56 row_buffer: Row::default(),
57 };
58
59 let count_nation: i64 = NATIONS.len().try_into().unwrap();
60 let count_region: i64 = REGIONS.len().try_into().unwrap();
61
62 let mut rows = (0..ctx.tpch.count_supplier)
63 .map(|i| (TpchView::Supplier, i))
64 .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
65 .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
66 .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
67 .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
68 .chain((0..count_region).map(|i| (TpchView::Region, i)))
69 .peekable();
70
71 let mut pending = VecDeque::new();
74
75 let mut active_orders = Vec::new();
78
79 let mut offset = 0;
80 let mut row = Row::default();
81 Box::new(iter::from_fn(move || {
82 if let Some((output, event)) = pending.pop_front() {
83 return Some((LoadGeneratorOutput::Tpch(output), event));
84 }
85 if let Some((output, key)) = rows.next() {
86 let key_usize = usize::try_from(key).expect("key known to be non-negative");
87 let row = match output {
88 TpchView::Supplier => {
89 let nation = rng.gen_range(0..count_nation);
90 row.packer().extend([
91 Datum::Int64(key),
92 Datum::String(&pad_nine("Supplier", key)),
93 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
95 Datum::String(&phone(&mut rng, nation)),
96 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
99 ]);
100 row.clone()
101 }
102 TpchView::Part => {
103 let name: String = PARTNAMES
104 .choose_multiple(&mut rng, 5)
105 .cloned()
106 .collect::<Vec<_>>()
107 .join(" ");
108 let m = rng.gen_range(1..=5);
109 let n = rng.gen_range(1..=5);
110 for _ in 1..=4 {
111 let suppkey = (key
112 + (rng.gen_range(0..=3)
113 * ((ctx.tpch.count_supplier / 4)
114 + (key - 1) / ctx.tpch.count_supplier)))
115 % ctx.tpch.count_supplier
116 + 1;
117 row.packer().extend([
118 Datum::Int64(key),
119 Datum::Int64(suppkey),
120 Datum::Int32(rng.gen_range(1..=9_999)), Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), Datum::String(text_string(
123 &mut rng,
124 &ctx.text_string_source,
125 49,
126 198,
127 )),
128 ]);
129 pending.push_back((
130 TpchView::Partsupp,
131 Event::Message(MzOffset::from(offset), (row.clone(), Diff::ONE)),
132 ));
133 }
134 row.packer().extend([
135 Datum::Int64(key),
136 Datum::String(&name),
137 Datum::String(&format!("Manufacturer#{m}")),
138 Datum::String(&format!("Brand#{m}{n}")),
139 Datum::String(&syllables(&mut rng, TYPES)),
140 Datum::Int32(rng.gen_range(1..=50)), Datum::String(&syllables(&mut rng, CONTAINERS)),
142 Datum::Numeric(partkey_retailprice(key)),
143 Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
144 ]);
145 row.clone()
146 }
147 TpchView::Customer => {
148 let nation = rng.gen_range(0..count_nation);
149 row.packer().extend([
150 Datum::Int64(key),
151 Datum::String(&pad_nine("Customer", key)),
152 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
154 Datum::String(&phone(&mut rng, nation)),
155 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
157 Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
158 ]);
159 row.clone()
160 }
161 TpchView::Orders => {
162 let seed = rng.r#gen();
163 let (order, lineitems) = ctx.order_row(seed, key);
164 for row in lineitems {
165 pending.push_back((
166 TpchView::Lineitem,
167 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
168 ));
169 }
170 if !ctx.tpch.tick.is_zero() {
171 active_orders.push((key, seed));
172 }
173 order
174 }
175 TpchView::Nation => {
176 let (name, region) = NATIONS[key_usize];
177 row.packer().extend([
178 Datum::Int64(key),
179 Datum::String(name),
180 Datum::Int64(region),
181 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
182 ]);
183 row.clone()
184 }
185 TpchView::Region => {
186 row.packer().extend([
187 Datum::Int64(key),
188 Datum::String(REGIONS[key_usize]),
189 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
190 ]);
191 row.clone()
192 }
193 _ => unreachable!("{output:?}"),
194 };
195
196 pending.push_back((
197 output,
198 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
199 ));
200 if rows.peek().is_none() {
201 offset += 1;
202 pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
203 }
204 } else {
205 if ctx.tpch.tick.is_zero() {
206 return None;
207 }
208 let idx = rng.gen_range(0..active_orders.len());
209 let (key, old_seed) = active_orders.swap_remove(idx);
210 let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
211 for row in old_lineitems {
215 pending.push_back((
216 TpchView::Lineitem,
217 Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
218 ));
219 }
220 let new_seed = rng.r#gen();
221 let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
222 for row in new_lineitems {
223 pending.push_back((
224 TpchView::Lineitem,
225 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
226 ));
227 }
228 pending.push_back((
229 TpchView::Orders,
230 Event::Message(MzOffset::from(offset), (old_order, Diff::MINUS_ONE)),
231 ));
232 pending.push_back((
233 TpchView::Orders,
234 Event::Message(MzOffset::from(offset), (new_order, Diff::ONE)),
235 ));
236 offset += 1;
237 pending.push_back((
238 TpchView::Orders,
239 Event::Progress(Some(MzOffset::from(offset))),
240 ));
241 active_orders.push((key, new_seed));
242 }
243 pending
244 .pop_front()
245 .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
246 }))
247 }
248}
249
250struct Context {
251 tpch: Tpch,
252 decimal_one: Numeric,
253 decimal_neg_one: Numeric,
254 cx: DecimalContext<Numeric>,
255 text_string_source: String,
256 row_buffer: Row,
257}
258
259impl Context {
260 fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
264 let mut rng = StdRng::seed_from_u64(seed);
265 let key = order_key(key);
266 let custkey = loop {
267 let custkey = rng.gen_range(1..=self.tpch.count_customer);
268 if custkey % 3 != 0 {
269 break custkey;
270 }
271 };
272 let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
273 let mut totalprice = Numeric::lossy_from(0);
274 let mut orderstatus = None;
275 let lineitem_count = rng.gen_range(1..=7);
276 let mut lineitems = Vec::with_capacity(lineitem_count);
277
278 for linenumber in 1..=lineitem_count {
279 let partkey = rng.gen_range(1..=self.tpch.count_part);
280 let suppkey = (partkey
281 + (rng.gen_range(0..=3)
282 * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
283 % self.tpch.count_supplier
284 + 1;
285 let quantity = Numeric::from(rng.gen_range(1..=50));
286 let mut extendedprice = quantity;
287 self.cx
288 .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
289 let mut discount = decimal(&mut rng, &mut self.cx, 0, 8, 100);
290 let mut tax = decimal(&mut rng, &mut self.cx, 0, 10, 100);
291 let shipdate = date(&mut rng, &orderdate, 1..=121);
292 let receiptdate = date(&mut rng, &shipdate, 1..=30);
293 let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
294 self.row_buffer.packer().extend([
295 Datum::Int64(key),
296 Datum::Int64(partkey),
297 Datum::Int64(suppkey),
298 Datum::Int32(linenumber.try_into().expect("must fit")),
299 Datum::Numeric(OrderedDecimal(quantity)),
300 Datum::Numeric(OrderedDecimal(extendedprice)),
301 Datum::Numeric(discount),
302 Datum::Numeric(tax),
303 Datum::String(if receiptdate <= *CURRENT_DATE {
304 ["R", "A"].choose(&mut rng).unwrap()
305 } else {
306 "N"
307 }), Datum::String(linestatus),
309 Datum::Date(shipdate),
310 Datum::Date(date(&mut rng, &orderdate, 30..=90)), Datum::Date(receiptdate),
312 Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
313 Datum::String(MODES.choose(&mut rng).unwrap()),
314 Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
315 ]);
316 let row = self.row_buffer.clone();
317 self.cx.add(&mut tax.0, &self.decimal_one);
319 self.cx.sub(&mut discount.0, &self.decimal_neg_one);
320 self.cx.abs(&mut discount.0);
321 self.cx.mul(&mut extendedprice, &tax.0);
322 self.cx.mul(&mut extendedprice, &discount.0);
323 self.cx.add(&mut totalprice, &extendedprice);
324 if let Some(status) = orderstatus {
325 if status != linestatus {
326 orderstatus = Some("P");
327 }
328 } else {
329 orderstatus = Some(linestatus);
330 }
331 lineitems.push(row);
332 }
333
334 self.row_buffer.packer().extend([
335 Datum::Int64(key),
336 Datum::Int64(custkey),
337 Datum::String(orderstatus.unwrap()),
338 Datum::Numeric(OrderedDecimal(totalprice)),
339 Datum::Date(orderdate),
340 Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
341 Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
342 Datum::Int32(0), Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
344 ]);
345 let order = self.row_buffer.clone();
346 (order, lineitems)
347 }
348}
349
350fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
351 let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
352 OrderedDecimal(Numeric::from(price))
353}
354
/// Formats `"{prefix}#{s}"` with `s` zero-padded to at least nine characters,
/// e.g. `pad_nine("Supplier", 1)` is `"Supplier#000000001"`.
fn pad_nine<S: Display>(prefix: &str, s: S) -> String {
    format!("{prefix}#{s:09}")
}
358
359pub static START_DATE: LazyLock<Date> =
360 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
361pub static CURRENT_DATE: LazyLock<Date> =
362 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
363pub static END_DATE: LazyLock<Date> =
364 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
365pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
366
367fn text_string<'a, R: Rng + ?Sized>(
368 rng: &mut R,
369 source: &'a str,
370 min: usize,
371 max: usize,
372) -> &'a str {
373 let start = rng.gen_range(0..=(source.len() - max));
374 let len = rng.gen_range(min..=max);
375 &source[start..(start + len)]
376}
377
378fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
379 let days = rng.gen_range(days);
380 start.checked_add(days).expect("must fit")
381}
382
/// Spreads a dense order index into the sparse TPC-H order-key space.
///
/// The lowest `SPARSE_KEEP` bits are preserved, and everything above them is
/// shifted up by an extra `SPARSE_BITS`. As a result only 8 of every 32
/// consecutive keys are used: 0..=7 map to themselves, and 8 maps to 32.
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: usize = 2;
    const SPARSE_KEEP: usize = 3;
    let keep_mask: i64 = (1 << SPARSE_KEEP) - 1;
    let high = (i >> SPARSE_KEEP) << (SPARSE_BITS + SPARSE_KEEP);
    high + (i & keep_mask)
}
396
397fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
398 let mut s = String::new();
399 for (i, syllable) in syllables.iter().enumerate() {
400 if i > 0 {
401 s.push(' ');
402 }
403 s.push_str(syllable.choose(rng).unwrap());
404 }
405 s
406}
407
408fn decimal<R: Rng + ?Sized>(
409 rng: &mut R,
410 cx: &mut dec::Context<Numeric>,
411 min: i64,
412 max: i64,
413 div: i64,
414) -> OrderedDecimal<Numeric> {
415 let n = rng.gen_range(min..=max);
416 let mut n = Numeric::lossy_from(n);
417 cx.div(&mut n, &Numeric::lossy_from(div));
418 OrderedDecimal(n)
419}
420
421fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
422 let mut s = String::with_capacity(15);
423 s.push_str(&(nation + 10).to_string());
424 s.push('-');
425 s.push_str(&rng.gen_range(100..=999).to_string());
426 s.push('-');
427 s.push_str(&rng.gen_range(100..=999).to_string());
428 s.push('-');
429 s.push_str(&rng.gen_range(1000..=9999).to_string());
430 s
431}
432
433fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
434 const ALPHABET: [char; 64] = [
435 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
436 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
437 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
438 '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
439 ];
440 let take = rng.gen_range(min..=max);
441 let mut s = String::with_capacity(take);
442 for _ in 0..take {
443 s.push(*ALPHABET.choose(rng).unwrap());
444 }
445 s
446}
447
/// Values for `L_SHIPINSTRUCT`.
const INSTRUCTIONS: &[&str] = &["DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN"];
454
/// Values for `L_SHIPMODE`.
const MODES: &[&str] = &[
    "REG AIR",
    "AIR",
    "RAIL",
    "SHIP",
    "TRUCK",
    "MAIL",
    "FOB",
];
456
/// Words from which `P_NAME` is built (five distinct words per part).
const PARTNAMES: &[&str] = &[
    "almond", "antique", "aquamarine", "azure", "beige", "bisque", "black", "blanched", "blue",
    "blush", "brown", "burlywood", "burnished", "chartreuse", "chiffon", "chocolate", "coral",
    "cornflower", "cornsilk", "cream", "cyan", "dark", "deep", "dim", "dodger", "drab",
    "firebrick", "floral", "forest", "frosted", "gainsboro", "ghost", "goldenrod", "green",
    "grey", "honeydew", "hot", "indian", "ivory", "khaki", "lace", "lavender", "lawn", "lemon",
    "light", "lime", "linen", "magenta", "maroon", "medium", "metallic", "midnight", "mint",
    "misty", "moccasin", "navajo", "navy", "olive", "orange", "orchid", "pale", "papaya",
    "peach", "peru", "pink", "plum", "powder", "puff", "purple", "red", "rose", "rosy", "royal",
    "saddle", "salmon", "sandy", "seashell", "sienna", "sky", "slate", "smoke", "snow",
    "spring", "steel", "tan", "thistle", "tomato", "turquoise", "violet", "wheat", "white",
    "yellow",
];
551
/// Values for `O_ORDERPRIORITY`: the full five-entry TPC-H priority list.
/// "5-LOW" was previously missing.
const PRIORITIES: &[&str] = &["1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"];
553
/// Syllable lists for `P_TYPE`; one word is picked from each list in order.
const TYPES: &[&[&str]] = &[
    &[
        "STANDARD", "SMALL", "MEDIUM", "LARGE", "ECONOMY", "PROMO",
    ],
    &[
        "ANODIZED", "BURNISHED", "PLATED", "POLISHED", "BRUSHED",
    ],
    &[
        "TIN", "NICKEL", "BRASS", "STEEL", "COPPER",
    ],
];
559
/// Syllable lists for `P_CONTAINER`; one word is picked from each list in order.
///
/// This is the full TPC-H list. "LG" and "CASE" were previously missing, so
/// containers such as "LG BOX" or "SM CASE" (used by query predicates) were
/// never generated.
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
564
/// Values for `C_MKTSEGMENT`.
const SEGMENTS: &[&str] = &["AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD"];
572
/// Region names; a region's key is its index in this list.
const REGIONS: &[&str] = &[
    "AFRICA",      // 0
    "AMERICA",     // 1
    "ASIA",        // 2
    "EUROPE",      // 3
    "MIDDLE EAST", // 4
];
574
/// `(name, region key)` for each nation; a nation's key is its index here.
const NATIONS: &[(&str, i64)] = &[
    ("ALGERIA", 0),        // 0
    ("ARGENTINA", 1),      // 1
    ("BRAZIL", 1),         // 2
    ("CANADA", 1),         // 3
    ("EGYPT", 4),          // 4
    ("ETHIOPIA", 0),       // 5
    ("FRANCE", 3),         // 6
    ("GERMANY", 3),        // 7
    ("INDIA", 2),          // 8
    ("INDONESIA", 2),      // 9
    ("IRAN", 4),           // 10
    ("IRAQ", 4),           // 11
    ("JAPAN", 2),          // 12
    ("JORDAN", 4),         // 13
    ("KENYA", 0),          // 14
    ("MOROCCO", 0),        // 15
    ("MOZAMBIQUE", 0),     // 16
    ("PERU", 1),           // 17
    ("CHINA", 2),          // 18
    ("ROMANIA", 3),        // 19
    ("SAUDI ARABIA", 4),   // 20
    ("VIETNAM", 2),        // 21
    ("RUSSIA", 3),         // 22
    ("UNITED KINGDOM", 3), // 23
    ("UNITED STATES", 1),  // 24
];