mz_storage/source/generator/
tpch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::VecDeque;
11use std::fmt::Display;
12use std::iter;
13use std::ops::RangeInclusive;
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use chrono::NaiveDate;
18use dec::{Context as DecimalContext, OrderedDecimal};
19use mz_ore::now::NowFn;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
22use mz_repr::{Datum, Diff, Row};
23use mz_storage_types::sources::MzOffset;
24use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
25use rand::distributions::{Alphanumeric, DistString};
26use rand::rngs::StdRng;
27use rand::seq::SliceRandom;
28use rand::{Rng, SeedableRng};
29
/// Configuration for the TPC-H load generator.
///
/// The `count_*` fields size the generated tables, while `tick` selects
/// between a one-shot snapshot and a continuously changing stream.
#[derive(Debug, Clone)]
pub struct Tpch {
    /// Number of supplier rows in the initial snapshot.
    pub count_supplier: i64,
    /// Number of part rows in the initial snapshot.
    pub count_part: i64,
    /// Number of customer rows in the initial snapshot.
    pub count_customer: i64,
    /// Number of orders in the initial snapshot.
    pub count_orders: i64,
    /// Largest clerk number that may appear in an order's clerk column.
    pub count_clerk: i64,
    /// When zero, generation stops after the snapshot; otherwise orders keep
    /// being retracted and replaced. The duration itself is not consumed in
    /// this file.
    pub tick: Duration,
}
39
40impl Generator for Tpch {
41    fn by_seed(
42        &self,
43        _: NowFn,
44        seed: Option<u64>,
45        _resume_offset: MzOffset,
46    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
47        let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
48        let mut ctx = Context {
49            tpch: self.clone(),
50            decimal_one: Numeric::from(1),
51            decimal_neg_one: Numeric::from(-1),
52            cx: numeric::cx_datum(),
53            // TODO: Use a text generator closer to the spec.
54            text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
55            row_buffer: Row::default(),
56        };
57
58        let count_nation: i64 = NATIONS.len().try_into().unwrap();
59        let count_region: i64 = REGIONS.len().try_into().unwrap();
60
61        let mut rows = (0..ctx.tpch.count_supplier)
62            .map(|i| (TpchView::Supplier, i))
63            .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
64            .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
65            .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
66            .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
67            .chain((0..count_region).map(|i| (TpchView::Region, i)))
68            .peekable();
69
70        // Some rows need to generate other rows from their values; hold those
71        // here.
72        let mut pending = VecDeque::new();
73
74        // All orders and their lineitems, so they can be retracted during
75        // streaming.
76        let mut active_orders = Vec::new();
77
78        let mut offset = 0;
79        let mut row = Row::default();
80        Box::new(iter::from_fn(move || {
81            if let Some((output, event)) = pending.pop_front() {
82                return Some((LoadGeneratorOutput::Tpch(output), event));
83            }
84            if let Some((output, key)) = rows.next() {
85                let key_usize = usize::try_from(key).expect("key known to be non-negative");
86                let row = match output {
87                    TpchView::Supplier => {
88                        let nation = rng.gen_range(0..count_nation);
89                        row.packer().extend([
90                            Datum::Int64(key),
91                            Datum::String(&pad_nine("Supplier", key)),
92                            Datum::String(&v_string(&mut rng, 10, 40)), // address
93                            Datum::Int64(nation),
94                            Datum::String(&phone(&mut rng, nation)),
95                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal
96                            // TODO: add customer complaints and recommends, see 4.2.3.
97                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
98                        ]);
99                        row.clone()
100                    }
101                    TpchView::Part => {
102                        let name: String = PARTNAMES
103                            .choose_multiple(&mut rng, 5)
104                            .cloned()
105                            .collect::<Vec<_>>()
106                            .join("  ");
107                        let m = rng.gen_range(1..=5);
108                        let n = rng.gen_range(1..=5);
109                        for _ in 1..=4 {
110                            let suppkey = (key
111                                + (rng.gen_range(0..=3)
112                                    * ((ctx.tpch.count_supplier / 4)
113                                        + (key - 1) / ctx.tpch.count_supplier)))
114                                % ctx.tpch.count_supplier
115                                + 1;
116                            row.packer().extend([
117                                Datum::Int64(key),
118                                Datum::Int64(suppkey),
119                                Datum::Int32(rng.gen_range(1..=9_999)), // availqty
120                                Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), // supplycost
121                                Datum::String(text_string(
122                                    &mut rng,
123                                    &ctx.text_string_source,
124                                    49,
125                                    198,
126                                )),
127                            ]);
128                            pending.push_back((
129                                TpchView::Partsupp,
130                                Event::Message(MzOffset::from(offset), (row.clone(), Diff::ONE)),
131                            ));
132                        }
133                        row.packer().extend([
134                            Datum::Int64(key),
135                            Datum::String(&name),
136                            Datum::String(&format!("Manufacturer#{m}")),
137                            Datum::String(&format!("Brand#{m}{n}")),
138                            Datum::String(&syllables(&mut rng, TYPES)),
139                            Datum::Int32(rng.gen_range(1..=50)), // size
140                            Datum::String(&syllables(&mut rng, CONTAINERS)),
141                            Datum::Numeric(partkey_retailprice(key)),
142                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
143                        ]);
144                        row.clone()
145                    }
146                    TpchView::Customer => {
147                        let nation = rng.gen_range(0..count_nation);
148                        row.packer().extend([
149                            Datum::Int64(key),
150                            Datum::String(&pad_nine("Customer", key)),
151                            Datum::String(&v_string(&mut rng, 10, 40)), // address
152                            Datum::Int64(nation),
153                            Datum::String(&phone(&mut rng, nation)),
154                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal
155                            Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
156                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
157                        ]);
158                        row.clone()
159                    }
160                    TpchView::Orders => {
161                        let seed = rng.r#gen();
162                        let (order, lineitems) = ctx.order_row(seed, key);
163                        for row in lineitems {
164                            pending.push_back((
165                                TpchView::Lineitem,
166                                Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
167                            ));
168                        }
169                        if !ctx.tpch.tick.is_zero() {
170                            active_orders.push((key, seed));
171                        }
172                        order
173                    }
174                    TpchView::Nation => {
175                        let (name, region) = NATIONS[key_usize];
176                        row.packer().extend([
177                            Datum::Int64(key),
178                            Datum::String(name),
179                            Datum::Int64(region),
180                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
181                        ]);
182                        row.clone()
183                    }
184                    TpchView::Region => {
185                        row.packer().extend([
186                            Datum::Int64(key),
187                            Datum::String(REGIONS[key_usize]),
188                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
189                        ]);
190                        row.clone()
191                    }
192                    _ => unreachable!("{output:?}"),
193                };
194
195                pending.push_back((
196                    output,
197                    Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
198                ));
199                if rows.peek().is_none() {
200                    offset += 1;
201                    pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
202                }
203            } else {
204                if ctx.tpch.tick.is_zero() {
205                    return None;
206                }
207                let idx = rng.gen_range(0..active_orders.len());
208                let (key, old_seed) = active_orders.swap_remove(idx);
209                let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
210                // Fill pending with old lineitem retractions, new lineitem
211                // additions, and finally the new order. Return the old
212                // order to start the batch.
213                for row in old_lineitems {
214                    pending.push_back((
215                        TpchView::Lineitem,
216                        Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
217                    ));
218                }
219                let new_seed = rng.r#gen();
220                let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
221                for row in new_lineitems {
222                    pending.push_back((
223                        TpchView::Lineitem,
224                        Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
225                    ));
226                }
227                pending.push_back((
228                    TpchView::Orders,
229                    Event::Message(MzOffset::from(offset), (old_order, Diff::MINUS_ONE)),
230                ));
231                pending.push_back((
232                    TpchView::Orders,
233                    Event::Message(MzOffset::from(offset), (new_order, Diff::ONE)),
234                ));
235                offset += 1;
236                pending.push_back((
237                    TpchView::Orders,
238                    Event::Progress(Some(MzOffset::from(offset))),
239                ));
240                active_orders.push((key, new_seed));
241            }
242            pending
243                .pop_front()
244                .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
245        }))
246    }
247}
248
249struct Context {
250    tpch: Tpch,
251    decimal_one: Numeric,
252    decimal_neg_one: Numeric,
253    cx: DecimalContext<Numeric>,
254    text_string_source: String,
255    row_buffer: Row,
256}
257
258impl Context {
259    /// Generate a row based on its key and seed. Used for order retraction
260    /// without having to hold onto the full row for the order and its
261    /// lineitems.
262    fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
263        let mut rng = StdRng::seed_from_u64(seed);
264        let key = order_key(key);
265        let custkey = loop {
266            let custkey = rng.gen_range(1..=self.tpch.count_customer);
267            if custkey % 3 != 0 {
268                break custkey;
269            }
270        };
271        let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
272        let mut totalprice = Numeric::lossy_from(0);
273        let mut orderstatus = None;
274        let lineitem_count = rng.gen_range(1..=7);
275        let mut lineitems = Vec::with_capacity(lineitem_count);
276
277        for linenumber in 1..=lineitem_count {
278            let partkey = rng.gen_range(1..=self.tpch.count_part);
279            let suppkey = (partkey
280                + (rng.gen_range(0..=3)
281                    * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
282                % self.tpch.count_supplier
283                + 1;
284            let quantity = Numeric::from(rng.gen_range(1..=50));
285            let mut extendedprice = quantity;
286            self.cx
287                .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
288            let mut discount = decimal(&mut rng, &mut self.cx, 0, 8, 100);
289            let mut tax = decimal(&mut rng, &mut self.cx, 0, 10, 100);
290            let shipdate = date(&mut rng, &orderdate, 1..=121);
291            let receiptdate = date(&mut rng, &shipdate, 1..=30);
292            let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
293            self.row_buffer.packer().extend([
294                Datum::Int64(key),
295                Datum::Int64(partkey),
296                Datum::Int64(suppkey),
297                Datum::Int32(linenumber.try_into().expect("must fit")),
298                Datum::Numeric(OrderedDecimal(quantity)),
299                Datum::Numeric(OrderedDecimal(extendedprice)),
300                Datum::Numeric(discount),
301                Datum::Numeric(tax),
302                Datum::String(if receiptdate <= *CURRENT_DATE {
303                    ["R", "A"].choose(&mut rng).unwrap()
304                } else {
305                    "N"
306                }), // returnflag
307                Datum::String(linestatus),
308                Datum::Date(shipdate),
309                Datum::Date(date(&mut rng, &orderdate, 30..=90)), // commitdate
310                Datum::Date(receiptdate),
311                Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
312                Datum::String(MODES.choose(&mut rng).unwrap()),
313                Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
314            ]);
315            let row = self.row_buffer.clone();
316            // totalprice += extendedprice * (1.0 + tax) * (1.0 - discount)
317            self.cx.add(&mut tax.0, &self.decimal_one);
318            self.cx.sub(&mut discount.0, &self.decimal_neg_one);
319            self.cx.abs(&mut discount.0);
320            self.cx.mul(&mut extendedprice, &tax.0);
321            self.cx.mul(&mut extendedprice, &discount.0);
322            self.cx.add(&mut totalprice, &extendedprice);
323            if let Some(status) = orderstatus {
324                if status != linestatus {
325                    orderstatus = Some("P");
326                }
327            } else {
328                orderstatus = Some(linestatus);
329            }
330            lineitems.push(row);
331        }
332
333        self.row_buffer.packer().extend([
334            Datum::Int64(key),
335            Datum::Int64(custkey),
336            Datum::String(orderstatus.unwrap()),
337            Datum::Numeric(OrderedDecimal(totalprice)),
338            Datum::Date(orderdate),
339            Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
340            Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
341            Datum::Int32(0), // shippriority
342            Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
343        ]);
344        let order = self.row_buffer.clone();
345        (order, lineitems)
346    }
347}
348
349fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
350    let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
351    OrderedDecimal(Numeric::from(price))
352}
353
/// Formats `s` as `<prefix>#<s>`, formatting `s` with a zero-padded minimum
/// width of nine characters.
fn pad_nine<S: Display>(prefix: &str, s: S) -> String {
    format!("{prefix}#{s:09}")
}
357
358pub static START_DATE: LazyLock<Date> =
359    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
360pub static CURRENT_DATE: LazyLock<Date> =
361    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
362pub static END_DATE: LazyLock<Date> =
363    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
364pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
365
366fn text_string<'a, R: Rng + ?Sized>(
367    rng: &mut R,
368    source: &'a str,
369    min: usize,
370    max: usize,
371) -> &'a str {
372    let start = rng.gen_range(0..=(source.len() - max));
373    let len = rng.gen_range(min..=max);
374    &source[start..(start + len)]
375}
376
377fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
378    let days = rng.gen_range(days);
379    start.checked_add(days).expect("must fit")
380}
381
/// Maps a dense order index to a sparse order key.
///
/// The lowest `SPARSE_KEEP` bits of `i` are kept in place and the remaining
/// high bits are moved up by a further `SPARSE_BITS` positions, leaving a gap
/// of zero bits between the two parts. See mk_sparse in dbgen's build.c;
/// build.c additionally adds a `seq` value into that gap, which allows
/// generating multiple data sets in flat files.
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: usize = 2;
    const SPARSE_KEEP: usize = 3;
    const KEEP_MASK: i64 = (1 << SPARSE_KEEP) - 1;

    let low = i & KEEP_MASK;
    let high = i >> SPARSE_KEEP;
    // The shifted high part has its low bits cleared, so OR-ing is the same
    // as adding here.
    (high << (SPARSE_BITS + SPARSE_KEEP)) | low
}
395
396fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
397    let mut s = String::new();
398    for (i, syllable) in syllables.iter().enumerate() {
399        if i > 0 {
400            s.push(' ');
401        }
402        s.push_str(syllable.choose(rng).unwrap());
403    }
404    s
405}
406
407fn decimal<R: Rng + ?Sized>(
408    rng: &mut R,
409    cx: &mut dec::Context<Numeric>,
410    min: i64,
411    max: i64,
412    div: i64,
413) -> OrderedDecimal<Numeric> {
414    let n = rng.gen_range(min..=max);
415    let mut n = Numeric::lossy_from(n);
416    cx.div(&mut n, &Numeric::lossy_from(div));
417    OrderedDecimal(n)
418}
419
420fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
421    let mut s = String::with_capacity(15);
422    s.push_str(&(nation + 10).to_string());
423    s.push('-');
424    s.push_str(&rng.gen_range(100..=999).to_string());
425    s.push('-');
426    s.push_str(&rng.gen_range(100..=999).to_string());
427    s.push('-');
428    s.push_str(&rng.gen_range(1000..=9999).to_string());
429    s
430}
431
432fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
433    const ALPHABET: [char; 64] = [
434        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
435        's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
436        'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
437        '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
438    ];
439    let take = rng.gen_range(min..=max);
440    let mut s = String::with_capacity(take);
441    for _ in 0..take {
442        s.push(*ALPHABET.choose(rng).unwrap());
443    }
444    s
445}
446
/// Candidate values for the instruction column of a lineitem row.
const INSTRUCTIONS: &[&str] = &["DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN"];
453
/// Candidate values for the mode column of a lineitem row.
const MODES: &[&str] = &[
    "REG AIR",
    "AIR",
    "RAIL",
    "SHIP",
    "TRUCK",
    "MAIL",
    "FOB",
];
455
/// Words from which part names are assembled (five distinct picks per part).
/// Listed alphabetically, one initial letter per group.
#[rustfmt::skip]
const PARTNAMES: &[&str] = &[
    "almond", "antique", "aquamarine", "azure",
    "beige", "bisque", "black", "blanched", "blue", "blush", "brown", "burlywood", "burnished",
    "chartreuse", "chiffon", "chocolate", "coral", "cornflower", "cornsilk", "cream", "cyan",
    "dark", "deep", "dim", "dodger", "drab",
    "firebrick", "floral", "forest", "frosted",
    "gainsboro", "ghost", "goldenrod", "green", "grey",
    "honeydew", "hot",
    "indian", "ivory",
    "khaki",
    "lace", "lavender", "lawn", "lemon", "light", "lime", "linen",
    "magenta", "maroon", "medium", "metallic", "midnight", "mint", "misty", "moccasin",
    "navajo", "navy",
    "olive", "orange", "orchid",
    "pale", "papaya", "peach", "peru", "pink", "plum", "powder", "puff", "purple",
    "red", "rose", "rosy", "royal",
    "saddle", "salmon", "sandy", "seashell", "sienna",
    "sky", "slate", "smoke", "snow", "spring", "steel",
    "tan", "thistle", "tomato", "turquoise",
    "violet",
    "wheat", "white",
    "yellow",
];
550
/// Candidate values for the priority column of an order row.
///
/// TPC-H defines five priorities; "5-LOW" must be present for queries that
/// group or filter on priority to see the full domain.
const PRIORITIES: &[&str] = &[
    "1-URGENT",
    "2-HIGH",
    "3-MEDIUM",
    "4-NOT SPECIFIED",
    "5-LOW",
];
552
/// Syllable lists for the part type column: one entry is picked from each
/// inner list, in order.
const TYPES: &[&[&str]] = &[
    // First syllable.
    &[
        "STANDARD",
        "SMALL",
        "MEDIUM",
        "LARGE",
        "ECONOMY",
        "PROMO",
    ],
    // Second syllable.
    &[
        "ANODIZED",
        "BURNISHED",
        "PLATED",
        "POLISHED",
        "BRUSHED",
    ],
    // Third syllable.
    &[
        "TIN",
        "NICKEL",
        "BRASS",
        "STEEL",
        "COPPER",
    ],
];
558
/// Syllable lists for the part container column: one entry is picked from
/// each inner list, in order.
///
/// TPC-H defines five first syllables and eight second syllables; "LG" and
/// "CASE" are required for containers such as "LG CASE" or "SM CASE" to be
/// generated at all.
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
563
/// Candidate values for the segment column of a customer row.
const SEGMENTS: &[&str] = &["AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD"];
571
/// Region names; a region's key is its index in this list.
const REGIONS: &[&str] = &[
    "AFRICA",
    "AMERICA",
    "ASIA",
    "EUROPE",
    "MIDDLE EAST",
];
573
/// Nations as `(name, region key)` pairs; a nation's key is its index in
/// this list and the region key indexes into `REGIONS`.
#[rustfmt::skip]
const NATIONS: &[(&str, i64)] = &[
    // Nation keys 0-4.
    ("ALGERIA", 0), ("ARGENTINA", 1), ("BRAZIL", 1), ("CANADA", 1), ("EGYPT", 4),
    // Nation keys 5-9.
    ("ETHIOPIA", 0), ("FRANCE", 3), ("GERMANY", 3), ("INDIA", 2), ("INDONESIA", 2),
    // Nation keys 10-14.
    ("IRAN", 4), ("IRAQ", 4), ("JAPAN", 2), ("JORDAN", 4), ("KENYA", 0),
    // Nation keys 15-19.
    ("MOROCCO", 0), ("MOZAMBIQUE", 0), ("PERU", 1), ("CHINA", 2), ("ROMANIA", 3),
    // Nation keys 20-24.
    ("SAUDI ARABIA", 4), ("VIETNAM", 2), ("RUSSIA", 3), ("UNITED KINGDOM", 3),
    ("UNITED STATES", 1),
];