1use std::collections::VecDeque;
11use std::fmt::Display;
12use std::iter;
13use std::ops::RangeInclusive;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::NaiveDate;
18use dec::{Context as DecimalContext, OrderedDecimal};
19use mz_ore::now::NowFn;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
22use mz_repr::{Datum, Diff, Row};
23use mz_storage_types::sources::MzOffset;
24use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
25use rand::distributions::{Alphanumeric, DistString};
26use rand::rngs::StdRng;
27use rand::seq::SliceRandom;
28use rand::{Rng, SeedableRng};
29
/// Configuration of the TPC-H load generator: how many rows to produce for
/// each generated table, and whether to keep updating orders afterwards.
#[derive(Debug, Clone)]
pub struct Tpch {
    /// Number of `supplier` rows.
    pub count_supplier: i64,
    /// Number of `part` rows; each part also yields four `partsupp` rows.
    pub count_part: i64,
    /// Number of `customer` rows.
    pub count_customer: i64,
    /// Number of `orders` rows; each order also yields its line items.
    pub count_orders: i64,
    /// Number of distinct clerks that orders are assigned to.
    pub count_clerk: i64,
    /// Update cadence. When zero, the generator stops after emitting the
    /// initial snapshot; otherwise it keeps rewriting random orders.
    pub tick: Duration,
}
39
40impl Generator for Tpch {
41 fn by_seed(
42 &self,
43 _: NowFn,
44 seed: Option<u64>,
45 _resume_offset: MzOffset,
46 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
47 let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
48 let mut ctx = Context {
49 tpch: self.clone(),
50 decimal_one: Numeric::from(1),
51 decimal_neg_one: Numeric::from(-1),
52 cx: numeric::cx_datum(),
53 text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
55 row_buffer: Row::default(),
56 };
57
58 let count_nation: i64 = NATIONS.len().try_into().unwrap();
59 let count_region: i64 = REGIONS.len().try_into().unwrap();
60
61 let mut rows = (0..ctx.tpch.count_supplier)
62 .map(|i| (TpchView::Supplier, i))
63 .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
64 .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
65 .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
66 .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
67 .chain((0..count_region).map(|i| (TpchView::Region, i)))
68 .peekable();
69
70 let mut pending = VecDeque::new();
73
74 let mut active_orders = Vec::new();
77
78 let mut offset = 0;
79 let mut row = Row::default();
80 Box::new(iter::from_fn(move || {
81 if let Some((output, event)) = pending.pop_front() {
82 return Some((LoadGeneratorOutput::Tpch(output), event));
83 }
84 if let Some((output, key)) = rows.next() {
85 let key_usize = usize::try_from(key).expect("key known to be non-negative");
86 let row = match output {
87 TpchView::Supplier => {
88 let nation = rng.gen_range(0..count_nation);
89 row.packer().extend([
90 Datum::Int64(key),
91 Datum::String(&pad_nine("Supplier", key)),
92 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
94 Datum::String(&phone(&mut rng, nation)),
95 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
98 ]);
99 row.clone()
100 }
101 TpchView::Part => {
102 let name: String = PARTNAMES
103 .choose_multiple(&mut rng, 5)
104 .cloned()
105 .collect::<Vec<_>>()
106 .join(" ");
107 let m = rng.gen_range(1..=5);
108 let n = rng.gen_range(1..=5);
109 for _ in 1..=4 {
110 let suppkey = (key
111 + (rng.gen_range(0..=3)
112 * ((ctx.tpch.count_supplier / 4)
113 + (key - 1) / ctx.tpch.count_supplier)))
114 % ctx.tpch.count_supplier
115 + 1;
116 row.packer().extend([
117 Datum::Int64(key),
118 Datum::Int64(suppkey),
119 Datum::Int32(rng.gen_range(1..=9_999)), Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), Datum::String(text_string(
122 &mut rng,
123 &ctx.text_string_source,
124 49,
125 198,
126 )),
127 ]);
128 pending.push_back((
129 TpchView::Partsupp,
130 Event::Message(MzOffset::from(offset), (row.clone(), Diff::ONE)),
131 ));
132 }
133 row.packer().extend([
134 Datum::Int64(key),
135 Datum::String(&name),
136 Datum::String(&format!("Manufacturer#{m}")),
137 Datum::String(&format!("Brand#{m}{n}")),
138 Datum::String(&syllables(&mut rng, TYPES)),
139 Datum::Int32(rng.gen_range(1..=50)), Datum::String(&syllables(&mut rng, CONTAINERS)),
141 Datum::Numeric(partkey_retailprice(key)),
142 Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
143 ]);
144 row.clone()
145 }
146 TpchView::Customer => {
147 let nation = rng.gen_range(0..count_nation);
148 row.packer().extend([
149 Datum::Int64(key),
150 Datum::String(&pad_nine("Customer", key)),
151 Datum::String(&v_string(&mut rng, 10, 40)), Datum::Int64(nation),
153 Datum::String(&phone(&mut rng, nation)),
154 Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
156 Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
157 ]);
158 row.clone()
159 }
160 TpchView::Orders => {
161 let seed = rng.r#gen();
162 let (order, lineitems) = ctx.order_row(seed, key);
163 for row in lineitems {
164 pending.push_back((
165 TpchView::Lineitem,
166 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
167 ));
168 }
169 if !ctx.tpch.tick.is_zero() {
170 active_orders.push((key, seed));
171 }
172 order
173 }
174 TpchView::Nation => {
175 let (name, region) = NATIONS[key_usize];
176 row.packer().extend([
177 Datum::Int64(key),
178 Datum::String(name),
179 Datum::Int64(region),
180 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
181 ]);
182 row.clone()
183 }
184 TpchView::Region => {
185 row.packer().extend([
186 Datum::Int64(key),
187 Datum::String(REGIONS[key_usize]),
188 Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
189 ]);
190 row.clone()
191 }
192 _ => unreachable!("{output:?}"),
193 };
194
195 pending.push_back((
196 output,
197 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
198 ));
199 if rows.peek().is_none() {
200 offset += 1;
201 pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
202 }
203 } else {
204 if ctx.tpch.tick.is_zero() {
205 return None;
206 }
207 let idx = rng.gen_range(0..active_orders.len());
208 let (key, old_seed) = active_orders.swap_remove(idx);
209 let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
210 for row in old_lineitems {
214 pending.push_back((
215 TpchView::Lineitem,
216 Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
217 ));
218 }
219 let new_seed = rng.r#gen();
220 let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
221 for row in new_lineitems {
222 pending.push_back((
223 TpchView::Lineitem,
224 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
225 ));
226 }
227 pending.push_back((
228 TpchView::Orders,
229 Event::Message(MzOffset::from(offset), (old_order, Diff::MINUS_ONE)),
230 ));
231 pending.push_back((
232 TpchView::Orders,
233 Event::Message(MzOffset::from(offset), (new_order, Diff::ONE)),
234 ));
235 offset += 1;
236 pending.push_back((
237 TpchView::Orders,
238 Event::Progress(Some(MzOffset::from(offset))),
239 ));
240 active_orders.push((key, new_seed));
241 }
242 pending
243 .pop_front()
244 .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
245 }))
246 }
247}
248
249struct Context {
250 tpch: Tpch,
251 decimal_one: Numeric,
252 decimal_neg_one: Numeric,
253 cx: DecimalContext<Numeric>,
254 text_string_source: String,
255 row_buffer: Row,
256}
257
258impl Context {
259 fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
263 let mut rng = StdRng::seed_from_u64(seed);
264 let key = order_key(key);
265 let custkey = loop {
266 let custkey = rng.gen_range(1..=self.tpch.count_customer);
267 if custkey % 3 != 0 {
268 break custkey;
269 }
270 };
271 let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
272 let mut totalprice = Numeric::lossy_from(0);
273 let mut orderstatus = None;
274 let lineitem_count = rng.gen_range(1..=7);
275 let mut lineitems = Vec::with_capacity(lineitem_count);
276
277 for linenumber in 1..=lineitem_count {
278 let partkey = rng.gen_range(1..=self.tpch.count_part);
279 let suppkey = (partkey
280 + (rng.gen_range(0..=3)
281 * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
282 % self.tpch.count_supplier
283 + 1;
284 let quantity = Numeric::from(rng.gen_range(1..=50));
285 let mut extendedprice = quantity;
286 self.cx
287 .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
288 let mut discount = decimal(&mut rng, &mut self.cx, 0, 8, 100);
289 let mut tax = decimal(&mut rng, &mut self.cx, 0, 10, 100);
290 let shipdate = date(&mut rng, &orderdate, 1..=121);
291 let receiptdate = date(&mut rng, &shipdate, 1..=30);
292 let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
293 self.row_buffer.packer().extend([
294 Datum::Int64(key),
295 Datum::Int64(partkey),
296 Datum::Int64(suppkey),
297 Datum::Int32(linenumber.try_into().expect("must fit")),
298 Datum::Numeric(OrderedDecimal(quantity)),
299 Datum::Numeric(OrderedDecimal(extendedprice)),
300 Datum::Numeric(discount),
301 Datum::Numeric(tax),
302 Datum::String(if receiptdate <= *CURRENT_DATE {
303 ["R", "A"].choose(&mut rng).unwrap()
304 } else {
305 "N"
306 }), Datum::String(linestatus),
308 Datum::Date(shipdate),
309 Datum::Date(date(&mut rng, &orderdate, 30..=90)), Datum::Date(receiptdate),
311 Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
312 Datum::String(MODES.choose(&mut rng).unwrap()),
313 Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
314 ]);
315 let row = self.row_buffer.clone();
316 self.cx.add(&mut tax.0, &self.decimal_one);
318 self.cx.sub(&mut discount.0, &self.decimal_neg_one);
319 self.cx.abs(&mut discount.0);
320 self.cx.mul(&mut extendedprice, &tax.0);
321 self.cx.mul(&mut extendedprice, &discount.0);
322 self.cx.add(&mut totalprice, &extendedprice);
323 if let Some(status) = orderstatus {
324 if status != linestatus {
325 orderstatus = Some("P");
326 }
327 } else {
328 orderstatus = Some(linestatus);
329 }
330 lineitems.push(row);
331 }
332
333 self.row_buffer.packer().extend([
334 Datum::Int64(key),
335 Datum::Int64(custkey),
336 Datum::String(orderstatus.unwrap()),
337 Datum::Numeric(OrderedDecimal(totalprice)),
338 Datum::Date(orderdate),
339 Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
340 Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
341 Datum::Int32(0), Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
343 ]);
344 let order = self.row_buffer.clone();
345 (order, lineitems)
346 }
347}
348
349fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
350 let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
351 OrderedDecimal(Numeric::from(price))
352}
353
/// Renders `prefix`, a `#`, then `value` zero-padded to a width of nine,
/// e.g. `pad_nine("Supplier", 7)` gives `"Supplier#000000007"`.
fn pad_nine<V: Display>(prefix: &str, value: V) -> String {
    format!("{prefix}#{value:09}")
}
357
358pub static START_DATE: LazyLock<Date> =
359 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
360pub static CURRENT_DATE: LazyLock<Date> =
361 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
362pub static END_DATE: LazyLock<Date> =
363 LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
364pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
365
366fn text_string<'a, R: Rng + ?Sized>(
367 rng: &mut R,
368 source: &'a str,
369 min: usize,
370 max: usize,
371) -> &'a str {
372 let start = rng.gen_range(0..=(source.len() - max));
373 let len = rng.gen_range(min..=max);
374 &source[start..(start + len)]
375}
376
377fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
378 let days = rng.gen_range(days);
379 start.checked_add(days).expect("must fit")
380}
381
/// Maps a dense order index to a sparse order key.
///
/// The low `SPARSE_KEEP` bits are kept as-is, and `SPARSE_BITS` zero bits are
/// inserted above them. As a result only the first 8 values of every block of
/// 32 keys are used (e.g. 7 -> 7, 8 -> 32).
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: usize = 2;
    const SPARSE_KEEP: usize = 3;
    let kept = i & ((1 << SPARSE_KEEP) - 1);
    let high = (i >> SPARSE_KEEP) << (SPARSE_BITS + SPARSE_KEEP);
    high + kept
}
395
396fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
397 let mut s = String::new();
398 for (i, syllable) in syllables.iter().enumerate() {
399 if i > 0 {
400 s.push(' ');
401 }
402 s.push_str(syllable.choose(rng).unwrap());
403 }
404 s
405}
406
407fn decimal<R: Rng + ?Sized>(
408 rng: &mut R,
409 cx: &mut dec::Context<Numeric>,
410 min: i64,
411 max: i64,
412 div: i64,
413) -> OrderedDecimal<Numeric> {
414 let n = rng.gen_range(min..=max);
415 let mut n = Numeric::lossy_from(n);
416 cx.div(&mut n, &Numeric::lossy_from(div));
417 OrderedDecimal(n)
418}
419
420fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
421 let mut s = String::with_capacity(15);
422 s.push_str(&(nation + 10).to_string());
423 s.push('-');
424 s.push_str(&rng.gen_range(100..=999).to_string());
425 s.push('-');
426 s.push_str(&rng.gen_range(100..=999).to_string());
427 s.push('-');
428 s.push_str(&rng.gen_range(1000..=9999).to_string());
429 s
430}
431
432fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
433 const ALPHABET: [char; 64] = [
434 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
435 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
436 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
437 '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
438 ];
439 let take = rng.gen_range(min..=max);
440 let mut s = String::with_capacity(take);
441 for _ in 0..take {
442 s.push(*ALPHABET.choose(rng).unwrap());
443 }
444 s
445}
446
/// Shipping instructions; one is chosen uniformly for each line item.
const INSTRUCTIONS: &[&str] = &[
    "DELIVER IN PERSON",
    "COLLECT COD",
    "NONE",
    "TAKE BACK RETURN",
];

