mz_storage/source/generator/
tpch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::VecDeque;
11use std::fmt::Display;
12use std::iter;
13use std::ops::RangeInclusive;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::NaiveDate;
18use dec::{Context as DecimalContext, OrderedDecimal};
19use mz_ore::now::NowFn;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
22use mz_repr::{Datum, Diff, Row};
23use mz_storage_types::sources::MzOffset;
24use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
25use rand::distributions::{Alphanumeric, DistString};
26use rand::rngs::StdRng;
27use rand::seq::SliceRandom;
28use rand::{Rng, SeedableRng};
29
/// Configuration of the TPC-H load generator.
///
/// The `count_*` fields set how many rows of the corresponding table are
/// generated; `count_clerk` instead bounds the clerk number given to orders.
#[derive(Debug, Clone)]
pub struct Tpch {
    /// Number of SUPPLIER rows.
    pub count_supplier: i64,
    /// Number of PART rows (each accompanied by four PARTSUPP rows).
    pub count_part: i64,
    /// Number of CUSTOMER rows.
    pub count_customer: i64,
    /// Number of ORDERS rows in the initial snapshot.
    pub count_orders: i64,
    /// Inclusive upper bound of the clerk number assigned to each order.
    pub count_clerk: i64,
    /// Zero means "snapshot only"; any other value makes the generator keep
    /// replacing orders after the snapshot. NOTE(review): presumably also the
    /// pacing interval between updates, which is applied outside this file.
    pub tick: Duration,
}
39
40impl Generator for Tpch {
41    fn by_seed(
42        &self,
43        _: NowFn,
44        seed: Option<u64>,
45        _resume_offset: MzOffset,
46    ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
47    {
48        let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
49        let mut ctx = Context {
50            tpch: self.clone(),
51            decimal_one: Numeric::from(1),
52            decimal_neg_one: Numeric::from(-1),
53            cx: numeric::cx_datum(),
54            // TODO: Use a text generator closer to the spec.
55            text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
56            row_buffer: Row::default(),
57        };
58
59        let count_nation: i64 = NATIONS.len().try_into().unwrap();
60        let count_region: i64 = REGIONS.len().try_into().unwrap();
61
62        let mut rows = (0..ctx.tpch.count_supplier)
63            .map(|i| (TpchView::Supplier, i))
64            .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
65            .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
66            .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
67            .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
68            .chain((0..count_region).map(|i| (TpchView::Region, i)))
69            .peekable();
70
71        // Some rows need to generate other rows from their values; hold those
72        // here.
73        let mut pending = VecDeque::new();
74
75        // All orders and their lineitems, so they can be retracted during
76        // streaming.
77        let mut active_orders = Vec::new();
78
79        let mut offset = 0;
80        let mut row = Row::default();
81        Box::new(iter::from_fn(move || {
82            if let Some((output, event)) = pending.pop_front() {
83                return Some((LoadGeneratorOutput::Tpch(output), event));
84            }
85            if let Some((output, key)) = rows.next() {
86                let key_usize = usize::try_from(key).expect("key known to be non-negative");
87                let row = match output {
88                    TpchView::Supplier => {
89                        let nation = rng.gen_range(0..count_nation);
90                        row.packer().extend([
91                            Datum::Int64(key),
92                            Datum::String(&pad_nine("Supplier", key)),
93                            Datum::String(&v_string(&mut rng, 10, 40)), // address
94                            Datum::Int64(nation),
95                            Datum::String(&phone(&mut rng, nation)),
96                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal
97                            // TODO: add customer complaints and recommends, see 4.2.3.
98                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
99                        ]);
100                        row.clone()
101                    }
102                    TpchView::Part => {
103                        let name: String = PARTNAMES
104                            .choose_multiple(&mut rng, 5)
105                            .cloned()
106                            .collect::<Vec<_>>()
107                            .join("  ");
108                        let m = rng.gen_range(1..=5);
109                        let n = rng.gen_range(1..=5);
110                        for _ in 1..=4 {
111                            let suppkey = (key
112                                + (rng.gen_range(0..=3)
113                                    * ((ctx.tpch.count_supplier / 4)
114                                        + (key - 1) / ctx.tpch.count_supplier)))
115                                % ctx.tpch.count_supplier
116                                + 1;
117                            row.packer().extend([
118                                Datum::Int64(key),
119                                Datum::Int64(suppkey),
120                                Datum::Int32(rng.gen_range(1..=9_999)), // availqty
121                                Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), // supplycost
122                                Datum::String(text_string(
123                                    &mut rng,
124                                    &ctx.text_string_source,
125                                    49,
126                                    198,
127                                )),
128                            ]);
129                            pending.push_back((
130                                TpchView::Partsupp,
131                                Event::Message(MzOffset::from(offset), (row.clone(), Diff::ONE)),
132                            ));
133                        }
134                        row.packer().extend([
135                            Datum::Int64(key),
136                            Datum::String(&name),
137                            Datum::String(&format!("Manufacturer#{m}")),
138                            Datum::String(&format!("Brand#{m}{n}")),
139                            Datum::String(&syllables(&mut rng, TYPES)),
140                            Datum::Int32(rng.gen_range(1..=50)), // size
141                            Datum::String(&syllables(&mut rng, CONTAINERS)),
142                            Datum::Numeric(partkey_retailprice(key)),
143                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
144                        ]);
145                        row.clone()
146                    }
147                    TpchView::Customer => {
148                        let nation = rng.gen_range(0..count_nation);
149                        row.packer().extend([
150                            Datum::Int64(key),
151                            Datum::String(&pad_nine("Customer", key)),
152                            Datum::String(&v_string(&mut rng, 10, 40)), // address
153                            Datum::Int64(nation),
154                            Datum::String(&phone(&mut rng, nation)),
155                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal
156                            Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
157                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
158                        ]);
159                        row.clone()
160                    }
161                    TpchView::Orders => {
162                        let seed = rng.r#gen();
163                        let (order, lineitems) = ctx.order_row(seed, key);
164                        for row in lineitems {
165                            pending.push_back((
166                                TpchView::Lineitem,
167                                Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
168                            ));
169                        }
170                        if !ctx.tpch.tick.is_zero() {
171                            active_orders.push((key, seed));
172                        }
173                        order
174                    }
175                    TpchView::Nation => {
176                        let (name, region) = NATIONS[key_usize];
177                        row.packer().extend([
178                            Datum::Int64(key),
179                            Datum::String(name),
180                            Datum::Int64(region),
181                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
182                        ]);
183                        row.clone()
184                    }
185                    TpchView::Region => {
186                        row.packer().extend([
187                            Datum::Int64(key),
188                            Datum::String(REGIONS[key_usize]),
189                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
190                        ]);
191                        row.clone()
192                    }
193                    _ => unreachable!("{output:?}"),
194                };
195
196                pending.push_back((
197                    output,
198                    Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
199                ));
200                if rows.peek().is_none() {
201                    offset += 1;
202                    pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
203                }
204            } else {
205                if ctx.tpch.tick.is_zero() {
206                    return None;
207                }
208                let idx = rng.gen_range(0..active_orders.len());
209                let (key, old_seed) = active_orders.swap_remove(idx);
210                let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
211                // Fill pending with old lineitem retractions, new lineitem
212                // additions, and finally the new order. Return the old
213                // order to start the batch.
214                for row in old_lineitems {
215                    pending.push_back((
216                        TpchView::Lineitem,
217                        Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
218                    ));
219                }
220                let new_seed = rng.r#gen();
221                let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
222                for row in new_lineitems {
223                    pending.push_back((
224                        TpchView::Lineitem,
225                        Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
226                    ));
227                }
228                pending.push_back((
229                    TpchView::Orders,
230                    Event::Message(MzOffset::from(offset), (old_order, Diff::MINUS_ONE)),
231                ));
232                pending.push_back((
233                    TpchView::Orders,
234                    Event::Message(MzOffset::from(offset), (new_order, Diff::ONE)),
235                ));
236                offset += 1;
237                pending.push_back((
238                    TpchView::Orders,
239                    Event::Progress(Some(MzOffset::from(offset))),
240                ));
241                active_orders.push((key, new_seed));
242            }
243            pending
244                .pop_front()
245                .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
246        }))
247    }
248}
249
250struct Context {
251    tpch: Tpch,
252    decimal_one: Numeric,
253    decimal_neg_one: Numeric,
254    cx: DecimalContext<Numeric>,
255    text_string_source: String,
256    row_buffer: Row,
257}
258
259impl Context {
260    /// Generate a row based on its key and seed. Used for order retraction
261    /// without having to hold onto the full row for the order and its
262    /// lineitems.
263    fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
264        let mut rng = StdRng::seed_from_u64(seed);
265        let key = order_key(key);
266        let custkey = loop {
267            let custkey = rng.gen_range(1..=self.tpch.count_customer);
268            if custkey % 3 != 0 {
269                break custkey;
270            }
271        };
272        let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
273        let mut totalprice = Numeric::lossy_from(0);
274        let mut orderstatus = None;
275        let lineitem_count = rng.gen_range(1..=7);
276        let mut lineitems = Vec::with_capacity(lineitem_count);
277
278        for linenumber in 1..=lineitem_count {
279            let partkey = rng.gen_range(1..=self.tpch.count_part);
280            let suppkey = (partkey
281                + (rng.gen_range(0..=3)
282                    * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
283                % self.tpch.count_supplier
284                + 1;
285            let quantity = Numeric::from(rng.gen_range(1..=50));
286            let mut extendedprice = quantity;
287            self.cx
288                .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
289            let mut discount = decimal(&mut rng, &mut self.cx, 0, 8, 100);
290            let mut tax = decimal(&mut rng, &mut self.cx, 0, 10, 100);
291            let shipdate = date(&mut rng, &orderdate, 1..=121);
292            let receiptdate = date(&mut rng, &shipdate, 1..=30);
293            let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
294            self.row_buffer.packer().extend([
295                Datum::Int64(key),
296                Datum::Int64(partkey),
297                Datum::Int64(suppkey),
298                Datum::Int32(linenumber.try_into().expect("must fit")),
299                Datum::Numeric(OrderedDecimal(quantity)),
300                Datum::Numeric(OrderedDecimal(extendedprice)),
301                Datum::Numeric(discount),
302                Datum::Numeric(tax),
303                Datum::String(if receiptdate <= *CURRENT_DATE {
304                    ["R", "A"].choose(&mut rng).unwrap()
305                } else {
306                    "N"
307                }), // returnflag
308                Datum::String(linestatus),
309                Datum::Date(shipdate),
310                Datum::Date(date(&mut rng, &orderdate, 30..=90)), // commitdate
311                Datum::Date(receiptdate),
312                Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
313                Datum::String(MODES.choose(&mut rng).unwrap()),
314                Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
315            ]);
316            let row = self.row_buffer.clone();
317            // totalprice += extendedprice * (1.0 + tax) * (1.0 - discount)
318            self.cx.add(&mut tax.0, &self.decimal_one);
319            self.cx.sub(&mut discount.0, &self.decimal_neg_one);
320            self.cx.abs(&mut discount.0);
321            self.cx.mul(&mut extendedprice, &tax.0);
322            self.cx.mul(&mut extendedprice, &discount.0);
323            self.cx.add(&mut totalprice, &extendedprice);
324            if let Some(status) = orderstatus {
325                if status != linestatus {
326                    orderstatus = Some("P");
327                }
328            } else {
329                orderstatus = Some(linestatus);
330            }
331            lineitems.push(row);
332        }
333
334        self.row_buffer.packer().extend([
335            Datum::Int64(key),
336            Datum::Int64(custkey),
337            Datum::String(orderstatus.unwrap()),
338            Datum::Numeric(OrderedDecimal(totalprice)),
339            Datum::Date(orderdate),
340            Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
341            Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
342            Datum::Int32(0), // shippriority
343            Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
344        ]);
345        let order = self.row_buffer.clone();
346        (order, lineitems)
347    }
348}
349
350fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
351    let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
352    OrderedDecimal(Numeric::from(price))
353}
354
/// Builds a label of the form `"{prefix}#"` followed by `s` zero-padded to
/// nine characters, e.g. `Supplier#000000042`.
fn pad_nine<S: Display>(prefix: &str, s: S) -> String {
    let digits = format!("{s:09}");
    let mut label = String::with_capacity(prefix.len() + 1 + digits.len());
    label.push_str(prefix);
    label.push('#');
    label.push_str(&digits);
    label
}
358
359pub static START_DATE: LazyLock<Date> =
360    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
361pub static CURRENT_DATE: LazyLock<Date> =
362    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
363pub static END_DATE: LazyLock<Date> =
364    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
365pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
366
367fn text_string<'a, R: Rng + ?Sized>(
368    rng: &mut R,
369    source: &'a str,
370    min: usize,
371    max: usize,
372) -> &'a str {
373    let start = rng.gen_range(0..=(source.len() - max));
374    let len = rng.gen_range(min..=max);
375    &source[start..(start + len)]
376}
377
378fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
379    let days = rng.gen_range(days);
380    start.checked_add(days).expect("must fit")
381}
382
/// Maps a dense order index onto TPC-H's sparse order-key space, following
/// `mk_sparse` in dbgen's build.c.
///
/// The low `SPARSE_KEEP` bits are kept. The remaining high bits are shifted
/// left by an extra `SPARSE_BITS`, so only 8 of every 32 keys are used:
/// 1..=7, 32..=39, 64..=71, and so on.
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: u32 = 2;
    const SPARSE_KEEP: u32 = 3;
    let low_bits = i & ((1 << SPARSE_KEEP) - 1);
    // build.c adds a `seq` value between the two shifts, which allows
    // generating multiple data sets in flat files; here it is always 0.
    let high_bits = (i >> SPARSE_KEEP) << (SPARSE_BITS + SPARSE_KEEP);
    high_bits + low_bits
}
396
397fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
398    let mut s = String::new();
399    for (i, syllable) in syllables.iter().enumerate() {
400        if i > 0 {
401            s.push(' ');
402        }
403        s.push_str(syllable.choose(rng).unwrap());
404    }
405    s
406}
407
408fn decimal<R: Rng + ?Sized>(
409    rng: &mut R,
410    cx: &mut dec::Context<Numeric>,
411    min: i64,
412    max: i64,
413    div: i64,
414) -> OrderedDecimal<Numeric> {
415    let n = rng.gen_range(min..=max);
416    let mut n = Numeric::lossy_from(n);
417    cx.div(&mut n, &Numeric::lossy_from(div));
418    OrderedDecimal(n)
419}
420
421fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
422    let mut s = String::with_capacity(15);
423    s.push_str(&(nation + 10).to_string());
424    s.push('-');
425    s.push_str(&rng.gen_range(100..=999).to_string());
426    s.push('-');
427    s.push_str(&rng.gen_range(100..=999).to_string());
428    s.push('-');
429    s.push_str(&rng.gen_range(1000..=9999).to_string());
430    s
431}
432
433fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
434    const ALPHABET: [char; 64] = [
435        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
436        's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
437        'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
438        '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
439    ];
440    let take = rng.gen_range(min..=max);
441    let mut s = String::with_capacity(take);
442    for _ in 0..take {
443        s.push(*ALPHABET.choose(rng).unwrap());
444    }
445    s
446}
447
/// Shipping instructions; one is chosen at random for each LINEITEM row.
const INSTRUCTIONS: &[&str] = &["DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN"];
454
/// Ship modes; one is chosen at random for each LINEITEM row.
const MODES: &[&str] = &[
    "REG AIR",
    "AIR",
    "RAIL",
    "SHIP",
    "TRUCK",
    "MAIL",
    "FOB",
];
456
/// Word list for P_NAME: each part name is five distinct words from this list.
const PARTNAMES: &[&str] = &[
    "almond", "antique", "aquamarine", "azure", "beige", "bisque", "black", "blanched",
    "blue", "blush", "brown", "burlywood", "burnished", "chartreuse", "chiffon", "chocolate",
    "coral", "cornflower", "cornsilk", "cream", "cyan", "dark", "deep", "dim",
    "dodger", "drab", "firebrick", "floral", "forest", "frosted", "gainsboro", "ghost",
    "goldenrod", "green", "grey", "honeydew", "hot", "indian", "ivory", "khaki",
    "lace", "lavender", "lawn", "lemon", "light", "lime", "linen", "magenta",
    "maroon", "medium", "metallic", "midnight", "mint", "misty", "moccasin", "navajo",
    "navy", "olive", "orange", "orchid", "pale", "papaya", "peach", "peru",
    "pink", "plum", "powder", "puff", "purple", "red", "rose", "rosy",
    "royal", "saddle", "salmon", "sandy", "seashell", "sienna", "sky", "slate",
    "smoke", "snow", "spring", "steel", "tan", "thistle", "tomato", "turquoise",
    "violet", "wheat", "white", "yellow",
];
551
/// Order priorities (O_ORDERPRIORITY): all five values of the TPC-H
/// Priorities list, one chosen at random per order.
const PRIORITIES: &[&str] = &["1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"];
553
/// Syllable lists for P_TYPE. One word is taken from each list, in order,
/// and the words are joined by spaces.
const TYPES: &[&[&str]] = &[
    // First syllable.
    &["STANDARD", "SMALL", "MEDIUM", "LARGE", "ECONOMY", "PROMO"],
    // Second syllable.
    &["ANODIZED", "BURNISHED", "PLATED", "POLISHED", "BRUSHED"],
    // Third syllable.
    &["TIN", "NICKEL", "BRASS", "STEEL", "COPPER"],
];
559
/// Syllable lists for P_CONTAINER. One word is taken from each list and the
/// two are joined by a space.
///
/// These are the full TPC-H Containers lists. They include "LG" and "CASE",
/// which queries such as Q19 filter on (e.g. 'LG CASE', 'SM CASE').
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
564
/// Market segments; one is chosen at random for each CUSTOMER row.
const SEGMENTS: &[&str] = &["AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD"];
572
/// Region names, indexed by region key.
const REGIONS: &[&str] = &[
    "AFRICA",
    "AMERICA",
    "ASIA",
    "EUROPE",
    "MIDDLE EAST",
];
574
575const NATIONS: &[(&str, i64)] = &[
576    ("ALGERIA", 0),
577    ("ARGENTINA", 1),
578    ("BRAZIL", 1),
579    ("CANADA", 1),
580    ("EGYPT", 4),
581    ("ETHIOPIA", 0),
582    ("FRANCE", 3),
583    ("GERMANY", 3),
584    ("INDIA", 2),
585    ("INDONESIA", 2),
586    ("IRAN", 4),
587    ("IRAQ", 4),
588    ("JAPAN", 2),
589    ("JORDAN", 4),
590    ("KENYA", 0),
591    ("MOROCCO", 0),
592    ("MOZAMBIQUE", 0),
593    ("PERU", 1),
594    ("CHINA", 2),
595    ("ROMANIA", 3),
596    ("SAUDI ARABIA", 4),
597    ("VIETNAM", 2),
598    ("RUSSIA", 3),
599    ("UNITED KINGDOM", 3),
600    ("UNITED STATES", 1),
601];