1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::AccountedStackBuilder;
25use rand::rngs::StdRng;
26use rand::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::core::Partition;
29use timely::dataflow::operators::{Concat, ToStream};
30use timely::dataflow::{Scope, Stream};
31use timely::progress::Antichain;
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{ProgressStatisticsUpdate, SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Renders the key-value load generator source.
///
/// Builds an async operator that emits generated key-value records, split into one
/// collection per entry of `config.source_exports`, together with a progress stream, a
/// single "running" health status, and a statistics stream that concatenates this
/// operator's statistics with the ones produced by `render_statistics_operator`.
///
/// Returns, in order: the per-export data collections, the progress stream, the health
/// status stream, the statistics stream, and the buttons that shut down the rendered
/// operators when dropped.
pub fn render<G: Scope<Timestamp = MzOffset>>(
    key_value: KeyValueLoadGenerator,
    scope: &mut G,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Stream<G, ProgressStatisticsUpdate>,
    Vec<PressOnDropButton>,
) {
    let (steady_state_stats_stream, stats_button) =
        render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Route every `(output_index, data)` record to the partition numbered by its output
    // index; there is one partition per source export.
    let partition_count = u64::cast_from(config.source_exports.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    // Partition `i` is paired with the `i`-th export id in key order.
    // NOTE(review): assumes the output indexes in `output_map` follow that same ordering.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (stats_output, stats_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            // One capability per output, in creation order: data, progress, statistics.
            let [mut cap, mut progress_cap, stats_cap]: [_; 3] = caps.try_into().unwrap();

            // The resume frontier is assembled from the resume uppers of every export.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume frontier leaves nothing to produce, so the operator exits.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let output_indexes = output_map
                .get(&LoadGeneratorOutput::Default)
                .expect("default output");

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // Only a source starting from offset 0 emits the snapshot.
            let snapshotting = resume_offset.offset == 0;

            // Snapshot producers for the partitions this worker handles (per
            // `config.responsible_for`).
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // The worker responsible for partition 0 also reports steady-state offsets.
            let stats_worker = config.responsible_for(0);

            // A worker without partitions reports an empty snapshot and stops.
            if local_partitions.is_empty() {
                stats_output.give(
                    &stats_cap,
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: 0,
                        records_staged: 0,
                    },
                );
                return;
            }

            // Number of (key, round) pairs this worker's partitions produce for the snapshot.
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Scratch buffer that every generated value is written into.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();

                if stats_worker {
                    stats_output.give(
                        &stats_cap,
                        ProgressStatisticsUpdate::SteadyState {
                            offset_known: snapshot_rounds,
                            offset_committed: 0,
                        },
                    );
                };

                // Progress jumps to the end of the snapshot before any snapshot data is
                // emitted, while the data capability stays at the resume offset.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                stats_output.give(
                    &stats_cap,
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: local_snapshot_size,
                        records_staged: emitted,
                    },
                );
                // Interleave batches across local partitions until every one has finished
                // all of its snapshot rounds.
                // NOTE(review): `emitted` counts one message per entry of `output_indexes`,
                // whereas `local_snapshot_size` counts keys, so with several output indexes
                // `records_staged` can exceed `records_known`. Confirm this is intended.
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        for u in sp.produce_batch(&mut value_buffer, output_indexes) {
                            data_output.give_fueled(&cap, u).await;
                            emitted += 1;
                        }

                        stats_output.give(
                            &stats_cap,
                            ProgressStatisticsUpdate::Snapshot {
                                records_known: local_snapshot_size,
                                records_staged: emitted,
                            },
                        );
                    }
                }

                // The whole snapshot is out; release the data capability past it.
                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Update producers continue from the offset reached above.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            // Updates are only generated with a tick interval, or when the snapshot is not
            // transactional (presumably the "quick" update rounds then stand in for it).
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Once the quick offsets are done, either wait for the next tick or,
                    // without a tick interval, stop generating.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    // Each partition emits one batch at the same offset; afterwards both
                    // capabilities advance to the new upper.
                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) = up.produce_batch(&mut value_buffer, output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            data_output.give_fueled(&cap, u).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }

            // Never resolves. The capabilities stay held instead of being dropped when the
            // future would otherwise return.
            std::future::pending::<()>().await;
        })
    });

    let status = [HealthStatusMessage {
        id: None,
        namespace: StatusNamespace::Generator,
        update: HealthStatusUpdate::running(),
    }]
    .to_stream(scope);
    let stats_stream = stats_stream.concat(&steady_state_stats_stream);

    (
        data_collections,
        progress_stream,
        status,
        stats_stream,
        vec![button.press_on_drop(), stats_button],
    )
}
244
/// An endless iterator over the keys that share a partition.
///
/// Starting at `start_key`, it yields keys spaced `partitions` apart and wraps around
/// modulo `keys`. Because `keys` is required to be a multiple of `partitions`, every
/// yielded key has the same residue modulo `partitions` as the start key.
struct PartitionKeyIterator {
    /// The partition this iterator belongs to.
    partition: u64,
    /// Total number of partitions; also the step between consecutive keys.
    partitions: u64,
    /// Total number of keys across all partitions; keys wrap modulo this value.
    keys: u64,
    /// The key that the next call to `next` will yield.
    next: u64,
}

