mz_storage/source/generator/
marketing.rs
1use std::{
11 collections::{BTreeMap, btree_map::Entry},
12 iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19 Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand::{Rng, SeedableRng, distributions::Standard, rngs::SmallRng};
22
/// Bucket label for leads held out of the coupon experiment (lead ids whose
/// last decimal digit is 0 or 1).
const CONTROL: &str = "control";
/// Bucket label for leads that may be sent a coupon when their score is low.
const EXPERIMENT: &str = "experiment";
25
/// Load generator producing the marketing views (customers, impressions,
/// clicks, leads, coupons and conversion predictions). It carries no state of
/// its own; all state lives in the iterator returned by `by_seed`.
pub struct Marketing {}
27
28impl Generator for Marketing {
32 fn by_seed(
33 &self,
34 now: mz_ore::now::NowFn,
35 seed: Option<u64>,
36 _resume_offset: MzOffset,
37 ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
38 {
39 let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());
40
41 let mut counter = 0;
42
43 let mut future_updates = FutureUpdates::default();
44 let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
45 .into_iter()
46 .enumerate()
47 .map(|(id, email)| {
48 let mut customer = Row::with_capacity(3);
49 let mut packer = customer.packer();
50
51 packer.push(Datum::Int64(id.try_into().unwrap()));
52 packer.push(Datum::String(email));
53 packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));
54
55 (MarketingView::Customers, customer, Diff::ONE)
56 })
57 .collect();
58
59 let mut offset = 0;
60 Box::new(
61 iter::from_fn(move || {
62 if pending.is_empty() {
63 let mut impression = Row::with_capacity(4);
64 let mut packer = impression.packer();
65
66 let impression_id = counter;
67 counter += 1;
68
69 packer.push(Datum::Int64(impression_id));
70 packer.push(Datum::Int64(
71 rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
72 ));
73 packer.push(Datum::Int64(rng.gen_range(0..20i64)));
74 let impression_time = now();
75 packer.push(Datum::TimestampTz(
76 to_datetime(impression_time)
77 .try_into()
78 .expect("timestamp must fit"),
79 ));
80
81 pending.push((MarketingView::Impressions, impression, Diff::ONE));
82
83 if rng.gen_range(0..10) == 1 {
86 let mut click = Row::with_capacity(2);
87 let mut packer = click.packer();
88
89 let click_time = impression_time + rng.gen_range(20000..40000);
90
91 packer.push(Datum::Int64(impression_id));
92 packer.push(Datum::TimestampTz(
93 to_datetime(click_time)
94 .try_into()
95 .expect("timestamp must fit"),
96 ));
97
98 future_updates
99 .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
100 }
101
102 let mut updates = future_updates.retrieve(now());
103 pending.append(&mut updates);
104
105 for _ in 0..rng.gen_range(1..2) {
106 let id = counter;
107 counter += 1;
108
109 let mut lead = Lead {
110 id,
111 customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
112 created_at: now(),
113 converted_at: None,
114 conversion_amount: None,
115 };
116
117 pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));
118
119 let score = rng.sample::<f64, _>(Standard);
122 let label = score > 0.5f64;
123
124 let bucket = if lead.id % 10 <= 1 {
125 CONTROL
126 } else {
127 EXPERIMENT
128 };
129
130 let mut prediction = Row::with_capacity(4);
131 let mut packer = prediction.packer();
132
133 packer.push(Datum::Int64(lead.id));
134 packer.push(Datum::String(bucket));
135 packer.push(Datum::TimestampTz(
136 to_datetime(now()).try_into().expect("timestamp must fit"),
137 ));
138 packer.push(Datum::Float64(score.into()));
139
140 pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));
141
142 let mut sent_coupon = false;
143 if !label && bucket == EXPERIMENT {
144 sent_coupon = true;
145 let amount = rng.gen_range(500..5000);
146
147 let mut coupon = Row::with_capacity(4);
148 let mut packer = coupon.packer();
149
150 let id = counter;
151 counter += 1;
152 packer.push(Datum::Int64(id));
153 packer.push(Datum::Int64(lead.id));
154 packer.push(Datum::TimestampTz(
155 to_datetime(now()).try_into().expect("timestamp must fit"),
156 ));
157 packer.push(Datum::Int64(amount));
158
159 pending.push((MarketingView::Coupons, coupon, Diff::ONE));
160 }
161
162 let mut converted = rng.sample::<f64, _>(Standard) < score;
166 if sent_coupon && !converted {
167 converted = rng.sample::<f64, _>(Standard) < score;
168 }
169
170 if converted {
171 let converted_at = now() + rng.gen_range(1..30);
172
173 future_updates.insert(
174 converted_at,
175 (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
176 );
177
178 lead.converted_at = Some(converted_at);
179 lead.conversion_amount = Some(rng.gen_range(1000..25000));
180
181 future_updates.insert(
182 converted_at,
183 (MarketingView::Leads, lead.to_row(), Diff::ONE),
184 );
185 }
186 }
187 }
188
189 pending.pop().map(|(output, row, diff)| {
190 let msg = (
191 LoadGeneratorOutput::Marketing(output),
192 Event::Message(MzOffset::from(offset), (row, diff)),
193 );
194
195 let progress = if pending.is_empty() {
196 offset += 1;
197 Some((
198 LoadGeneratorOutput::Marketing(output),
199 Event::Progress(Some(MzOffset::from(offset))),
200 ))
201 } else {
202 None
203 };
204 std::iter::once(msg).chain(progress)
205 })
206 })
207 .flatten(),
208 )
209 }
210}
211
/// A generated lead, tracked so that its row can later be retracted and
/// re-inserted once it converts.
struct Lead {
    /// Identifier taken from the generator's shared id sequence.
    id: i64,
    /// Index into `CUSTOMERS` of the customer this lead belongs to.
    customer_id: i64,
    /// Reading of the generator's `now` clock when the lead was created.
    created_at: u64,
    /// Scheduled conversion time; `None` until the lead converts.
    converted_at: Option<u64>,
    /// Conversion amount; `None` until the lead converts.
    conversion_amount: Option<i64>,
}
219
220impl Lead {
221 fn to_row(&self) -> Row {
222 let mut row = Row::with_capacity(5);
223 let mut packer = row.packer();
224 packer.push(Datum::Int64(self.id));
225 packer.push(Datum::Int64(self.customer_id));
226 packer.push(Datum::TimestampTz(
227 to_datetime(self.created_at)
228 .try_into()
229 .expect("timestamp must fit"),
230 ));
231 packer.push(
232 self.converted_at
233 .map(|converted_at| {
234 Datum::TimestampTz(
235 to_datetime(converted_at)
236 .try_into()
237 .expect("timestamp must fit"),
238 )
239 })
240 .unwrap_or(Datum::Null),
241 );
242 packer.push(
243 self.conversion_amount
244 .map(Datum::Int64)
245 .unwrap_or(Datum::Null),
246 );
247
248 row
249 }
250}
251
/// Customer email addresses. A customer's id is its index in this list.
///
/// Every entry must be a well-formed address. In particular, the local part
/// must not contain consecutive dots.
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    // Previously "george.c..marshall"; the doubled dot made the address invalid.
    "george.c.marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
304
305#[derive(Default)]
306struct FutureUpdates {
307 updates: BTreeMap<u64, Vec<(MarketingView, Row, Diff)>>,
308}
309
310impl FutureUpdates {
311 fn insert(&mut self, time: u64, update: (MarketingView, Row, Diff)) {
313 match self.updates.entry(time) {
314 Entry::Vacant(v) => {
315 v.insert(vec![update]);
316 }
317 Entry::Occupied(o) => {
318 o.into_mut().push(update);
319 }
320 }
321 }
322
323 fn retrieve(&mut self, time: u64) -> Vec<(MarketingView, Row, Diff)> {
326 let mut updates = vec![];
327 while let Some(e) = self.updates.first_entry() {
328 if *e.key() > time {
329 break;
330 }
331
332 updates.append(&mut e.remove());
333 }
334 updates
335 }
336}