mz_storage/source/generator/
marketing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::{BTreeMap, btree_map::Entry},
12    iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19    Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand::{Rng, SeedableRng, distributions::Standard, rngs::SmallRng};
22
/// Load generator for a small marketing data set: customers, impressions,
/// clicks, leads, conversion predictions and coupons.
pub struct Marketing {}

/// Bucket name for leads whose id ends in 0 or 1; these never get a coupon.
const CONTROL: &str = "control";
/// Bucket name for all other leads; these may be sent a coupon.
const EXPERIMENT: &str = "experiment";
27
28// Note that this generator issues retractions; if you change this,
29// `mz_storage_types::sources::LoadGenerator::is_monotonic`
30// must be updated.
31impl Generator for Marketing {
32    fn by_seed(
33        &self,
34        now: mz_ore::now::NowFn,
35        seed: Option<u64>,
36        _resume_offset: MzOffset,
37    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
38        let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());
39
40        let mut counter = 0;
41
42        let mut future_updates = FutureUpdates::default();
43        let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
44            .into_iter()
45            .enumerate()
46            .map(|(id, email)| {
47                let mut customer = Row::with_capacity(3);
48                let mut packer = customer.packer();
49
50                packer.push(Datum::Int64(id.try_into().unwrap()));
51                packer.push(Datum::String(email));
52                packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));
53
54                (MarketingView::Customers, customer, Diff::ONE)
55            })
56            .collect();
57
58        let mut offset = 0;
59        Box::new(
60            iter::from_fn(move || {
61                if pending.is_empty() {
62                    let mut impression = Row::with_capacity(4);
63                    let mut packer = impression.packer();
64
65                    let impression_id = counter;
66                    counter += 1;
67
68                    packer.push(Datum::Int64(impression_id));
69                    packer.push(Datum::Int64(
70                        rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
71                    ));
72                    packer.push(Datum::Int64(rng.gen_range(0..20i64)));
73                    let impression_time = now();
74                    packer.push(Datum::TimestampTz(
75                        to_datetime(impression_time)
76                            .try_into()
77                            .expect("timestamp must fit"),
78                    ));
79
80                    pending.push((MarketingView::Impressions, impression, Diff::ONE));
81
82                    // 1 in 10 impressions have a click. Making us the
83                    // most successful marketing organization in the world.
84                    if rng.gen_range(0..10) == 1 {
85                        let mut click = Row::with_capacity(2);
86                        let mut packer = click.packer();
87
88                        let click_time = impression_time + rng.gen_range(20000..40000);
89
90                        packer.push(Datum::Int64(impression_id));
91                        packer.push(Datum::TimestampTz(
92                            to_datetime(click_time)
93                                .try_into()
94                                .expect("timestamp must fit"),
95                        ));
96
97                        future_updates
98                            .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
99                    }
100
101                    let mut updates = future_updates.retrieve(now());
102                    pending.append(&mut updates);
103
104                    for _ in 0..rng.gen_range(1..2) {
105                        let id = counter;
106                        counter += 1;
107
108                        let mut lead = Lead {
109                            id,
110                            customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
111                            created_at: now(),
112                            converted_at: None,
113                            conversion_amount: None,
114                        };
115
116                        pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));
117
118                        // a highly scientific statistical model
119                        // predicting the likelyhood of a conversion
120                        let score = rng.sample::<f64, _>(Standard);
121                        let label = score > 0.5f64;
122
123                        let bucket = if lead.id % 10 <= 1 {
124                            CONTROL
125                        } else {
126                            EXPERIMENT
127                        };
128
129                        let mut prediction = Row::with_capacity(4);
130                        let mut packer = prediction.packer();
131
132                        packer.push(Datum::Int64(lead.id));
133                        packer.push(Datum::String(bucket));
134                        packer.push(Datum::TimestampTz(
135                            to_datetime(now()).try_into().expect("timestamp must fit"),
136                        ));
137                        packer.push(Datum::Float64(score.into()));
138
139                        pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));
140
141                        let mut sent_coupon = false;
142                        if !label && bucket == EXPERIMENT {
143                            sent_coupon = true;
144                            let amount = rng.gen_range(500..5000);
145
146                            let mut coupon = Row::with_capacity(4);
147                            let mut packer = coupon.packer();
148
149                            let id = counter;
150                            counter += 1;
151                            packer.push(Datum::Int64(id));
152                            packer.push(Datum::Int64(lead.id));
153                            packer.push(Datum::TimestampTz(
154                                to_datetime(now()).try_into().expect("timestamp must fit"),
155                            ));
156                            packer.push(Datum::Int64(amount));
157
158                            pending.push((MarketingView::Coupons, coupon, Diff::ONE));
159                        }
160
161                        // Decide if a lead will convert. We assume our model is fairly
162                        // accurate and correlates with conversions. We also assume
163                        // that coupons make leads a little more liekly to convert.
164                        let mut converted = rng.sample::<f64, _>(Standard) < score;
165                        if sent_coupon && !converted {
166                            converted = rng.sample::<f64, _>(Standard) < score;
167                        }
168
169                        if converted {
170                            let converted_at = now() + rng.gen_range(1..30);
171
172                            future_updates.insert(
173                                converted_at,
174                                (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
175                            );
176
177                            lead.converted_at = Some(converted_at);
178                            lead.conversion_amount = Some(rng.gen_range(1000..25000));
179
180                            future_updates.insert(
181                                converted_at,
182                                (MarketingView::Leads, lead.to_row(), Diff::ONE),
183                            );
184                        }
185                    }
186                }
187
188                pending.pop().map(|(output, row, diff)| {
189                    let msg = (
190                        LoadGeneratorOutput::Marketing(output),
191                        Event::Message(MzOffset::from(offset), (row, diff)),
192                    );
193
194                    let progress = if pending.is_empty() {
195                        offset += 1;
196                        Some((
197                            LoadGeneratorOutput::Marketing(output),
198                            Event::Progress(Some(MzOffset::from(offset))),
199                        ))
200                    } else {
201                        None
202                    };
203                    std::iter::once(msg).chain(progress)
204                })
205            })
206            .flatten(),
207        )
208    }
209}
210
211struct Lead {
212    id: i64,
213    customer_id: i64,
214    created_at: u64,
215    converted_at: Option<u64>,
216    conversion_amount: Option<i64>,
217}
218
219impl Lead {
220    fn to_row(&self) -> Row {
221        let mut row = Row::with_capacity(5);
222        let mut packer = row.packer();
223        packer.push(Datum::Int64(self.id));
224        packer.push(Datum::Int64(self.customer_id));
225        packer.push(Datum::TimestampTz(
226            to_datetime(self.created_at)
227                .try_into()
228                .expect("timestamp must fit"),
229        ));
230        packer.push(
231            self.converted_at
232                .map(|converted_at| {
233                    Datum::TimestampTz(
234                        to_datetime(converted_at)
235                            .try_into()
236                            .expect("timestamp must fit"),
237                    )
238                })
239                .unwrap_or(Datum::Null),
240        );
241        packer.push(
242            self.conversion_amount
243                .map(Datum::Int64)
244                .unwrap_or(Datum::Null),
245        );
246
247        row
248    }
249}
250
/// Email addresses of the generated customers. A customer's id is its index
/// in this list.
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    // Previously "george.c..marshall": consecutive dots are not a valid
    // unquoted email local part.
    "george.c.marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