impl PartitionKeyIterator {
    /// Creates an iterator for `partition` that first yields `start_key`.
    ///
    /// # Panics
    ///
    /// Panics if `keys` is not a multiple of `partitions` (including when `partitions` is 0).
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        Self {
            next: start_key,
            keys,
            partitions,
            partition,
        }
    }
}

// Implemented for `&mut PartitionKeyIterator`, so adapters such as `take` borrow the
// iterator instead of consuming it; e.g. `self.pi.take(n)` leaves `self.pi` usable.
impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let following = (self.next + self.partitions) % self.keys;
        Some(std::mem::replace(&mut self.next, following))
    }
}
278
279fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
282 let mut seed = [0; 32];
283 seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
284 seed[8..16].copy_from_slice(&offset.to_le_bytes());
285 seed[16..24].copy_from_slice(&partition.to_le_bytes());
286 StdRng::from_seed(seed)
287}
288
/// Produces the transactional snapshot for a single partition.
///
/// The snapshot consists of `snapshot_rounds` rounds. In each round every key of the
/// partition is emitted once, in batches of `batch_size` keys, at an offset equal to the
/// round number.
struct TransactionalSnapshotProducer {
    /// Iterator over the keys of this partition.
    pi: PartitionKeyIterator,
    /// Number of keys emitted per batch.
    batch_size: u64,
    /// Number of batches produced so far in the current round.
    produced_batches: u64,
    /// Number of batches that make up one full round.
    expected_batches: u64,
    /// The current round; records of this round are emitted at this offset.
    round: u64,
    /// Total number of snapshot rounds.
    snapshot_rounds: u64,
    /// RNG for generated values. Created lazily by the first `produce_batch` call, seeded
    /// with the round at that time, and then reused (never reset) for all later rounds.
    rng: Option<StdRng>,
    /// Source-level seed for the RNG.
    seed: u64,
    /// Whether each message carries its round as metadata.
    include_offset: bool,
}

