1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::FueledBuilder;
25use rand_8::rngs::StdRng;
26use rand_8::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::core::Partition;
29use timely::dataflow::operators::vec::ToStream;
30use timely::dataflow::{Scope, StreamVec};
31use timely::progress::{Antichain, Timestamp};
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{FuelSize, SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Render the key-value load generator source.
///
/// Builds one async operator that produces deterministic key-value data for
/// every export of the source, plus a statistics-maintenance operator (see
/// [`render_statistics_operator`]).
///
/// Returns:
/// - a per-export collection of produced messages (or errors),
/// - a progress stream that never carries data (`Infallible` items) and is
///   used only for its frontier,
/// - a stream of health status updates (one per export plus one source-level),
/// - shutdown buttons for both operators.
pub fn render<'scope>(
    key_value: KeyValueLoadGenerator,
    scope: Scope<'scope, MzOffset>,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
    idx_to_exportid: BTreeMap<usize, GlobalId>,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let stats_button = render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    // The data output is fueled so that `give_fueled` can yield between large
    // emissions instead of flooding downstream.
    let (data_output, stream) = builder.new_output::<FueledBuilder<
        CapacityContainerBuilder<
            Vec<(
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            )>,
        >,
    >>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    // Split the single output stream into one stream per export, routed by the
    // output index packed into each record.
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(output);
            (output, (data, time, diff))
        },
    );
    let mut data_collections = BTreeMap::new();
    // `partition` preserves order, so the streams line up 1:1 with the export
    // ids collected above; `zip_eq` asserts that invariant.
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    // Data-less output whose frontier communicates progress to downstream.
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);

    // Statistics handles for exports that still need a snapshot.
    let mut snapshot_export_stats = vec![];

    // Statistics handles for every export of this source.
    let mut all_export_stats = vec![];
    let output_indexes = output_map
        .get(&LoadGeneratorOutput::Default)
        .expect("default output")
        .clone();
    for export_id in output_indexes.iter().map(|idx| {
        idx_to_exportid
            .get(idx)
            .expect("mapping of output index to export id")
    }) {
        let export_resume_upper = config
            .source_resume_uppers
            .get(export_id)
            .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
            .expect("all source exports must be present in resume uppers");
        // NOTE(review): this is routine startup information but is logged at
        // `warn!` while the other startup messages below use `info!` — confirm
        // the level is intentional.
        tracing::warn!(
            "source_id={} export_id={} worker_id={} resume_upper={:?}",
            config.id,
            export_id,
            config.worker_id,
            export_resume_upper
        );
        let export_stats = config
            .statistics
            .get(export_id)
            .expect("statistics initialized for export")
            .clone();
        // An export still at the minimum offset has not snapshotted yet and
        // participates in snapshot-progress statistics.
        if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
            snapshot_export_stats.push(export_stats.clone());
        }
        all_export_stats.push(export_stats);
    }

    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
            // Exactly one worker (the one responsible for "partition" 0)
            // maintains the global offset-known statistic.
            let stats_worker = config.responsible_for(0);
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume upper means the source is complete; nothing to do.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };
            let snapshotting = resume_offset.offset == 0;
            if snapshotting {
                // Reset snapshot statistics before (re)starting the snapshot.
                for stats in all_export_stats.iter() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // Snapshot producers for only the partitions this worker owns.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // Workers that own no partitions drop their capabilities and exit.
            if local_partitions.is_empty() {
                return;
            }

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            // Wait until the source is allowed to start producing.
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // Total snapshot records this worker will emit (per export).
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Reused buffer for the randomly generated value bytes.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            // Emit the transactional snapshot (if starting from scratch) and
            // compute the offset at which update production begins.
            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();
                if stats_worker {
                    for stats in all_export_stats.iter() {
                        stats.set_offset_known(snapshot_rounds);
                    }
                }

                // The snapshot occupies offsets [0, snapshot_rounds); progress
                // can advance there immediately, but the data capability must
                // stay at 0 until the snapshot is fully emitted.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                for stats in snapshot_export_stats.iter() {
                    stats.set_snapshot_records_known(local_snapshot_size);
                }
                // Guard against division by zero when there are no outputs.
                let num_outputs = u64::cast_from(output_indexes.len()).max(1);
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        let mut emitted_all_exports = 0;
                        for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
                            let size = u.fuel_size();
                            data_output.give_fueled(&cap, u, size).await;
                            emitted_all_exports += 1;
                        }
                        // `emitted` counts records per export, so divide out
                        // the per-record output fan-out.
                        emitted += emitted_all_exports / num_outputs;
                    }
                    for stats in snapshot_export_stats.iter() {
                        stats.set_snapshot_records_staged(emitted);
                    }
                }
                snapshot_export_stats.clear();

                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Update producers for the partitions this worker owns, resuming
            // at the offset where the snapshot (or previous run) left off.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            // Updates are produced only when there is something to produce:
            // either a steady tick interval, or quick (non-transactional)
            // snapshot rounds still outstanding.
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Once the quick rounds are done, pace by the tick
                    // interval; with no interval there is nothing left to do.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) =
                            up.produce_batch(&mut value_buffer, &output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            let size = u.fuel_size();
                            data_output.give_fueled(&cap, u, size).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }
            // Park forever: the source never completes on its own, and the
            // held capabilities keep the frontier from advancing further.
            std::future::pending::<()>().await;
        })
    });

    // Every export (and the source itself, keyed by `None`) starts out
    // reporting a `running` health status.
    let status = export_ids
        .into_iter()
        .map(Some)
        .chain(std::iter::once(None))
        .map(|id| HealthStatusMessage {
            id,
            namespace: StatusNamespace::Generator,
            update: HealthStatusUpdate::running(),
        })
        .collect::<Vec<_>>()
        .to_stream(scope);
    (
        data_collections,
        progress_stream,
        status,
        vec![button.press_on_drop(), stats_button],
    )
}
284
/// An endless iterator over the keys owned by a single partition.
///
/// Keys are produced starting at some key belonging to the partition, striding
/// by the total partition count, and wrapping around at the end of the key
/// space, so the same keys repeat forever in a fixed order.
struct PartitionKeyIterator {
    /// The partition whose keys are yielded.
    partition: u64,
    /// The total number of partitions.
    partitions: u64,
    /// The size of the whole key space (across all partitions).
    keys: u64,
    /// The key that will be yielded next.
    next: u64,
}

