use std::collections::VecDeque;
use std::fmt::Display;
use std::iter;
use std::ops::RangeInclusive;
use std::sync::LazyLock;
use std::time::Duration;
use chrono::NaiveDate;
use dec::{Context as DecimalContext, OrderedDecimal};
use mz_ore::now::NowFn;
use mz_repr::adt::date::Date;
use mz_repr::adt::numeric::{self, DecimalLike, Numeric};
use mz_repr::{Datum, Row};
use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput, TpchView};
use mz_storage_types::sources::MzOffset;
use rand::distributions::{Alphanumeric, DistString};
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
/// Configuration for the TPC-H load generator.
#[derive(Clone, Debug)]
pub struct Tpch {
    /// Number of `supplier` rows to generate.
    pub count_supplier: i64,
    /// Number of `part` rows to generate; each part also yields four `partsupp` rows.
    pub count_part: i64,
    /// Number of `customer` rows to generate.
    pub count_customer: i64,
    /// Number of `orders` rows to generate; each order also yields 1 to 7 `lineitem` rows.
    pub count_orders: i64,
    /// Clerk names on orders are drawn from `Clerk#1` ..= `Clerk#count_clerk` (zero-padded).
    pub count_clerk: i64,
    /// When zero, only the snapshot is produced. When non-zero, existing orders are repeatedly
    /// replaced after the snapshot. NOTE(review): presumably also the pacing interval of those
    /// updates, applied by the caller — confirm.
    pub tick: Duration,
}
impl Generator for Tpch {
    /// Builds the TPC-H row stream for `seed`.
    ///
    /// The stream first emits a snapshot at offset 0 containing every table: supplier, part
    /// (each preceded by its four partsupp rows), customer, orders (each preceded by its
    /// lineitems), nation and region. A progress event then advances to offset 1. If `tick` is
    /// zero the stream ends there. Otherwise it continues indefinitely. Each step picks a random
    /// existing order, retracts it and its lineitems, inserts a regenerated replacement under
    /// the same order key, and advances the offset by one.
    ///
    /// The `NowFn` and resume offset are ignored. Output depends only on `seed` and the
    /// configuration.
    fn by_seed(
        &self,
        _: NowFn,
        seed: Option<u64>,
        _resume_offset: MzOffset,
    ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, i64)>)>)>
    {
        let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default());
        let mut ctx = Context {
            tpch: self.clone(),
            decimal_one: Numeric::from(1),
            decimal_neg_one: Numeric::from(-1),
            cx: numeric::cx_datum(),
            // Comment columns are sliced out of this random text.
            text_string_source: Alphanumeric.sample_string(&mut rng, 3 << 20),
            row_buffer: Row::default(),
        };
        let count_nation: i64 = NATIONS.len().try_into().unwrap();
        let count_region: i64 = REGIONS.len().try_into().unwrap();

        // Snapshot rows in emission order. Supplier, part, customer and order keys start at 1.
        // The partsupp and lineitem suppkey formulas below produce `1..=count_supplier`, so the
        // supplier keys must use the same range. Nation and region keys index their tables
        // from 0.
        let mut rows = (1..=ctx.tpch.count_supplier)
            .map(|i| (TpchView::Supplier, i))
            .chain((1..=ctx.tpch.count_part).map(|i| (TpchView::Part, i)))
            .chain((1..=ctx.tpch.count_customer).map(|i| (TpchView::Customer, i)))
            .chain((1..=ctx.tpch.count_orders).map(|i| (TpchView::Orders, i)))
            .chain((0..count_nation).map(|i| (TpchView::Nation, i)))
            .chain((0..count_region).map(|i| (TpchView::Region, i)))
            .peekable();
        // Events queued for emission. Some rows generate dependent rows (partsupp, lineitem).
        let mut pending = VecDeque::new();
        // (order index, seed) for every emitted order, so it can be regenerated and retracted
        // while streaming. Only populated when `tick` is non-zero.
        let mut active_orders = Vec::new();
        let mut offset = 0;
        let mut row = Row::default();
        Box::new(iter::from_fn(move || {
            if let Some((output, event)) = pending.pop_front() {
                return Some((LoadGeneratorOutput::Tpch(output), event));
            }
            if let Some((output, key)) = rows.next() {
                let key_usize = usize::try_from(key).expect("key known to be non-negative");
                let row = match output {
                    TpchView::Supplier => {
                        let nation = rng.gen_range(0..count_nation);
                        row.packer().extend([
                            Datum::Int64(key),
                            Datum::String(&pad_nine("Supplier", key)),
                            Datum::String(&v_string(&mut rng, 10, 40)), // address
                            Datum::Int64(nation),
                            Datum::String(&phone(&mut rng, nation)),
                            // acctbal
                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)),
                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)),
                        ]);
                        row.clone()
                    }
                    TpchView::Part => {
                        let name: String = PARTNAMES
                            .choose_multiple(&mut rng, 5)
                            .cloned()
                            .collect::<Vec<_>>()
                            .join(" ");
                        let m = rng.gen_range(1..=5);
                        let n = rng.gen_range(1..=5);
                        // The spec's partsupp formula uses the i-th supplier, for i in [0, 3].
                        // Iterating `i` (rather than drawing it at random) keeps the four
                        // (partkey, suppkey) pairs distinct at spec-conformant scales.
                        for i in 0..4 {
                            let suppkey = (key
                                + (i * ((ctx.tpch.count_supplier / 4)
                                    + (key - 1) / ctx.tpch.count_supplier)))
                                % ctx.tpch.count_supplier
                                + 1;
                            row.packer().extend([
                                Datum::Int64(key),
                                Datum::Int64(suppkey),
                                Datum::Int32(rng.gen_range(1..=9_999)), // availqty
                                // supplycost
                                Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)),
                                Datum::String(text_string(
                                    &mut rng,
                                    &ctx.text_string_source,
                                    49,
                                    198,
                                )),
                            ]);
                            pending.push_back((
                                TpchView::Partsupp,
                                Event::Message(MzOffset::from(offset), (row.clone(), 1)),
                            ));
                        }
                        row.packer().extend([
                            Datum::Int64(key),
                            Datum::String(&name),
                            Datum::String(&format!("Manufacturer#{m}")),
                            Datum::String(&format!("Brand#{m}{n}")),
                            Datum::String(&syllables(&mut rng, TYPES)),
                            Datum::Int32(rng.gen_range(1..=50)), // size
                            Datum::String(&syllables(&mut rng, CONTAINERS)),
                            Datum::Numeric(partkey_retailprice(key)),
                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)),
                        ]);
                        row.clone()
                    }
                    TpchView::Customer => {
                        let nation = rng.gen_range(0..count_nation);
                        row.packer().extend([
                            Datum::Int64(key),
                            Datum::String(&pad_nine("Customer", key)),
                            Datum::String(&v_string(&mut rng, 10, 40)), // address
                            Datum::Int64(nation),
                            Datum::String(&phone(&mut rng, nation)),
                            // acctbal
                            Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)),
                            Datum::String(SEGMENTS.choose(&mut rng).unwrap()),
                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)),
                        ]);
                        row.clone()
                    }
                    TpchView::Orders => {
                        // Remember the per-order seed so this exact order can be regenerated
                        // (and retracted) later.
                        let seed = rng.gen();
                        let (order, lineitems) = ctx.order_row(seed, key);
                        for row in lineitems {
                            pending.push_back((
                                TpchView::Lineitem,
                                Event::Message(MzOffset::from(offset), (row, 1)),
                            ));
                        }
                        if !ctx.tpch.tick.is_zero() {
                            active_orders.push((key, seed));
                        }
                        order
                    }
                    TpchView::Nation => {
                        let (name, region) = NATIONS[key_usize];
                        row.packer().extend([
                            Datum::Int64(key),
                            Datum::String(name),
                            Datum::Int64(region),
                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)),
                        ]);
                        row.clone()
                    }
                    TpchView::Region => {
                        row.packer().extend([
                            Datum::Int64(key),
                            Datum::String(REGIONS[key_usize]),
                            Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)),
                        ]);
                        row.clone()
                    }
                    _ => unreachable!("{output:?}"),
                };
                pending.push_back((output, Event::Message(MzOffset::from(offset), (row, 1))));
                // After the last snapshot row, close offset 0 with a progress event.
                if rows.peek().is_none() {
                    offset += 1;
                    pending.push_back((output, Event::Progress(Some(MzOffset::from(offset)))));
                }
            } else {
                // Snapshot done. Without a tick there are no updates. With no orders there is
                // nothing to update, and `gen_range` would panic on the empty range.
                if ctx.tpch.tick.is_zero() || active_orders.is_empty() {
                    return None;
                }
                let idx = rng.gen_range(0..active_orders.len());
                let (key, old_seed) = active_orders.swap_remove(idx);
                // Regenerating from the stored seed reproduces the exact rows to retract.
                let (old_order, old_lineitems) = ctx.order_row(old_seed, key);
                for row in old_lineitems {
                    pending.push_back((
                        TpchView::Lineitem,
                        Event::Message(MzOffset::from(offset), (row, -1)),
                    ));
                }
                let new_seed = rng.gen();
                let (new_order, new_lineitems) = ctx.order_row(new_seed, key);
                for row in new_lineitems {
                    pending.push_back((
                        TpchView::Lineitem,
                        Event::Message(MzOffset::from(offset), (row, 1)),
                    ));
                }
                pending.push_back((
                    TpchView::Orders,
                    Event::Message(MzOffset::from(offset), (old_order, -1)),
                ));
                pending.push_back((
                    TpchView::Orders,
                    Event::Message(MzOffset::from(offset), (new_order, 1)),
                ));
                offset += 1;
                pending.push_back((
                    TpchView::Orders,
                    Event::Progress(Some(MzOffset::from(offset))),
                ));
                active_orders.push((key, new_seed));
            }
            pending
                .pop_front()
                .map(|(output, event)| (LoadGeneratorOutput::Tpch(output), event))
        }))
    }
}
/// Mutable state shared by the row generators in `by_seed` and `order_row`.
struct Context {
    /// Copy of the generator configuration (row counts, tick).
    tpch: Tpch,
    /// The constant 1, used when accumulating an order's total price.
    decimal_one: Numeric,
    /// The constant -1, used when accumulating an order's total price.
    decimal_neg_one: Numeric,
    /// Decimal context used for all numeric arithmetic.
    cx: DecimalContext<Numeric>,
    /// Random alphanumeric text; comment columns are substrings of it (see `text_string`).
    text_string_source: String,
    /// Scratch row reused while packing order and lineitem rows.
    row_buffer: Row,
}
impl Context {
    /// Deterministically generates one order row and its lineitem rows.
    ///
    /// All randomness comes from a fresh `StdRng` seeded with `seed`, so calling this again with
    /// the same `seed` and `key` reproduces the same rows. `by_seed` relies on that to retract a
    /// previously emitted order without storing it.
    ///
    /// `key` is the dense order index. The emitted order key is `order_key(key)`.
    ///
    /// Returns `(order, lineitems)`, with 1 to 7 lineitems.
    fn order_row(&mut self, seed: u64, key: i64) -> (Row, Vec<Row>) {
        let mut rng = StdRng::seed_from_u64(seed);
        let key = order_key(key);
        // Customers whose key is a multiple of 3 never place orders.
        let custkey = loop {
            let custkey = rng.gen_range(1..=self.tpch.count_customer);
            if custkey % 3 != 0 {
                break custkey;
            }
        };
        let orderdate = date(&mut rng, &*START_DATE, 1..=*ORDER_END_DAYS);
        let mut totalprice = Numeric::lossy_from(0);
        let mut orderstatus = None;
        let lineitem_count = rng.gen_range(1..=7);
        let mut lineitems = Vec::with_capacity(lineitem_count);
        for linenumber in 1..=lineitem_count {
            let partkey = rng.gen_range(1..=self.tpch.count_part);
            // Spec lineitem suppkey formula with a random i in [0, 3].
            let suppkey = (partkey
                + (rng.gen_range(0..=3)
                    * ((self.tpch.count_supplier / 4) + (partkey - 1) / self.tpch.count_supplier)))
                % self.tpch.count_supplier
                + 1;
            let quantity = Numeric::from(rng.gen_range(1..=50));
            let mut extendedprice = quantity;
            self.cx
                .mul(&mut extendedprice, &partkey_retailprice(partkey).0);
            // Spec: discount in [0.00, 0.10], tax in [0.00, 0.08].
            let mut discount = decimal(&mut rng, &mut self.cx, 0, 10, 100);
            let mut tax = decimal(&mut rng, &mut self.cx, 0, 8, 100);
            let shipdate = date(&mut rng, &orderdate, 1..=121);
            let receiptdate = date(&mut rng, &shipdate, 1..=30);
            let linestatus = if shipdate > *CURRENT_DATE { "O" } else { "F" };
            self.row_buffer.packer().extend([
                Datum::Int64(key),
                Datum::Int64(partkey),
                Datum::Int64(suppkey),
                Datum::Int32(linenumber.try_into().expect("must fit")),
                Datum::Numeric(OrderedDecimal(quantity)),
                Datum::Numeric(OrderedDecimal(extendedprice)),
                Datum::Numeric(discount),
                Datum::Numeric(tax),
                // returnflag: "R" or "A" once received by CURRENT_DATE, otherwise "N".
                Datum::String(if receiptdate <= *CURRENT_DATE {
                    ["R", "A"].choose(&mut rng).unwrap()
                } else {
                    "N"
                }),
                Datum::String(linestatus),
                Datum::Date(shipdate),
                Datum::Date(date(&mut rng, &orderdate, 30..=90)), // commitdate
                Datum::Date(receiptdate),
                Datum::String(INSTRUCTIONS.choose(&mut rng).unwrap()),
                Datum::String(MODES.choose(&mut rng).unwrap()),
                Datum::String(text_string(&mut rng, &self.text_string_source, 10, 43)),
            ]);
            let row = self.row_buffer.clone();
            // totalprice += extendedprice * (1 + tax) * (1 - discount)
            self.cx.add(&mut tax.0, &self.decimal_one);
            // discount - 1 is negative, so its absolute value is 1 - discount.
            self.cx.add(&mut discount.0, &self.decimal_neg_one);
            self.cx.abs(&mut discount.0);
            self.cx.mul(&mut extendedprice, &tax.0);
            self.cx.mul(&mut extendedprice, &discount.0);
            self.cx.add(&mut totalprice, &extendedprice);
            // Order status is "F"/"O" when all lineitems agree, otherwise "P".
            if let Some(status) = orderstatus {
                if status != linestatus {
                    orderstatus = Some("P");
                }
            } else {
                orderstatus = Some(linestatus);
            }
            lineitems.push(row);
        }
        self.row_buffer.packer().extend([
            Datum::Int64(key),
            Datum::Int64(custkey),
            Datum::String(orderstatus.unwrap()),
            Datum::Numeric(OrderedDecimal(totalprice)),
            Datum::Date(orderdate),
            Datum::String(PRIORITIES.choose(&mut rng).unwrap()),
            Datum::String(&pad_nine("Clerk", rng.gen_range(1..=self.tpch.count_clerk))),
            Datum::Int32(0), // shippriority
            Datum::String(text_string(&mut rng, &self.text_string_source, 19, 78)),
        ]);
        let order = self.row_buffer.clone();
        (order, lineitems)
    }
}
fn partkey_retailprice(key: i64) -> OrderedDecimal<Numeric> {
let price = (90000 + ((key / 10) % 20001) + 100 * (key % 1000)) / 100;
OrderedDecimal(Numeric::from(price))
}
/// Renders `prefix`, a `#`, and `s` zero-padded to at least nine characters,
/// e.g. `pad_nine("Supplier", 42)` is `"Supplier#000000042"`.
fn pad_nine<S: Display>(prefix: &str, s: S) -> String {
    let mut label = String::from(prefix);
    label.push('#');
    label.push_str(&format!("{s:09}"));
    label
}
/// First date of the generated date range (1992-01-01); order dates are offsets from it.
pub static START_DATE: LazyLock<Date> =
    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1992, 1, 1).unwrap()).unwrap());