impl TransactionalSnapshotProducer {
    /// Creates a snapshot producer for `partition`, starting at round 0.
    ///
    /// # Panics
    ///
    /// Panics if the number of keys per partition is not a multiple of `batch_size`, or if
    /// `keys` is not a multiple of `partitions`.
    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();

        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        assert_eq!((keys / partitions) % batch_size, 0);
        // The first key of partition `p` is `p` itself.
        let pi = PartitionKeyIterator::new(
            partition, partitions, keys, partition,
        );
        TransactionalSnapshotProducer {
            pi,
            batch_size,
            produced_batches: 0,
            expected_batches: keys / partitions / batch_size,
            round: 0,
            snapshot_rounds,
            rng: None,
            seed,
            include_offset: include_offset.is_some(),
        }
    }

    /// Returns whether all snapshot rounds have been produced.
    fn finished(&self) -> bool {
        self.round >= self.snapshot_rounds
    }

    /// Produces the next batch of `batch_size` keys and advances the batch and round counters.
    ///
    /// Each key yields one message per entry of `output_indexes`, all at the current round's
    /// offset with a diff of `Diff::ONE`. Values are random bytes written into `value_buffer`,
    /// whose length determines the value size. Once `finished()` is true, the returned
    /// iterator is empty and the counters no longer move.
    ///
    /// The counters advance when this method is called. Keys, however, are drawn lazily from
    /// the partition's key iterator, so the returned iterator must be consumed fully before
    /// the next call.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> impl Iterator<
        Item = (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        ),
    > + 'a {
        let finished = self.finished();

        // Seeded only on the first call; later calls keep drawing from the same RNG.
        let rng = self
            .rng
            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));

        // Copy what the returned iterator needs into locals so its `move` closures do not
        // capture `self`.
        let partition: u64 = self.pi.partition;
        let iter_round: u64 = self.round;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(if finished {
                0
            } else {
                usize::cast_from(self.batch_size)
            })
            .flat_map(move |key| {
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_round)])
                    } else {
                        Row::default()
                    },
                });
                // Fan the message out to every output index.
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
            });

        if !finished {
            self.produced_batches += 1;

            // A full round's worth of batches has been handed out: move to the next round.
            if self.produced_batches == self.expected_batches {
                self.round += 1;
                self.produced_batches = 0;
            }
        }

        iter
    }
}
407
408struct UpdateProducer {
410 pi: PartitionKeyIterator,
412 batch_size: u64,
414 next_offset: u64,
416 seed: u64,
418 expected_quick_offsets: u64,
420 include_offset: bool,
422}
423
424impl UpdateProducer {
425 fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
426 let snapshot_rounds = key_value.transactional_snapshot_rounds();
427 let quick_rounds = key_value.non_transactional_snapshot_rounds();
428 let KeyValueLoadGenerator {
429 partitions,
430 keys,
431 batch_size,
432 seed,
433 include_offset,
434 ..
435 } = key_value;
436
437 let start_key =
442 (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;
443
444 let expected_quick_offsets =
449 ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;
450
451 let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
452 UpdateProducer {
453 pi,
454 batch_size,
455 next_offset,
456 seed,
457 expected_quick_offsets,
458 include_offset: include_offset.is_some(),
459 }
460 }
461
462 fn finished_quick(&self) -> bool {
464 self.next_offset >= self.expected_quick_offsets
465 }
466
467 fn produce_batch<'a>(
472 &'a mut self,
473 value_buffer: &'a mut Vec<u8>,
474 output_indexes: &'a [usize],
475 ) -> (
476 u64,
477 impl Iterator<
478 Item = (
479 (usize, Result<SourceMessage, DataflowError>),
480 MzOffset,
481 Diff,
482 ),
483 > + 'a,
484 ) {
485 let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);
486
487 let partition: u64 = self.pi.partition;
488 let iter_offset: u64 = self.next_offset;
489 let include_offset: bool = self.include_offset;
490 let iter = self
491 .pi
492 .take(usize::cast_from(self.batch_size))
493 .flat_map(move |key| {
494 rng.fill_bytes(value_buffer.as_mut_slice());
495 let msg = Ok(SourceMessage {
496 key: Row::pack_slice(&[Datum::UInt64(key)]),
497 value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
498 metadata: if include_offset {
499 Row::pack(&[Datum::UInt64(iter_offset)])
500 } else {
501 Row::default()
502 },
503 });
504 output_indexes
505 .iter()
506 .repeat_clone(msg)
507 .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
508 });
509
510 self.next_offset += 1;
512 (self.next_offset, iter)
513 }
514}
515
516pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
517 scope: &G,
518 config: &RawSourceCreationConfig,
519 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
520) -> (Stream<G, ProgressStatisticsUpdate>, PressOnDropButton) {
521 let id = config.id;
522 let mut builder =
523 AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
524
525 let (stats_output, stats_stream) = builder.new_output();
526
527 let offset_worker = config.responsible_for(0);
528
529 let button = builder.build(move |caps| async move {
530 let [stats_cap]: [_; 1] = caps.try_into().unwrap();
531
532 if !offset_worker {
533 stats_output.give(
535 &stats_cap,
536 ProgressStatisticsUpdate::SteadyState {
537 offset_known: 0,
538 offset_committed: 0,
539 },
540 );
541 return;
542 }
543
544 tokio::pin!(committed_uppers);
545 loop {
546 match committed_uppers.next().await {
547 Some(frontier) => {
548 if let Some(offset) = frontier.as_option() {
549 stats_output.give(
550 &stats_cap,
551 ProgressStatisticsUpdate::SteadyState {
552 offset_known: offset.offset,
553 offset_committed: offset.offset,
554 },
555 );
556 }
557 }
558 None => return,
559 }
560 }
561 });
562
563 (stats_stream, button.press_on_drop())
564}
565
#[cfg(test)]
mod test {
    use super::*;

    /// Generator configuration shared by the tests: 126 keys over 3 partitions, batches of
    /// 2 keys, and 2 transactional snapshot rounds.
    fn test_generator() -> KeyValueLoadGenerator {
        KeyValueLoadGenerator {
            keys: 126,
            snapshot_rounds: 2,
            transactional_snapshot: true,
            value_size: 1234,
            partitions: 3,
            tick_interval: None,
            batch_size: 2,
            seed: 1234,
            include_offset: None,
        }
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        // Resuming partition 1 at offset 5 means 5 - 2 = 3 update batches are already done.
        // Each batch consumed 2 * 3 keys overall, so the next key is 3 * 2 * 3 + 1 = 19.
        let up = UpdateProducer::new(1, 5, test_generator());
        assert_eq!(up.pi.next, 19);

        // 63 further batches advance partition 1 by 126 keys. That is exactly three passes
        // over its 42 keys, so the next key is 19 again.
        let up = UpdateProducer::new(1, 5 + 126 / 2, test_generator());
        assert_eq!(up.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        // Partition 1 of 3 over 126 keys, starting at key 1.
        let mut pi = PartitionKeyIterator::new(1, 3, 126, 1);

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // Consume the rest of the partition's 42 keys; the iterator wraps back to key 1.
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}