impl PartitionKeyIterator {
    /// Build an iterator for `partition` beginning at `start_key`.
    ///
    /// Panics unless the key space divides evenly among the partitions.
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        Self {
            partition,
            partitions,
            keys,
            next: start_key,
        }
    }
}

impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        // Yield the current key and advance by the partition stride, wrapping
        // modulo the key-space size. This iterator never terminates.
        let current = self.next;
        self.next = (current + self.partitions) % self.keys;
        Some(current)
    }
}
318
319fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
322 let mut seed = [0; 32];
323 seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
324 seed[8..16].copy_from_slice(&offset.to_le_bytes());
325 seed[16..24].copy_from_slice(&partition.to_le_bytes());
326 StdRng::from_seed(seed)
327}
328
/// A per-partition producer for the transactional snapshot phase of the
/// key-value load generator.
struct TransactionalSnapshotProducer {
    /// This partition's keys, iterated with wraparound.
    pi: PartitionKeyIterator,
    /// Number of keys emitted per batch.
    batch_size: u64,
    /// Batches produced so far in the current round.
    produced_batches: u64,
    /// Batches that make up one full pass over this partition's keys.
    expected_batches: u64,
    /// The current snapshot round; also used as the emitted offset.
    round: u64,
    /// Total number of transactional snapshot rounds to produce.
    snapshot_rounds: u64,
    /// Lazily created rng, seeded from (seed, round, partition).
    rng: Option<StdRng>,
    /// The source-level seed for value generation.
    seed: u64,
    /// Whether the offset is included in each message's metadata row.
    include_offset: bool,
}
350
impl TransactionalSnapshotProducer {
    /// Create a snapshot producer for `partition` of the given generator
    /// configuration, starting at round 0.
    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();

        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        // Each partition's share of the key space must split into whole
        // batches.
        assert_eq!((keys / partitions) % batch_size, 0);
        // A partition's key cycle starts at the key equal to its own index.
        let pi = PartitionKeyIterator::new(
            partition, partitions, keys, partition,
        );
        TransactionalSnapshotProducer {
            pi,
            batch_size,
            produced_batches: 0,
            expected_batches: keys / partitions / batch_size,
            round: 0,
            snapshot_rounds,
            rng: None,
            seed,
            include_offset: include_offset.is_some(),
        }
    }

    /// Whether all snapshot rounds have been produced.
    fn finished(&self) -> bool {
        self.round >= self.snapshot_rounds
    }

    /// Produce the next batch of snapshot messages, one per key per output
    /// index, all at the offset equal to the current round. When the producer
    /// is `finished` the returned iterator is empty.
    ///
    /// The round/batch bookkeeping is updated eagerly here even though the
    /// returned iterator is lazy, so callers must fully drain the iterator
    /// before calling `produce_batch` again.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> impl Iterator<
        Item = (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        ),
    > + 'a {
        let finished = self.finished();

        // Lazily create the rng on first use.
        // NOTE(review): the seed incorporates `self.round`, but the rng is
        // never reset when `round` advances below, so later rounds keep
        // drawing from the round-0 stream — confirm whether per-round
        // reseeding was intended.
        let rng = self
            .rng
            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));

        // Plain copies for the `move` closure below.
        let partition: u64 = self.pi.partition;
        let iter_round: u64 = self.round;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(if finished {
                0
            } else {
                usize::cast_from(self.batch_size)
            })
            .flat_map(move |key| {
                // Fresh random value bytes per key; the buffer is reused
                // across keys and batches.
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_round)])
                    } else {
                        Row::default()
                    },
                });
                // Fan the message out once per requested output index.
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
            });

        if !finished {
            self.produced_batches += 1;

            // A full pass over this partition's keys completes the round.
            if self.produced_batches == self.expected_batches {
                self.round += 1;
                self.produced_batches = 0;
            }
        }

        iter
    }
}
447
/// A per-partition producer for the post-snapshot (update) phase of the
/// key-value load generator.
struct UpdateProducer {
    /// This partition's keys, iterated with wraparound.
    pi: PartitionKeyIterator,
    /// Number of keys emitted per batch.
    batch_size: u64,
    /// The offset at which the next batch will be emitted.
    next_offset: u64,
    /// The source-level seed for value generation.
    seed: u64,
    /// The offset at which all non-transactional ("quick") snapshot rounds
    /// are complete.
    expected_quick_offsets: u64,
    /// Whether the offset is included in each message's metadata row.
    include_offset: bool,
}
463
impl UpdateProducer {
    /// Create an update producer for `partition` that resumes emitting at
    /// `next_offset`.
    fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();
        let quick_rounds = key_value.non_transactional_snapshot_rounds();
        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        // Every offset past the transactional snapshot advanced this
        // partition's key iterator by `batch_size` keys with stride
        // `partitions`, so the resume position in the key space is recovered
        // modulo `keys`. Assumes `next_offset >= snapshot_rounds` — callers
        // only construct update producers after the transactional snapshot.
        let start_key =
            (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;

        // Each quick round takes `keys / partitions / batch_size` offsets per
        // partition, following the `snapshot_rounds` transactional offsets.
        let expected_quick_offsets =
            ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;

        let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
        UpdateProducer {
            pi,
            batch_size,
            next_offset,
            seed,
            expected_quick_offsets,
            include_offset: include_offset.is_some(),
        }
    }