/// Reference "current" date (1995-06-17). Lineitems shipped after it are open ("O"), and
/// lineitems received by it get a return flag of "R" or "A".
pub static CURRENT_DATE: LazyLock<Date> =
    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1995, 6, 17).unwrap()).unwrap());
/// Last date of the generated date range (1998-12-31).
pub static END_DATE: LazyLock<Date> =
    LazyLock::new(|| Date::try_from(NaiveDate::from_ymd_opt(1998, 12, 31).unwrap()).unwrap());
/// Largest order-date offset, in days after START_DATE. The 151 days reserved before END_DATE
/// cover the maximum ship delay (121) plus receipt delay (30).
pub static ORDER_END_DAYS: LazyLock<i32> = LazyLock::new(|| *END_DATE - *START_DATE - 151);
/// Returns a random substring of `source` with a length drawn uniformly from `min..=max`.
///
/// The start offset is drawn first, from the range that leaves room for a `max`-byte
/// substring. `source` must therefore be at least `max` bytes long. Offsets are byte indices,
/// so `source` is expected to be ASCII; slicing non-ASCII text could panic on a char boundary.
fn text_string<'a, R: Rng + ?Sized>(
    rng: &mut R,
    source: &'a str,
    min: usize,
    max: usize,
) -> &'a str {
    let last_start = source.len() - max;
    let begin = rng.gen_range(0..=last_start);
    let length = rng.gen_range(min..=max);
    let end = begin + length;
    &source[begin..end]
}
/// Returns `start` shifted forward by a uniformly random number of days taken from `days`.
///
/// Panics with "must fit" if the shifted date is not representable.
fn date<R: Rng + ?Sized>(rng: &mut R, start: &Date, days: RangeInclusive<i32>) -> Date {
    start
        .checked_add(rng.gen_range(days))
        .expect("must fit")
}
/// Spreads a dense order index into a sparse order key.
///
/// The low `SPARSE_KEEP` bits are kept as-is. The remaining high bits move a further
/// `SPARSE_BITS` positions to the left, so only the first 8 of every 32 keys are used
/// (0..=7, then 32..=39, ...).
fn order_key(i: i64) -> i64 {
    const SPARSE_BITS: usize = 2;
    const SPARSE_KEEP: usize = 3;
    let keep_mask: i64 = (1 << SPARSE_KEEP) - 1;
    let kept = i & keep_mask;
    let spread = (i >> SPARSE_KEEP) << (SPARSE_BITS + SPARSE_KEEP);
    spread + kept
}
/// Picks one word at random from each syllable list, in order, and joins them with single
/// spaces (e.g. a part type such as "PROMO PLATED TIN"). An empty list of lists yields "".
fn syllables<R: Rng + ?Sized>(rng: &mut R, syllables: &[&[&str]]) -> String {
    syllables
        .iter()
        .map(|options| *options.choose(&mut *rng).unwrap())
        .collect::<Vec<&str>>()
        .join(" ")
}
/// Draws an integer uniformly from `min..=max` and divides it by `div` using `cx`.
///
/// For example, `(0, 8, 100)` yields two-decimal values in [0.00, 0.08].
fn decimal<R: Rng + ?Sized>(
    rng: &mut R,
    cx: &mut dec::Context<Numeric>,
    min: i64,
    max: i64,
    div: i64,
) -> OrderedDecimal<Numeric> {
    let drawn: i64 = rng.gen_range(min..=max);
    let divisor = Numeric::lossy_from(div);
    let mut value = Numeric::lossy_from(drawn);
    cx.div(&mut value, &divisor);
    OrderedDecimal(value)
}
/// Builds a phone number `CC-AAA-BBB-CCCC`, where the country code is `nation + 10` and the
/// three remaining groups are random 3-, 3- and 4-digit numbers.
fn phone<R: Rng + ?Sized>(rng: &mut R, nation: i64) -> String {
    let country_code = nation + 10;
    // Explicit i32 keeps the same sampling as an unannotated integer literal.
    let first: i32 = rng.gen_range(100..=999);
    let second: i32 = rng.gen_range(100..=999);
    let third: i32 = rng.gen_range(1000..=9999);
    format!("{country_code}-{first}-{second}-{third}")
}
/// Returns a random string whose length is uniform in `min..=max`. Each character is drawn
/// from lowercase and uppercase letters, digits, comma and space (64 symbols).
fn v_string<R: Rng + ?Sized>(rng: &mut R, min: usize, max: usize) -> String {
    const ALPHABET: [char; 64] = [
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
        's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
        'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
        '3', '4', '5', '6', '7', '8', '9', '0', ',', ' ',
    ];
    let length = rng.gen_range(min..=max);
    (0..length)
        .map(|_| *ALPHABET.choose(&mut *rng).unwrap())
        .collect()
}
/// Shipping instructions; one is chosen at random for each lineitem row.
const INSTRUCTIONS: &[&str] = &[
    "DELIVER IN PERSON",
    "COLLECT COD",
    "NONE",
    "TAKE BACK RETURN",
];
/// Shipping modes; one is chosen at random for each lineitem row.
const MODES: &[&str] = &["REG AIR", "AIR", "RAIL", "SHIP", "TRUCK", "MAIL", "FOB"];
/// Words used to build part names: each part's name is five distinct words from this list,
/// joined by spaces.
const PARTNAMES: &[&str] = &[
    "almond",
    "antique",
    "aquamarine",
    "azure",
    "beige",
    "bisque",
    "black",
    "blanched",
    "blue",
    "blush",
    "brown",
    "burlywood",
    "burnished",
    "chartreuse",
    "chiffon",
    "chocolate",
    "coral",
    "cornflower",
    "cornsilk",
    "cream",
    "cyan",
    "dark",
    "deep",
    "dim",
    "dodger",
    "drab",
    "firebrick",
    "floral",
    "forest",
    "frosted",
    "gainsboro",
    "ghost",
    "goldenrod",
    "green",
    "grey",
    "honeydew",
    "hot",
    "indian",
    "ivory",
    "khaki",
    "lace",
    "lavender",
    "lawn",
    "lemon",
    "light",
    "lime",
    "linen",
    "magenta",
    "maroon",
    "medium",
    "metallic",
    "midnight",
    "mint",
    "misty",
    "moccasin",
    "navajo",
    "navy",
    "olive",
    "orange",
    "orchid",
    "pale",
    "papaya",
    "peach",
    "peru",
    "pink",
    "plum",
    "powder",
    "puff",
    "purple",
    "red",
    "rose",
    "rosy",
    "royal",
    "saddle",
    "salmon",
    "sandy",
    "seashell",
    "sienna",
    "sky",
    "slate",
    "smoke",
    "snow",
    "spring",
    "steel",
    "tan",
    "thistle",
    "tomato",
    "turquoise",
    "violet",
    "wheat",
    "white",
    "yellow",
];
/// Order priorities; one is chosen at random for each order row. These are the five
/// priorities listed by the TPC-H specification, from "1-URGENT" to "5-LOW".
const PRIORITIES: &[&str] = &["1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"];
/// Syllable lists for a part's type: one word is chosen from each list, in order, and the
/// words are joined by spaces.
const TYPES: &[&[&str]] = &[
    &["STANDARD", "SMALL", "MEDIUM", "LARGE", "ECONOMY", "PROMO"],
    &["ANODIZED", "BURNISHED", "PLATED", "POLISHED", "BRUSHED"],
    &["TIN", "NICKEL", "BRASS", "STEEL", "COPPER"],
];
/// Syllable lists for a part's container: one word is chosen from each list and the two are
/// joined by a space. These are the TPC-H container syllables (5 x 8 = 40 containers).
/// Benchmark queries depend on specific combinations, e.g. query 19 filters on "SM CASE" and
/// "LG BOX".
const CONTAINERS: &[&[&str]] = &[
    &["SM", "LG", "MED", "JUMBO", "WRAP"],
    &["CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM"],
];
/// Market segments; one is chosen at random for each customer row.
const SEGMENTS: &[&str] = &[
    "AUTOMOBILE",
    "BUILDING",
    "FURNITURE",
    "MACHINERY",
    "HOUSEHOLD",
];
/// Region names, indexed by region key (0-based).
const REGIONS: &[&str] = &["AFRICA", "AMERICA", "ASIA", "EUROPE", "MIDDLE EAST"];
/// `(name, region key)` for each nation, indexed by nation key (0-based). The region key
/// indexes `REGIONS`.
const NATIONS: &[(&str, i64)] = &[
    ("ALGERIA", 0),
    ("ARGENTINA", 1),
    ("BRAZIL", 1),
    ("CANADA", 1),
    ("EGYPT", 4),
    ("ETHIOPIA", 0),
    ("FRANCE", 3),
    ("GERMANY", 3),
    ("INDIA", 2),
    ("INDONESIA", 2),
    ("IRAN", 4),
    ("IRAQ", 4),
    ("JAPAN", 2),
    ("JORDAN", 4),
    ("KENYA", 0),
    ("MOROCCO", 0),
    ("MOZAMBIQUE", 0),
    ("PERU", 1),
    ("CHINA", 2),
    ("ROMANIA", 3),
    ("SAUDI ARABIA", 4),
    ("VIETNAM", 2),
    ("RUSSIA", 3),
    ("UNITED KINGDOM", 3),
    ("UNITED STATES", 1),
];