/// Shipping modes; one is chosen uniformly for each line item.
const MODES: &[&str] = &["REG AIR", "AIR", "RAIL", "SHIP", "TRUCK", "MAIL", "FOB"];
455
/// Words that part names are built from: each part name is five distinct
/// words from this list, sampled without replacement and joined by spaces.
const PARTNAMES: &[&str] = &[
    "almond",
    "antique",
    "aquamarine",
    "azure",
    "beige",
    "bisque",
    "black",
    "blanched",
    "blue",
    "blush",
    "brown",
    "burlywood",
    "burnished",
    "chartreuse",
    "chiffon",
    "chocolate",
    "coral",
    "cornflower",
    "cornsilk",
    "cream",
    "cyan",
    "dark",
    "deep",
    "dim",
    "dodger",
    "drab",
    "firebrick",
    "floral",
    "forest",
    "frosted",
    "gainsboro",
    "ghost",
    "goldenrod",
    "green",
    "grey",
    "honeydew",
    "hot",
    "indian",
    "ivory",
    "khaki",
    "lace",
    "lavender",
    "lawn",
    "lemon",
    "light",
    "lime",
    "linen",
    "magenta",
    "maroon",
    "medium",
    "metallic",
    "midnight",
    "mint",
    "misty",
    "moccasin",
    "navajo",
    "navy",
    "olive",
    "orange",
    "orchid",
    "pale",
    "papaya",
    "peach",
    "peru",
    "pink",
    "plum",
    "powder",
    "puff",
    "purple",
    "red",
    "rose",
    "rosy",
    "royal",
    "saddle",
    "salmon",
    "sandy",
    "seashell",
    "sienna",
    "sky",
    "slate",
    "smoke",
    "snow",
    "spring",
    "steel",
    "tan",
    "thistle",
    "tomato",
    "turquoise",
    "violet",
    "wheat",
    "white",
    "yellow",
];
550
/// Order priorities; one is chosen uniformly for each order. TPC-H defines
/// five priorities, `1-URGENT` through `5-LOW`.
const PRIORITIES: &[&str] = &[
    "1-URGENT",
    "2-HIGH",
    "3-MEDIUM",
    "4-NOT SPECIFIED",
    "5-LOW",
];
552
/// Word lists for part types. A type is one word from each inner list, in
/// order, joined by spaces (e.g. "STANDARD ANODIZED TIN").
const TYPES: &[&[&str]] = &[
    &["STANDARD", "SMALL", "MEDIUM", "LARGE", "ECONOMY", "PROMO"],
    &["ANODIZED", "BURNISHED", "PLATED", "POLISHED", "BRUSHED"],
    &["TIN", "NICKEL", "BRASS", "STEEL", "COPPER"],
];
558
/// Word lists for part containers. A container is one word from each inner
/// list joined by a space. TPC-H defines 5 sizes and 8 kinds (40 containers,
/// e.g. "LG CASE" or "MED BOX"); benchmark queries filter on several of them.
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
563
/// Customer market segments; one is chosen uniformly for each customer.
const SEGMENTS: &[&str] = &[
    "AUTOMOBILE",
    "BUILDING",
    "FURNITURE",
    "MACHINERY",
    "HOUSEHOLD",
];
571
/// Region names; a region's key is its index in this table.
const REGIONS: &[&str] = &["AFRICA", "AMERICA", "ASIA", "EUROPE", "MIDDLE EAST"];

/// Nation names paired with the key (index into `REGIONS`) of the region they
/// belong to; a nation's key is its index in this table.
const NATIONS: &[(&str, i64)] = &[
    ("ALGERIA", 0),
    ("ARGENTINA", 1),
    ("BRAZIL", 1),
    ("CANADA", 1),
    ("EGYPT", 4),
    ("ETHIOPIA", 0),
    ("FRANCE", 3),
    ("GERMANY", 3),
    ("INDIA", 2),
    ("INDONESIA", 2),
    ("IRAN", 4),
    ("IRAQ", 4),
    ("JAPAN", 2),
    ("JORDAN", 4),
    ("KENYA", 0),
    ("MOROCCO", 0),
    ("MOZAMBIQUE", 0),
    ("PERU", 1),
    ("CHINA", 2),
    ("ROMANIA", 3),
    ("SAUDI ARABIA", 4),
    ("VIETNAM", 2),
    ("RUSSIA", 3),
    ("UNITED KINGDOM", 3),
    ("UNITED STATES", 1),
];