mz_storage/source/generator/
marketing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::{BTreeMap, btree_map::Entry},
12    iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19    Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand_8::distributions::Standard;
22use rand_8::rngs::SmallRng;
23use rand_8::{Rng, SeedableRng};
24
/// Bucket label written to conversion predictions for leads whose
/// `id % 10 <= 1`. Leads in this bucket are never issued a coupon.
const CONTROL: &str = "control";
/// Bucket label written to conversion predictions for every other lead.
/// Only leads in this bucket are eligible for a coupon.
const EXPERIMENT: &str = "experiment";
27
28pub struct Marketing {}
29
30// Note that this generator issues retractions; if you change this,
31// `mz_storage_types::sources::LoadGenerator::is_monotonic`
32// must be updated.
33impl Generator for Marketing {
34    fn by_seed(
35        &self,
36        now: mz_ore::now::NowFn,
37        seed: Option<u64>,
38        _resume_offset: MzOffset,
39    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
40        let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());
41
42        let mut counter = 0;
43
44        let mut future_updates = FutureUpdates::default();
45        let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
46            .into_iter()
47            .enumerate()
48            .map(|(id, email)| {
49                let mut customer = Row::with_capacity(3);
50                let mut packer = customer.packer();
51
52                packer.push(Datum::Int64(id.try_into().unwrap()));
53                packer.push(Datum::String(email));
54                packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));
55
56                (MarketingView::Customers, customer, Diff::ONE)
57            })
58            .collect();
59
60        let mut offset = 0;
61        Box::new(
62            iter::from_fn(move || {
63                if pending.is_empty() {
64                    let mut impression = Row::with_capacity(4);
65                    let mut packer = impression.packer();
66
67                    let impression_id = counter;
68                    counter += 1;
69
70                    packer.push(Datum::Int64(impression_id));
71                    packer.push(Datum::Int64(
72                        rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
73                    ));
74                    packer.push(Datum::Int64(rng.gen_range(0..20i64)));
75                    let impression_time = now();
76                    packer.push(Datum::TimestampTz(
77                        to_datetime(impression_time)
78                            .try_into()
79                            .expect("timestamp must fit"),
80                    ));
81
82                    pending.push((MarketingView::Impressions, impression, Diff::ONE));
83
84                    // 1 in 10 impressions have a click. Making us the
85                    // most successful marketing organization in the world.
86                    if rng.gen_range(0..10) == 1 {
87                        let mut click = Row::with_capacity(2);
88                        let mut packer = click.packer();
89
90                        let click_time = impression_time + rng.gen_range(20000..40000);
91
92                        packer.push(Datum::Int64(impression_id));
93                        packer.push(Datum::TimestampTz(
94                            to_datetime(click_time)
95                                .try_into()
96                                .expect("timestamp must fit"),
97                        ));
98
99                        future_updates
100                            .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
101                    }
102
103                    let mut updates = future_updates.retrieve(now());
104                    pending.append(&mut updates);
105
106                    for _ in 0..rng.gen_range(1..2) {
107                        let id = counter;
108                        counter += 1;
109
110                        let mut lead = Lead {
111                            id,
112                            customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
113                            created_at: now(),
114                            converted_at: None,
115                            conversion_amount: None,
116                        };
117
118                        pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));
119
120                        // a highly scientific statistical model
121                        // predicting the likelyhood of a conversion
122                        let score = rng.sample::<f64, _>(Standard);
123                        let label = score > 0.5f64;
124
125                        let bucket = if lead.id % 10 <= 1 {
126                            CONTROL
127                        } else {
128                            EXPERIMENT
129                        };
130
131                        let mut prediction = Row::with_capacity(4);
132                        let mut packer = prediction.packer();
133
134                        packer.push(Datum::Int64(lead.id));
135                        packer.push(Datum::String(bucket));
136                        packer.push(Datum::TimestampTz(
137                            to_datetime(now()).try_into().expect("timestamp must fit"),
138                        ));
139                        packer.push(Datum::Float64(score.into()));
140
141                        pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));
142
143                        let mut sent_coupon = false;
144                        if !label && bucket == EXPERIMENT {
145                            sent_coupon = true;
146                            let amount = rng.gen_range(500..5000);
147
148                            let mut coupon = Row::with_capacity(4);
149                            let mut packer = coupon.packer();
150
151                            let id = counter;
152                            counter += 1;
153                            packer.push(Datum::Int64(id));
154                            packer.push(Datum::Int64(lead.id));
155                            packer.push(Datum::TimestampTz(
156                                to_datetime(now()).try_into().expect("timestamp must fit"),
157                            ));
158                            packer.push(Datum::Int64(amount));
159
160                            pending.push((MarketingView::Coupons, coupon, Diff::ONE));
161                        }
162
163                        // Decide if a lead will convert. We assume our model is fairly
164                        // accurate and correlates with conversions. We also assume
165                        // that coupons make leads a little more liekly to convert.
166                        let mut converted = rng.sample::<f64, _>(Standard) < score;
167                        if sent_coupon && !converted {
168                            converted = rng.sample::<f64, _>(Standard) < score;
169                        }
170
171                        if converted {
172                            let converted_at = now() + rng.gen_range(1..30);
173
174                            future_updates.insert(
175                                converted_at,
176                                (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
177                            );
178
179                            lead.converted_at = Some(converted_at);
180                            lead.conversion_amount = Some(rng.gen_range(1000..25000));
181
182                            future_updates.insert(
183                                converted_at,
184                                (MarketingView::Leads, lead.to_row(), Diff::ONE),
185                            );
186                        }
187                    }
188                }
189
190                pending.pop().map(|(output, row, diff)| {
191                    let msg = (
192                        LoadGeneratorOutput::Marketing(output),
193                        Event::Message(MzOffset::from(offset), (row, diff)),
194                    );
195
196                    let progress = if pending.is_empty() {
197                        offset += 1;
198                        Some((
199                            LoadGeneratorOutput::Marketing(output),
200                            Event::Progress(Some(MzOffset::from(offset))),
201                        ))
202                    } else {
203                        None
204                    };
205                    std::iter::once(msg).chain(progress)
206                })
207            })
208            .flatten(),
209        )
210    }
211}
212
213struct Lead {
214    id: i64,
215    customer_id: i64,
216    created_at: u64,
217    converted_at: Option<u64>,
218    conversion_amount: Option<i64>,
219}
220
221impl Lead {
222    fn to_row(&self) -> Row {
223        let mut row = Row::with_capacity(5);
224        let mut packer = row.packer();
225        packer.push(Datum::Int64(self.id));
226        packer.push(Datum::Int64(self.customer_id));
227        packer.push(Datum::TimestampTz(
228            to_datetime(self.created_at)
229                .try_into()
230                .expect("timestamp must fit"),
231        ));
232        packer.push(
233            self.converted_at
234                .map(|converted_at| {
235                    Datum::TimestampTz(
236                        to_datetime(converted_at)
237                            .try_into()
238                            .expect("timestamp must fit"),
239                    )
240                })
241                .unwrap_or(Datum::Null),
242        );
243        packer.push(
244            self.conversion_amount
245                .map(Datum::Int64)
246                .unwrap_or(Datum::Null),
247        );
248
249        row
250    }
251}
252
/// Email addresses of the fixed customer set.
///
/// The generator emits one `Customers` row per entry and uses the entry's
/// index in this slice as the customer id; impressions and leads pick their
/// customer id from `0..CUSTOMERS.len()`. Reordering, adding or removing
/// entries therefore changes the generated data.
///
/// NOTE(review): "george.c..marshall@email.com" contains a double dot that
/// looks unintentional; it is left untouched because it is emitted data.
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    "george.c..marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
305
306#[derive(Default)]
307struct FutureUpdates {
308    updates: BTreeMap<u64, Vec<(MarketingView, Row, Diff)>>,
309}
310
311impl FutureUpdates {
312    /// Schedules a row to be output at a certain time
313    fn insert(&mut self, time: u64, update: (MarketingView, Row, Diff)) {
314        match self.updates.entry(time) {
315            Entry::Vacant(v) => {
316                v.insert(vec![update]);
317            }
318            Entry::Occupied(o) => {
319                o.into_mut().push(update);
320            }
321        }
322    }
323
324    /// Returns all rows that are scheduled to be output
325    /// at or before a certain time.
326    fn retrieve(&mut self, time: u64) -> Vec<(MarketingView, Row, Diff)> {
327        let mut updates = vec![];
328        while let Some(e) = self.updates.first_entry() {
329            if *e.key() > time {
330                break;
331            }
332
333            updates.append(&mut e.remove());
334        }
335        updates
336    }
337}