mz_storage/source/generator/
marketing.rs1use std::{
11 collections::{BTreeMap, btree_map::Entry},
12 iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19 Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand_8::distributions::Standard;
22use rand_8::rngs::SmallRng;
23use rand_8::{Rng, SeedableRng};
24
/// Bucket label for leads assigned to the control group.
const CONTROL: &str = "control";
/// Bucket label for leads assigned to the experiment group; only leads in this
/// bucket can be sent coupons.
const EXPERIMENT: &str = "experiment";
27
/// Load generator producing the marketing data set: customers, impressions,
/// clicks, leads, coupons and conversion predictions.
pub struct Marketing {}
29
impl Generator for Marketing {
    /// Returns an endless stream of updates for the marketing views.
    ///
    /// The first batch contains one row per entry in `CUSTOMERS`. Every later
    /// batch is generated only after the previous one is fully drained. It
    /// adds one impression and possibly schedules a click. It releases all
    /// scheduled updates whose time is at or before `now()`. It then creates
    /// leads together with their conversion predictions, optional coupons
    /// and, possibly, a scheduled conversion.
    ///
    /// All messages of a batch carry the same offset. After the last message
    /// of a batch, the offset is advanced by one and an `Event::Progress` for
    /// the new offset is emitted.
    ///
    /// `seed` seeds the RNG (0 when absent). Timestamps and the release of
    /// scheduled updates depend on `now`, so the output is not determined by
    /// the seed alone. `_resume_offset` is ignored: offsets always start at 0.
    fn by_seed(
        &self,
        now: mz_ore::now::NowFn,
        seed: Option<u64>,
        _resume_offset: MzOffset,
    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
        // The RNG seed defaults to 0 when no seed is supplied.
        let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());

        // One id counter shared by impressions, leads and coupons.
        let mut counter = 0;

        // Clicks and lead conversions that must wait until `now()` reaches
        // their scheduled time before being emitted.
        let mut future_updates = FutureUpdates::default();
        // Rows waiting to be emitted. They are drained with `pop` (from the
        // back), so each batch is emitted in reverse push order.
        // The initial batch holds one row per customer:
        // (index in CUSTOMERS, email, random Int64 in 5_000_000..10_000_000).
        let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
            .into_iter()
            .enumerate()
            .map(|(id, email)| {
                let mut customer = Row::with_capacity(3);
                let mut packer = customer.packer();

                packer.push(Datum::Int64(id.try_into().unwrap()));
                packer.push(Datum::String(email));
                packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));

                (MarketingView::Customers, customer, Diff::ONE)
            })
            .collect();

        // Offset stamped on every message of the batch currently being drained.
        let mut offset = 0;
        Box::new(
            iter::from_fn(move || {
                // Build a new batch only once the previous one is fully drained.
                if pending.is_empty() {
                    // Impression row: (impression id, customer index,
                    // random Int64 in 0..20, impression time).
                    let mut impression = Row::with_capacity(4);
                    let mut packer = impression.packer();

                    let impression_id = counter;
                    counter += 1;

                    packer.push(Datum::Int64(impression_id));
                    packer.push(Datum::Int64(
                        rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
                    ));
                    packer.push(Datum::Int64(rng.gen_range(0..20i64)));
                    let impression_time = now();
                    packer.push(Datum::TimestampTz(
                        to_datetime(impression_time)
                            .try_into()
                            .expect("timestamp must fit"),
                    ));

                    pending.push((MarketingView::Impressions, impression, Diff::ONE));

                    // About 1 in 10 impressions is clicked. The click is not
                    // emitted now; it is scheduled 20000..40000 time units after
                    // the impression (presumably milliseconds, the unit of
                    // `now()` — TODO confirm).
                    if rng.gen_range(0..10) == 1 {
                        let mut click = Row::with_capacity(2);
                        let mut packer = click.packer();

                        let click_time = impression_time + rng.gen_range(20000..40000);

                        packer.push(Datum::Int64(impression_id));
                        packer.push(Datum::TimestampTz(
                            to_datetime(click_time)
                                .try_into()
                                .expect("timestamp must fit"),
                        ));

                        future_updates
                            .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
                    }

                    // Release every scheduled update whose time has arrived.
                    let mut updates = future_updates.retrieve(now());
                    pending.append(&mut updates);

                    // NOTE(review): `gen_range(1..2)` always returns 1, so exactly
                    // one lead is created per batch; confirm whether a wider range
                    // was intended.
                    for _ in 0..rng.gen_range(1..2) {
                        let id = counter;
                        counter += 1;

                        let mut lead = Lead {
                            id,
                            customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
                            created_at: now(),
                            converted_at: None,
                            conversion_amount: None,
                        };

                        pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));

                        // Predicted conversion score, uniform in [0, 1); a score
                        // above 0.5 means "predicted to convert".
                        let score = rng.sample::<f64, _>(Standard);
                        let label = score > 0.5f64;

                        // Lead ids ending in 0 or 1 form the control bucket.
                        let bucket = if lead.id % 10 <= 1 {
                            CONTROL
                        } else {
                            EXPERIMENT
                        };

                        // Prediction row: (lead id, bucket, prediction time, score).
                        let mut prediction = Row::with_capacity(4);
                        let mut packer = prediction.packer();

                        packer.push(Datum::Int64(lead.id));
                        packer.push(Datum::String(bucket));
                        packer.push(Datum::TimestampTz(
                            to_datetime(now()).try_into().expect("timestamp must fit"),
                        ));
                        packer.push(Datum::Float64(score.into()));

                        pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));

                        // Only experiment-bucket leads predicted NOT to convert get a
                        // coupon. Coupon row: (coupon id, lead id, time, amount).
                        let mut sent_coupon = false;
                        if !label && bucket == EXPERIMENT {
                            sent_coupon = true;
                            let amount = rng.gen_range(500..5000);

                            let mut coupon = Row::with_capacity(4);
                            let mut packer = coupon.packer();

                            let id = counter;
                            counter += 1;
                            packer.push(Datum::Int64(id));
                            packer.push(Datum::Int64(lead.id));
                            packer.push(Datum::TimestampTz(
                                to_datetime(now()).try_into().expect("timestamp must fit"),
                            ));
                            packer.push(Datum::Int64(amount));

                            pending.push((MarketingView::Coupons, coupon, Diff::ONE));
                        }

                        // The lead converts with probability `score`; a lead that
                        // received a coupon but did not convert gets a second draw.
                        let mut converted = rng.sample::<f64, _>(Standard) < score;
                        if sent_coupon && !converted {
                            converted = rng.sample::<f64, _>(Standard) < score;
                        }

                        // A conversion is scheduled 1..30 time units in the future.
                        // It consists of a retraction of the unconverted lead row and
                        // an insertion of the row with conversion time and amount set.
                        if converted {
                            let converted_at = now() + rng.gen_range(1..30);

                            future_updates.insert(
                                converted_at,
                                (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
                            );

                            lead.converted_at = Some(converted_at);
                            lead.conversion_amount = Some(rng.gen_range(1000..25000));

                            future_updates.insert(
                                converted_at,
                                (MarketingView::Leads, lead.to_row(), Diff::ONE),
                            );
                        }
                    }
                }

                // Emit one pending row. If it was the last row of the batch,
                // advance the offset and follow it with a progress event.
                pending.pop().map(|(output, row, diff)| {
                    let msg = (
                        LoadGeneratorOutput::Marketing(output),
                        Event::Message(MzOffset::from(offset), (row, diff)),
                    );

                    let progress = if pending.is_empty() {
                        offset += 1;
                        Some((
                            LoadGeneratorOutput::Marketing(output),
                            Event::Progress(Some(MzOffset::from(offset))),
                        ))
                    } else {
                        None
                    };
                    std::iter::once(msg).chain(progress)
                })
            })
            .flatten(),
        )
    }
}
212
/// A sales lead as emitted to the `Leads` view.
///
/// The conversion fields stay `None` until the lead converts; a conversion is
/// represented by retracting the old row and inserting an updated one.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Lead {
    /// Unique id, taken from the generator's shared id counter.
    id: i64,
    /// Index into `CUSTOMERS` of the customer this lead belongs to.
    customer_id: i64,
    /// Creation time, as returned by the generator's `now` function.
    created_at: u64,
    /// Conversion time, once the lead has converted.
    converted_at: Option<u64>,
    /// Amount attached to the conversion, once the lead has converted.
    conversion_amount: Option<i64>,
}
220
221impl Lead {
222 fn to_row(&self) -> Row {
223 let mut row = Row::with_capacity(5);
224 let mut packer = row.packer();
225 packer.push(Datum::Int64(self.id));
226 packer.push(Datum::Int64(self.customer_id));
227 packer.push(Datum::TimestampTz(
228 to_datetime(self.created_at)
229 .try_into()
230 .expect("timestamp must fit"),
231 ));
232 packer.push(
233 self.converted_at
234 .map(|converted_at| {
235 Datum::TimestampTz(
236 to_datetime(converted_at)
237 .try_into()
238 .expect("timestamp must fit"),
239 )
240 })
241 .unwrap_or(Datum::Null),
242 );
243 packer.push(
244 self.conversion_amount
245 .map(Datum::Int64)
246 .unwrap_or(Datum::Null),
247 );
248
249 row
250 }
251}
252
/// Email addresses of the generated customers.
///
/// A customer's id is its index in this list, so the order of entries is
/// significant. Every entry must be a syntactically valid address (no
/// consecutive dots in the local part).
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    // Previously "george.c..marshall": consecutive dots are not allowed in an
    // unquoted email local part.
    "george.c.marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
305
/// Updates that must not be emitted before a given time, grouped by that time.
#[derive(Default)]
struct FutureUpdates {
    /// Scheduled updates keyed by the time at which they become due. The
    /// ordered map lets `retrieve` release the earliest entries first.
    updates: BTreeMap<u64, Vec<(MarketingView, Row, Diff)>>,
}
310
311impl FutureUpdates {
312 fn insert(&mut self, time: u64, update: (MarketingView, Row, Diff)) {
314 match self.updates.entry(time) {
315 Entry::Vacant(v) => {
316 v.insert(vec![update]);
317 }
318 Entry::Occupied(o) => {
319 o.into_mut().push(update);
320 }
321 }
322 }
323
324 fn retrieve(&mut self, time: u64) -> Vec<(MarketingView, Row, Diff)> {
327 let mut updates = vec![];
328 while let Some(e) = self.updates.first_entry() {
329 if *e.key() > time {
330 break;
331 }
332
333 updates.append(&mut e.remove());
334 }
335 updates
336 }
337}