1use std::collections::VecDeque;
11use std::fmt::Display;
12use std::iter;
13use std::ops::RangeInclusive;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::NaiveDate;
18use dec::{Context as DecimalContext, OrderedDecimal};
19use mz_ore::now::NowFn;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
22use mz_repr::{Datum, Diff, Row};
23use mz_storage_types::sources::MzOffset;
24use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
25use rand_8::distributions::{Alphanumeric, DistString};
26use rand_8::rngs::StdRng;
27use rand_8::seq::SliceRandom;
28use rand_8::{Rng, SeedableRng};
29
/// Sizing and pacing parameters for the TPC-H load generator.
///
/// The `count_*` fields are upper bounds of the key ranges the generator
/// iterates over or samples from.
#[derive(Debug, Clone)]
pub struct Tpch {
    /// Number of supplier rows; also the modulus used when deriving supplier keys.
    pub count_supplier: i64,
    /// Number of part rows.
    pub count_part: i64,
    /// Upper bound (inclusive) of the customer keys that are generated and sampled.
    pub count_customer: i64,
    /// Number of order rows in the initial snapshot.
    pub count_orders: i64,
    /// Upper bound (inclusive) of the clerk number embedded in each order.
    pub count_clerk: i64,
    /// When zero the generator stops after the initial snapshot; when non-zero
    /// it keeps replacing existing orders indefinitely. Only the zero / non-zero
    /// distinction is consulted by the generator code in this file.
    pub tick: Duration,
}
39
40impl Generator for Tpch {
41 fn by_seed(
42 &self,
43 _: NowFn,
44 seed: Option<u64>,
45 _resume_offset: MzOffset,
46 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
47 let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
48 let mut ctx = Context {
49 tpch: self.clone(),
50 decimal_one: Numeric::from(1),
51 decimal_neg_one: Numeric::from(-1),
52 cx: numeric::cx_datum(),
53 text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
55 row_buffer: Row::default(),
56 };
57
58 let count_nation: i64 = NATIONS.len().try_into().unwrap();
59 let count_region: i64 = REGIONS.len().try_into().unwrap();
60
61 let mut rows = (0..ctx.tpch.count_supplier)
62 .map(|i| (TpchView::Supplier, i))
63 .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
64 .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
65 .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
66 .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
67 .chain((0..count_region).map(|i| (TpchView::Region, i)))
68 .peekable();
69
70 let mut pending = VecDeque::new();
73
74 let mut active_orders = Vec::new();
77
78 let mut offset = 0;
79 let mut row = Row::default();
80 Box::new(iter::from_fn(move || {
81 if let Some((output, event)) = pending.pop_front() {
82 return Some((LoadGeneratorOutput::Tpch(output), event));
83 }
84 if let Some((output, key)) = rows.next() {
85 let key_usize = usize::try_from(key).expect("key known to be non-negative");
86 let row = match output {
87 TpchView::Supplier => {
88 let nation = rng.gen_range(0..count_nation);
89 row.packer().extend([
90 Datum::Int64(key),
91 Datum::String(&pad_nine("Supplier", key)),
92 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
94 Datum::String(&phone(&mut rng, nation)),
95 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
98 ]);
99 row.clone()
100 }
101 TpchView::Part => {
102 let name: String = PARTNAMES
103 .choose_multiple(&mut rng, 5)
104 .cloned()
105 .collect::<Vec<_>>()
106 .join(" ");
107 let m = rng.gen_range(1..=5);
108 let n = rng.gen_range(1..=5);
109 for _ in 1..=4 {
110 let suppkey = (key
111 + (rng.gen_range(0..=3)
112 * ((ctx.tpch.count_supplier / 4)
113 + (key - 1) / ctx.tpch.count_supplier)))
114 % ctx.tpch.count_supplier
115 + 1;
116 row.packer().extend([
117 Datum::Int64(key),
118 Datum::Int64(suppkey),
119 Datum::Int32(rng.gen_range(1..=9_999)), Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), Datum::String(text_string(
122 &mut rng,
123 &ctx.text_string_source,
124 49,
125 198,
126 )),
127 ]);
128 pending.push_back((
129 TpchView::Partsupp,
130 Event::Message(MzOffset::from(offset), (row.clone(), Diff::ONE)),
131 ));
132 }
133 row.packer().extend([
134 Datum::Int64(key),
135 Datum::String(&name),
136 Datum::String(&format!("Manufacturer#{m}")),
137 Datum::String(&format!("Brand#{m}{n}")),
138 Datum::String(&syllables(&mut rng, TYPES)),
139 Datum::Int32(rng.gen_range(1..=50)), Datum::String(&syllables(&mut rng, CONTAINERS)),
141 Datum::Numeric(partkey_retailprice(key)),
142 Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
143 ]);
144 row.clone()
145 }
146 TpchView::Customer => {
147 let nation = rng.gen_range(0..count_nation);
148 row.packer().extend([
149 Datum::Int64(key),
150 Datum::String(&pad_nine("Customer", key)),
151 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
153 Datum::String(&phone(&mut rng, nation)),
154 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
156 Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
157 ]);
158 row.clone()
159 }
160 TpchView::Orders => {
161 let seed = rng.r#gen();
162 let (order, lineitems) = ctx.order_row(seed, key);
163 for row in lineitems {
164 pending.push_back((
165 TpchView::Lineitem,
166 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
167 ));
168 }
169 if !ctx.tpch.tick.is_zero() {
170 active_orders.push((key, seed));
171 }
172 order
173 }
174 TpchView::Nation => {
175 let (name, region) = NATIONS[key_usize];
176 row.packer().extend([
177 Datum::Int64(key),
178 Datum::String(name),
179 Datum::Int64(region),
180 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
181 ]);
182 row.clone()
183 }
184 TpchView::Region => {
185 row.packer().extend([
186 Datum::Int64(key),
187 Datum::String(REGIONS[key_usize]),
188 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
189 ]);
190 row.clone()
191 }
192 _ => unreachable!("{output:?}"),
193 };
194
195 pending.push_back((
196 output,
197 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
198 ));
199 if rows.peek().is_none() {
200 offset += 1;
201 pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
202 }
203 } else {
204 if ctx.tpch.tick.is_zero() {
205 return None;
206 }
207 let idx = rng.gen_range(0..active_orders.len());
208 let (key, old_seed) = active_orders.swap_remove(idx);
209 let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
210 for row in old_lineitems {
214 pending.push_back((
215 TpchView::Lineitem,
216 Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
217 ));
218 }
219 let new_seed = rng.r#gen();
220 let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
221 for row in new_lineitems {
222 pending.push_back((
223 TpchView::Lineitem,
224 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
225 ));
226 }
227 pending.push_back((
228 TpchView::Orders,
229 Event::Message(MzOffset::from(offset), (old_order, Diff::MINUS_ONE)),
230 ));
231 pending.push_back((
232 TpchView::Orders,
233 Event::Message(MzOffset::from(offset), (new_order, Diff::ONE)),
234 ));
235 offset += 1;
236 pending.push_back((
237 TpchView::Orders,
238 Event::Progress(Some(MzOffset::from(offset))),
239 ));
240 active_orders.push((key, new_seed));
241 }
242 pending
243 .pop_front()
244 .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
245 }))
246 }
247}
248
249struct Context {
250 tpch: Tpch,
251 decimal_one: Numeric,
252 decimal_neg_one: Numeric,
253 cx: DecimalContext<Numeric>,
254 text_string_source: String,
255 row_buffer: Row,
256}
257
258impl Context {
259 fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
263 let mut rng = StdRng::seed_from_u64(seed);
264 let key = order_key(key);
265 let custkey = loop {
266 let custkey = rng.gen_range(1..=self.tpch.count_customer);
267 if custkey % 3 != 0 {
268 break custkey;
269 }
270 };
271 let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
272 let mut totalprice = Numeric::lossy_from(0);
273 let mut orderstatus = None;
274 let lineitem_count = rng.gen_range(1..=7);
275 let mut lineitems = Vec::with_capacity(lineitem_count);
276
277 for linenumber in 1..=lineitem_count {
278 let partkey = rng.gen_range(1..=self.tpch.count_part);
279 let suppkey = (partkey
280 + (rng.gen_range(0..=3)
281 * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
282 % self.tpch.count_supplier
283 + 1;
284 let quantity = Numeric::from(rng.gen_range(1..=50));
285 let mut extendedprice = quantity;
286 self.cx
287 .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
288 let mut discount = decimal(&mut rng, &mut self.cx, 0, 8, 100);
289 let mut tax = decimal(&mut rng, &mut self.cx, 0, 10, 100);
290 let shipdate = date(&mut rng, &orderdate, 1..=121);
291 let receiptdate = date(&mut rng, &shipdate, 1..=30);
292 let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
293 self.row_buffer.packer().extend([
294 Datum::Int64(key),
295 Datum::Int64(partkey),
296 Datum::Int64(suppkey),
297 Datum::Int32(linenumber.try_into().expect("must fit")),
298 Datum::Numeric(OrderedDecimal(quantity)),
299 Datum::Numeric(OrderedDecimal(extendedprice)),
300 Datum::Numeric(discount),
301 Datum::Numeric(tax),
302 Datum::String(if receiptdate <= *CURRENT_DATE {
303 ["R", "A"].choose(&mut rng).unwrap()
304 } else {
305 "N"
306 }), Datum::String(linestatus),
308 Datum::Date(shipdate),
309 Datum::Date(date(&mut rng, &orderdate, 30..=90)), Datum::Date(receiptdate),
311 Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
312 Datum::String(MODES.choose(&mut rng).unwrap()),
313 Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
314 ]);
315 let row = self.row_buffer.clone();
316 self.cx.add(&mut tax.0, &self.decimal_one);
318 self.cx.sub(&mut discount.0, &self.decimal_neg_one);
319 self.cx.abs(&mut discount.0);
320 self.cx.mul(&mut extendedprice, &tax.0);
321 self.cx.mul(&mut extendedprice, &discount.0);
322 self.cx.add(&mut totalprice, &extendedprice);
323 if let Some(status) = orderstatus {
324 if status != linestatus {
325 orderstatus = Some("P");
326 }
327 } else {
328 orderstatus = Some(linestatus);
329 }
330 lineitems.push(row);
331 }
332
333 self.cx.rescale(&mut totalprice, &Numeric::from(-2));
334
335 self.row_buffer.packer().extend([
336 Datum::Int64(key),
337 Datum::Int64(custkey),
338 Datum::String(orderstatus.unwrap()),
339 Datum::Numeric(OrderedDecimal(totalprice)),
340 Datum::Date(orderdate),
341 Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
342 Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
343 Datum::Int32(0), Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
345 ]);
346 let order = self.row_buffer.clone();
347 (order, lineitems)
348 }
349}
350
351fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
352 let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
353 OrderedDecimal(Numeric::from(price))
354}
355
/// Formats `s` zero-padded to at least nine characters and prepends
/// `prefix` and a `#`, e.g. `pad_nine("Clerk", 7)` gives `"Clerk#000000007"`.
///
/// Values wider than nine characters are written in full, not truncated.
fn pad_nine<S>(prefix: &str, s: S) -> String
where
    S: Display,
{
    format!("{prefix}#{s:09}")
}
359
360pub static START_DATE: LazyLock<Date> =
361 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
362pub static CURRENT_DATE: LazyLock<Date> =
363 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
364pub static END_DATE: LazyLock<Date> =
365 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
366pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
367
368fn text_string<'a, R: Rng + ?Sized>(
369 rng: &mut R,
370 source: &'a str,
371 min: usize,
372 max: usize,
373) -> &'a str {
374 let start = rng.gen_range(0..=(source.len() - max));
375 let len = rng.gen_range(min..=max);
376 &source[start..(start + len)]
377}
378
379fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
380 let days = rng.gen_range(days);
381 start.checked_add(days).expect("must fit")
382}
383
/// Maps a dense order index to a sparse order key.
///
/// The lowest `SPARSE_KEEP` bits of `i` are kept in place and the remaining
/// high bits are shifted left by a further `SPARSE_BITS`, leaving a gap of
/// zero bits between the two parts. With the constants below, every run of 8
/// consecutive indexes maps to 8 consecutive keys, and successive runs start
/// 32 apart (0..=7 -> 0..=7, 8..=15 -> 32..=39, ...).
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: usize = 2;
    const SPARSE_KEEP: usize = 3;
    let low_mask: i64 = (1 << SPARSE_KEEP) - 1;
    let low = i & low_mask;
    let high = i >> SPARSE_KEEP;
    (high << (SPARSE_BITS + SPARSE_KEEP)) + low
}
397
398fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
399 let mut s = String::new();
400 for (i, syllable) in syllables.iter().enumerate() {
401 if i > 0 {
402 s.push(' ');
403 }
404 s.push_str(syllable.choose(rng).unwrap());
405 }
406 s
407}
408
409fn decimal<R: Rng + ?Sized>(
410 rng: &mut R,
411 cx: &mut dec::Context<Numeric>,
412 min: i64,
413 max: i64,
414 div: i64,
415) -> OrderedDecimal<Numeric> {
416 let n = rng.gen_range(min..=max);
417 let mut n = Numeric::lossy_from(n);
418 cx.div(&mut n, &Numeric::lossy_from(div));
419 OrderedDecimal(n)
420}
421
422fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
423 let mut s = String::with_capacity(15);
424 s.push_str(&(nation + 10).to_string());
425 s.push('-');
426 s.push_str(&rng.gen_range(100..=999).to_string());
427 s.push('-');
428 s.push_str(&rng.gen_range(100..=999).to_string());
429 s.push('-');
430 s.push_str(&rng.gen_range(1000..=9999).to_string());
431 s
432}
433
434fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
435 const ALPHABET: [char; 64] = [
436 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
437 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
438 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
439 '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
440 ];
441 let take = rng.gen_range(min..=max);
442 let mut s = String::with_capacity(take);
443 for _ in 0..take {
444 s.push(*ALPHABET.choose(rng).unwrap());
445 }
446 s
447}
448
/// Values a lineitem's shipping-instruction column is drawn from.
const INSTRUCTIONS: &[&str] = &["DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN"];
455
/// Values a lineitem's shipping-mode column is drawn from.
const MODES: &[&str] = &[
    "REG AIR",
    "AIR",
    "RAIL",
    "SHIP",
    "TRUCK",
    "MAIL",
    "FOB",
];
457
/// Word list from which part names are assembled (five distinct words per
/// part). Kept in alphabetical order, one initial letter per source line.
#[rustfmt::skip]
const PARTNAMES: &[&str] = &[
    "almond", "antique", "aquamarine", "azure",
    "beige", "bisque", "black", "blanched", "blue", "blush", "brown", "burlywood", "burnished",
    "chartreuse", "chiffon", "chocolate", "coral", "cornflower", "cornsilk", "cream", "cyan",
    "dark", "deep", "dim", "dodger", "drab",
    "firebrick", "floral", "forest", "frosted",
    "gainsboro", "ghost", "goldenrod", "green", "grey",
    "honeydew", "hot",
    "indian", "ivory",
    "khaki",
    "lace", "lavender", "lawn", "lemon", "light", "lime", "linen",
    "magenta", "maroon", "medium", "metallic", "midnight", "mint", "misty", "moccasin",
    "navajo", "navy",
    "olive", "orange", "orchid",
    "pale", "papaya", "peach", "peru", "pink", "plum", "powder", "puff", "purple",
    "red", "rose", "rosy", "royal",
    "saddle", "salmon", "sandy", "seashell", "sienna", "sky", "slate", "smoke", "snow", "spring",
    "steel",
    "tan", "thistle", "tomato", "turquoise",
    "violet",
    "wheat", "white",
    "yellow",
];
552
/// Values an order's priority column is drawn from.
///
/// TPC-H defines five priorities; `"5-LOW"` was previously missing, so no
/// generated order could ever carry it.
const PRIORITIES: &[&str] = &[
    "1-URGENT",
    "2-HIGH",
    "3-MEDIUM",
    "4-NOT SPECIFIED",
    "5-LOW",
];
554
/// Syllable table for part types: a type is one word from each row, joined
/// by spaces (for example `"PROMO PLATED STEEL"`).
const TYPES: &[&[&str]] = &[
    // First syllable.
    &[
        "STANDARD", "SMALL", "MEDIUM", "LARGE", "ECONOMY", "PROMO",
    ],
    // Second syllable.
    &[
        "ANODIZED", "BURNISHED", "PLATED", "POLISHED", "BRUSHED",
    ],
    // Third syllable.
    &[
        "TIN", "NICKEL", "BRASS", "STEEL", "COPPER",
    ],
];
560
/// Syllable table for part containers: a container is one word from each
/// row, joined by a space (for example `"LG CASE"`).
///
/// TPC-H defines five first syllables and eight second syllables. `"LG"` and
/// `"CASE"` were previously missing, so containers such as `"SM CASE"` or
/// `"LG BOX"` could never be generated.
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
565
/// Values a customer's market-segment column is drawn from.
const SEGMENTS: &[&str] = &["AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD"];
573
/// Region names; a region's key is its index in this slice.
const REGIONS: &[&str] = &[
    "AFRICA",
    "AMERICA",
    "ASIA",
    "EUROPE",
    "MIDDLE EAST",
];
575
/// Nations as `(name, region key)` pairs; a nation's key is its index in this
/// slice and the region key is the index of its region in the region list.
const NATIONS: &[(&str, i64)] = &[
    // Keys 0-4.
    ("ALGERIA", 0),
    ("ARGENTINA", 1),
    ("BRAZIL", 1),
    ("CANADA", 1),
    ("EGYPT", 4),
    // Keys 5-9.
    ("ETHIOPIA", 0),
    ("FRANCE", 3),
    ("GERMANY", 3),
    ("INDIA", 2),
    ("INDONESIA", 2),
    // Keys 10-14.
    ("IRAN", 4),
    ("IRAQ", 4),
    ("JAPAN", 2),
    ("JORDAN", 4),
    ("KENYA", 0),
    // Keys 15-19.
    ("MOROCCO", 0),
    ("MOZAMBIQUE", 0),
    ("PERU", 1),
    ("CHINA", 2),
    ("ROMANIA", 3),
    // Keys 20-24.
    ("SAUDI ARABIA", 4),
    ("VIETNAM", 2),
    ("RUSSIA", 3),
    ("UNITED KINGDOM", 3),
    ("UNITED STATES", 1),
];