303
/// Updates that should be emitted once the clock reaches a given time.
///
/// Updates are bucketed by their scheduled time; updates sharing a time keep
/// their insertion order. Generic over the update type so the scheduler does
/// not depend on any particular view or row representation.
struct FutureUpdates<T> {
    updates: BTreeMap<u64, Vec<T>>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound that scheduled update tuples need not satisfy.
impl<T> Default for FutureUpdates<T> {
    fn default() -> Self {
        Self {
            updates: BTreeMap::new(),
        }
    }
}

impl<T> FutureUpdates<T> {
    /// Schedules `update` to be output at `time`.
    fn insert(&mut self, time: u64, update: T) {
        match self.updates.entry(time) {
            Entry::Vacant(slot) => {
                slot.insert(vec![update]);
            }
            Entry::Occupied(slot) => slot.into_mut().push(update),
        }
    }

    /// Removes and returns every update scheduled at or before `time`,
    /// ordered by scheduled time and, within one time, by insertion order.
    /// Later updates stay scheduled.
    fn retrieve(&mut self, time: u64) -> Vec<T> {
        let mut due = Vec::new();
        while let Some(entry) = self.updates.first_entry() {
            if *entry.key() > time {
                break;
            }
            due.extend(entry.remove());
        }
        due
    }
}