    /// Whether all non-transactional ("quick") snapshot rounds have been
    /// emitted.
    fn finished_quick(&self) -> bool {
        self.next_offset >= self.expected_quick_offsets
    }

    /// Produce the next batch of updates, one message per key per output
    /// index, all at the current `next_offset`. Returns the new upper offset
    /// (one past the emitted batch) together with the lazy message iterator;
    /// callers must drain the iterator before producing the next batch.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> (
        u64,
        impl Iterator<
            Item = (
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            ),
        > + 'a,
    ) {
        // A fresh rng per (seed, offset, partition) keeps each batch's values
        // reproducible.
        let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);

        // Plain copies for the `move` closure below.
        let partition: u64 = self.pi.partition;
        let iter_offset: u64 = self.next_offset;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(usize::cast_from(self.batch_size))
            .flat_map(move |key| {
                // Fresh random value bytes per key; the buffer is reused
                // across keys and batches.
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_offset)])
                    } else {
                        Row::default()
                    },
                });
                // Fan the message out once per requested output index.
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
            });

        // The batch is emitted at `iter_offset`; the returned upper is one
        // offset past it.
        self.next_offset += 1;
        (self.next_offset, iter)
    }
}
555
556pub fn render_statistics_operator<'scope>(
557 scope: Scope<'scope, MzOffset>,
558 config: &RawSourceCreationConfig,
559 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
560) -> PressOnDropButton {
561 let id = config.id;
562 let builder =
563 AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
564 let offset_worker = config.responsible_for(0);
565 let source_statistics = config.statistics.clone();
566 let button = builder.build(move |caps| async move {
567 drop(caps);
568 if !offset_worker {
569 for stat in source_statistics.values() {
571 stat.set_offset_committed(0);
572 stat.set_offset_known(0);
573 }
574 return;
575 }
576 tokio::pin!(committed_uppers);
577 loop {
578 match committed_uppers.next().await {
579 Some(frontier) => {
580 if let Some(offset) = frontier.as_option() {
581 for stat in source_statistics.values() {
582 stat.set_offset_committed(offset.offset);
583 stat.set_offset_known(offset.offset);
584 }
585 }
586 }
587 None => return,
588 }
589 }
590 });
591 button.press_on_drop()
592}
593
#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        // Both calls share the same generator configuration.
        let generator = || KeyValueLoadGenerator {
            keys: 126,
            snapshot_rounds: 2,
            transactional_snapshot: true,
            value_size: 1234,
            partitions: 3,
            tick_interval: None,
            batch_size: 2,
            seed: 1234,
            include_offset: None,
        };

        // Resuming partition 1 at offset 5 lands on key 19.
        let up = UpdateProducer::new(1, 5, generator());
        assert_eq!(up.pi.next, 19);

        // A resume offset a whole number of key-space cycles later maps back
        // to the same start key.
        let up = UpdateProducer::new(1, 5 + 126 / 2, generator());
        assert_eq!(up.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut pi = PartitionKeyIterator::new(1, 3, 126, 1);

        // Keys stride by the partition count.
        assert_eq!(Some(1), Iterator::next(&mut &mut pi));
        assert_eq!(Some(4), Iterator::next(&mut &mut pi));

        // Consuming the rest of the partition's keys wraps back to the start.
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}