mz_storage/source/generator/
marketing.rs1use std::{
11 collections::{BTreeMap, btree_map::Entry},
12 iter,
13};
14
15use mz_ore::now::to_datetime;
16use mz_repr::{Datum, Diff, Row};
17use mz_storage_types::sources::MzOffset;
18use mz_storage_types::sources::load_generator::{
19 Event, Generator, LoadGeneratorOutput, MarketingView,
20};
21use rand::{Rng, SeedableRng, distributions::Standard, rngs::SmallRng};
22
/// Bucket label for leads held out of the coupon experiment; these leads never
/// receive a coupon.
const CONTROL: &str = "control";
/// Bucket label for leads that are eligible to receive a coupon.
const EXPERIMENT: &str = "experiment";

/// Load generator that simulates a marketing funnel: customers, impressions,
/// clicks, leads, conversion predictions and coupons.
pub struct Marketing {}
27
28impl Generator for Marketing {
32 fn by_seed(
33 &self,
34 now: mz_ore::now::NowFn,
35 seed: Option<u64>,
36 _resume_offset: MzOffset,
37 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
38 let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default());
39
40 let mut counter = 0;
41
42 let mut future_updates = FutureUpdates::default();
43 let mut pending: Vec<(MarketingView, Row, Diff)> = CUSTOMERS
44 .into_iter()
45 .enumerate()
46 .map(|(id, email)| {
47 let mut customer = Row::with_capacity(3);
48 let mut packer = customer.packer();
49
50 packer.push(Datum::Int64(id.try_into().unwrap()));
51 packer.push(Datum::String(email));
52 packer.push(Datum::Int64(rng.gen_range(5_000_000..10_000_000i64)));
53
54 (MarketingView::Customers, customer, Diff::ONE)
55 })
56 .collect();
57
58 let mut offset = 0;
59 Box::new(
60 iter::from_fn(move || {
61 if pending.is_empty() {
62 let mut impression = Row::with_capacity(4);
63 let mut packer = impression.packer();
64
65 let impression_id = counter;
66 counter += 1;
67
68 packer.push(Datum::Int64(impression_id));
69 packer.push(Datum::Int64(
70 rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
71 ));
72 packer.push(Datum::Int64(rng.gen_range(0..20i64)));
73 let impression_time = now();
74 packer.push(Datum::TimestampTz(
75 to_datetime(impression_time)
76 .try_into()
77 .expect("timestamp must fit"),
78 ));
79
80 pending.push((MarketingView::Impressions, impression, Diff::ONE));
81
82 if rng.gen_range(0..10) == 1 {
85 let mut click = Row::with_capacity(2);
86 let mut packer = click.packer();
87
88 let click_time = impression_time + rng.gen_range(20000..40000);
89
90 packer.push(Datum::Int64(impression_id));
91 packer.push(Datum::TimestampTz(
92 to_datetime(click_time)
93 .try_into()
94 .expect("timestamp must fit"),
95 ));
96
97 future_updates
98 .insert(click_time, (MarketingView::Clicks, click, Diff::ONE));
99 }
100
101 let mut updates = future_updates.retrieve(now());
102 pending.append(&mut updates);
103
104 for _ in 0..rng.gen_range(1..2) {
105 let id = counter;
106 counter += 1;
107
108 let mut lead = Lead {
109 id,
110 customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(),
111 created_at: now(),
112 converted_at: None,
113 conversion_amount: None,
114 };
115
116 pending.push((MarketingView::Leads, lead.to_row(), Diff::ONE));
117
118 let score = rng.sample::<f64, _>(Standard);
121 let label = score > 0.5f64;
122
123 let bucket = if lead.id % 10 <= 1 {
124 CONTROL
125 } else {
126 EXPERIMENT
127 };
128
129 let mut prediction = Row::with_capacity(4);
130 let mut packer = prediction.packer();
131
132 packer.push(Datum::Int64(lead.id));
133 packer.push(Datum::String(bucket));
134 packer.push(Datum::TimestampTz(
135 to_datetime(now()).try_into().expect("timestamp must fit"),
136 ));
137 packer.push(Datum::Float64(score.into()));
138
139 pending.push((MarketingView::ConversionPredictions, prediction, Diff::ONE));
140
141 let mut sent_coupon = false;
142 if !label && bucket == EXPERIMENT {
143 sent_coupon = true;
144 let amount = rng.gen_range(500..5000);
145
146 let mut coupon = Row::with_capacity(4);
147 let mut packer = coupon.packer();
148
149 let id = counter;
150 counter += 1;
151 packer.push(Datum::Int64(id));
152 packer.push(Datum::Int64(lead.id));
153 packer.push(Datum::TimestampTz(
154 to_datetime(now()).try_into().expect("timestamp must fit"),
155 ));
156 packer.push(Datum::Int64(amount));
157
158 pending.push((MarketingView::Coupons, coupon, Diff::ONE));
159 }
160
161 let mut converted = rng.sample::<f64, _>(Standard) < score;
165 if sent_coupon && !converted {
166 converted = rng.sample::<f64, _>(Standard) < score;
167 }
168
169 if converted {
170 let converted_at = now() + rng.gen_range(1..30);
171
172 future_updates.insert(
173 converted_at,
174 (MarketingView::Leads, lead.to_row(), Diff::MINUS_ONE),
175 );
176
177 lead.converted_at = Some(converted_at);
178 lead.conversion_amount = Some(rng.gen_range(1000..25000));
179
180 future_updates.insert(
181 converted_at,
182 (MarketingView::Leads, lead.to_row(), Diff::ONE),
183 );
184 }
185 }
186 }
187
188 pending.pop().map(|(output, row, diff)| {
189 let msg = (
190 LoadGeneratorOutput::Marketing(output),
191 Event::Message(MzOffset::from(offset), (row, diff)),
192 );
193
194 let progress = if pending.is_empty() {
195 offset += 1;
196 Some((
197 LoadGeneratorOutput::Marketing(output),
198 Event::Progress(Some(MzOffset::from(offset))),
199 ))
200 } else {
201 None
202 };
203 std::iter::once(msg).chain(progress)
204 })
205 })
206 .flatten(),
207 )
208 }
209}
210
/// A sales lead, as written to the generator's `Leads` view.
///
/// A lead is first inserted unconverted. If it converts, the generator
/// retracts that row and inserts a copy with `converted_at` and
/// `conversion_amount` filled in.
struct Lead {
    /// Unique id, drawn from the same counter as impressions and coupons.
    id: i64,
    /// Index of the customer in `CUSTOMERS`.
    customer_id: i64,
    /// Creation time, as returned by the generator's `now` function.
    created_at: u64,
    /// Conversion time; `None` while the lead is unconverted.
    converted_at: Option<u64>,
    /// Conversion amount; `None` while the lead is unconverted.
    conversion_amount: Option<i64>,
}
218
219impl Lead {
220 fn to_row(&self) -> Row {
221 let mut row = Row::with_capacity(5);
222 let mut packer = row.packer();
223 packer.push(Datum::Int64(self.id));
224 packer.push(Datum::Int64(self.customer_id));
225 packer.push(Datum::TimestampTz(
226 to_datetime(self.created_at)
227 .try_into()
228 .expect("timestamp must fit"),
229 ));
230 packer.push(
231 self.converted_at
232 .map(|converted_at| {
233 Datum::TimestampTz(
234 to_datetime(converted_at)
235 .try_into()
236 .expect("timestamp must fit"),
237 )
238 })
239 .unwrap_or(Datum::Null),
240 );
241 packer.push(
242 self.conversion_amount
243 .map(Datum::Int64)
244 .unwrap_or(Datum::Null),
245 );
246
247 row
248 }
249}
250
/// Email addresses of the simulated customers.
///
/// A customer's id is its index in this list (see the `enumerate` in the
/// generator), so reordering entries changes customer ids.
const CUSTOMERS: &[&str] = &[
    "andy.rooney@email.com",
    "marisa.tomei@email.com",
    "betty.thomas@email.com",
    "don.imus@email.com",
    "chevy.chase@email.com",
    "george.wendt@email.com",
    "oscar.levant@email.com",
    "jack.lemmon@email.com",
    "ben.vereen@email.com",
    "alexander.hamilton@email.com",
    "tommy.lee.jones@email.com",
    "george.takei@email.com",
    "norman.mailer@email.com",
    "casey.kasem@email.com",
    "sarah.miles@email.com",
    "john.landis@email.com",
    // Previously "george.c..marshall": consecutive dots are not allowed in an
    // unquoted email local part.
    "george.c.marshall@email.com",
    "rita.coolidge@email.com",
    "al.unser@email.com",
    "ross.perot@email.com",
    "mikhail.gorbachev@email.com",
    "yasmine.bleeth@email.com",
    "darryl.strawberry@email.com",
    "bruce.springsteen@email.com",
    "weird.al.yankovic@email.com",
    "james.franco@email.com",
    "jean.smart@email.com",
    "stevie.nicks@email.com",
    "robert.merrill@email.com",
    "todd.bridges@email.com",
    "sam.cooke@email.com",
    "bert.convy@email.com",
    "erica.jong@email.com",
    "oscar.schindler@email.com",
    "douglas.fairbanks@email.com",
    "penny.marshall@email.com",
    "bram.stoker@email.com",
    "holly.hunter@email.com",
    "leontyne.price@email.com",
    "dick.smothers@email.com",
    "meredith.baxter@email.com",
    "carla.bruni@email.com",
    "joel.mccrea@email.com",
    "mariette.hartley@email.com",
    "vince.gill@email.com",
    "leon.schotter@email.com",
    "johann.von.goethe@email.com",
    "john.katz@email.com",
    "attenborough@email.com",
    "billy.graham@email.com",
];
303
304#[derive(Default)]
305struct FutureUpdates {
306 updates: BTreeMap<u64, Vec<(MarketingView, Row, Diff)>>,
307}
308
309impl FutureUpdates {
310 fn insert(&mut self, time: u64, update: (MarketingView, Row, Diff)) {
312 match self.updates.entry(time) {
313 Entry::Vacant(v) => {
314 v.insert(vec![update]);
315 }
316 Entry::Occupied(o) => {
317 o.into_mut().push(update);
318 }
319 }
320 }
321
322 fn retrieve(&mut self, time: u64) -> Vec<(MarketingView, Row, Diff)> {
325 let mut updates = vec![];
326 while let Some(e) = self.updates.first_entry() {
327 if *e.key() > time {
328 break;
329 }
330
331 updates.append(&mut e.remove());
332 }
333 updates
334 }
335}