mz_storage/source/generator/
marketing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::{BTreeMap, btree_map::Entry},
12    iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19    Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand::{Rng, SeedableRng, distributions::Standard, rngs::SmallRng};
22
/// Bucket label written to conversion predictions for leads whose id
/// satisfies `id % 10 <= 1`; leads in this bucket are never sent a coupon.
const CONTROL: &str = "control";
/// Bucket label written to conversion predictions for every other lead;
/// only leads in this bucket may be sent a coupon.
const EXPERIMENT: &str = "experiment";

/// Load generator that emits a synthetic marketing data set: customers,
/// impressions, clicks, leads, conversion predictions and coupons.
///
/// The type carries no state of its own; all generation state lives in the
/// iterator returned by its `Generator` implementation.
#[derive(Debug, Default)]
pub struct Marketing {}
27
28// Note that this generator issues retractions; if you change this,
29// `mz_storage_types::sources::LoadGenerator::is_monotonic`
30// must be updated.
31impl Generator for Marketing {
32    fn by_seed(
33        &self,
34        now: mz_ore::now::NowFn,
35        seed: Option<u64>,
36        _resume_offset: MzOffset,
37    ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
38    {
39        let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());
40
41        let mut counter = 0;
42
43        let mut future_updates = FutureUpdates::default();
44        let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
45            .into_iter()
46            .enumerate()
47            .map(|(id, email)| {
48                let mut customer = Row::with_capacity(3);
49                let mut packer = customer.packer();
50
51                packer.push(Datum::Int64(id.try_into().unwrap()));
52                packer.push(Datum::String(email));
53                packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));
54
55                (MarketingView::Customers, customer, Diff::ONE)
56            })
57            .collect();
58
59        let mut offset = 0;
60        Box::new(
61            iter::from_fn(move || {
62                if pending.is_empty() {
63                    let mut impression = Row::with_capacity(4);
64                    let mut packer = impression.packer();
65
66                    let impression_id = counter;
67                    counter += 1;
68
69                    packer.push(Datum::Int64(impression_id));
70                    packer.push(Datum::Int64(
71                        rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
72                    ));
73                    packer.push(Datum::Int64(rng.gen_range(0..20i64)));
74                    let impression_time = now();
75                    packer.push(Datum::TimestampTz(
76                        to_datetime(impression_time)
77                            .try_into()
78                            .expect("timestamp must fit"),
79                    ));
80
81                    pending.push((MarketingView::Impressions, impression, Diff::ONE));
82
83                    // 1 in 10 impressions have a click. Making us the
84                    // most successful marketing organization in the world.
85                    if rng.gen_range(0..10) == 1 {
86                        let mut click = Row::with_capacity(2);
87                        let mut packer = click.packer();
88
89                        let click_time = impression_time + rng.gen_range(20000..40000);
90
91                        packer.push(Datum::Int64(impression_id));
92                        packer.push(Datum::TimestampTz(
93                            to_datetime(click_time)
94                                .try_into()
95                                .expect("timestamp must fit"),
96                        ));
97
98                        future_updates
99                            .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
100                    }
101
102                    let mut updates = future_updates.retrieve(now());
103                    pending.append(&mut updates);
104
105                    for _ in 0..rng.gen_range(1..2) {
106                        let id = counter;
107                        counter += 1;
108
109                        let mut lead = Lead {
110                            id,
111                            customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
112                            created_at: now(),
113                            converted_at: None,
114                            conversion_amount: None,
115                        };
116
117                        pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));
118
119                        // a highly scientific statistical model
120                        // predicting the likelyhood of a conversion
121                        let score = rng.sample::<f64, _>(Standard);
122                        let label = score > 0.5f64;
123
124                        let bucket = if lead.id % 10 <= 1 {
125                            CONTROL
126                        } else {
127                            EXPERIMENT
128                        };
129
130                        let mut prediction = Row::with_capacity(4);
131                        let mut packer = prediction.packer();
132
133                        packer.push(Datum::Int64(lead.id));
134                        packer.push(Datum::String(bucket));
135                        packer.push(Datum::TimestampTz(
136                            to_datetime(now()).try_into().expect("timestamp must fit"),
137                        ));
138                        packer.push(Datum::Float64(score.into()));
139
140                        pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));
141
142                        let mut sent_coupon = false;
143                        if !label && bucket == EXPERIMENT {
144                            sent_coupon = true;
145                            let amount = rng.gen_range(500..5000);
146
147                            let mut coupon = Row::with_capacity(4);
148                            let mut packer = coupon.packer();
149
150                            let id = counter;
151                            counter += 1;
152                            packer.push(Datum::Int64(id));
153                            packer.push(Datum::Int64(lead.id));
154                            packer.push(Datum::TimestampTz(
155                                to_datetime(now()).try_into().expect("timestamp must fit"),
156                            ));
157                            packer.push(Datum::Int64(amount));
158
159                            pending.push((MarketingView::Coupons, coupon, Diff::ONE));
160                        }
161
162                        // Decide if a lead will convert. We assume our model is fairly
163                        // accurate and correlates with conversions. We also assume
164                        // that coupons make leads a little more liekly to convert.
165                        let mut converted = rng.sample::<f64, _>(Standard) < score;
166                        if sent_coupon && !converted {
167                            converted = rng.sample::<f64, _>(Standard) < score;
168                        }
169
170                        if converted {
171                            let converted_at = now() + rng.gen_range(1..30);
172
173                            future_updates.insert(
174                                converted_at,
175                                (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
176                            );
177
178                            lead.converted_at = Some(converted_at);
179                            lead.conversion_amount = Some(rng.gen_range(1000..25000));
180
181                            future_updates.insert(
182                                converted_at,
183                                (MarketingView::Leads, lead.to_row(), Diff::ONE),
184                            );
185                        }
186                    }
187                }
188
189                pending.pop().map(|(output, row, diff)| {
190                    let msg = (
191                        LoadGeneratorOutput::Marketing(output),
192                        Event::Message(MzOffset::from(offset), (row, diff)),
193                    );
194
195                    let progress = if pending.is_empty() {
196                        offset += 1;
197                        Some((
198                            LoadGeneratorOutput::Marketing(output),
199                            Event::Progress(Some(MzOffset::from(offset))),
200                        ))
201                    } else {
202                        None
203                    };
204                    std::iter::once(msg).chain(progress)
205                })
206            })
207            .flatten(),
208        )
209    }
210}
211
212struct Lead {
213    id: i64,
214    customer_id: i64,
215    created_at: u64,
216    converted_at: Option<u64>,
217    conversion_amount: Option<i64>,
218}
219
220impl Lead {
221    fn to_row(&self) -> Row {
222        let mut row = Row::with_capacity(5);
223        let mut packer = row.packer();
224        packer.push(Datum::Int64(self.id));
225        packer.push(Datum::Int64(self.customer_id));
226        packer.push(Datum::TimestampTz(
227            to_datetime(self.created_at)
228                .try_into()
229                .expect("timestamp must fit"),
230        ));
231        packer.push(
232            self.converted_at
233                .map(|converted_at| {
234                    Datum::TimestampTz(
235                        to_datetime(converted_at)
236                            .try_into()
237                            .expect("timestamp must fit"),
238                    )
239                })
240                .unwrap_or(Datum::Null),
241        );
242        packer.push(
243            self.conversion_amount
244                .map(Datum::Int64)
245                .unwrap_or(Datum::Null),
246        );
247
248        row
249    }
250}
251
/// Email addresses of the fixed customer population.
///
/// One `Customers` row is emitted per entry, using the entry's position in
/// this slice as the customer id. Impressions and leads pick their customer
/// by drawing an index into this slice, so the length bounds the ids that
/// can appear in those outputs.
// NOTE(review): "george.c..marshall@email.com" contains a double dot. It is
// emitted as data, so it is left untouched here; confirm whether it is
// intentional before changing it.
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    "george.c..marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
304
305#[derive(Default)]
306struct FutureUpdates {
307    updates: BTreeMap<u64, Vec<(MarketingView, Row, Diff)>>,
308}
309
310impl FutureUpdates {
311    /// Schedules a row to be output at a certain time
312    fn insert(&mut self, time: u64, update: (MarketingView, Row, Diff)) {
313        match self.updates.entry(time) {
314            Entry::Vacant(v) => {
315                v.insert(vec![update]);
316            }
317            Entry::Occupied(o) => {
318                o.into_mut().push(update);
319            }
320        }
321    }
322
323    /// Returns all rows that are scheduled to be output
324    /// at or before a certain time.
325    fn retrieve(&mut self, time: u64) -> Vec<(MarketingView, Row, Diff)> {
326        let mut updates = vec![];
327        while let Some(e) = self.updates.first_entry() {
328            if *e.key() > time {
329                break;
330            }
331
332            updates.append(&mut e.remove());
333        }
334        updates
335    }
336}