1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::AccountedStackBuilder;
25use rand::rngs::StdRng;
26use rand::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::ToStream;
29use timely::dataflow::operators::core::Partition;
30use timely::dataflow::{Scope, Stream};
31use timely::progress::{Antichain, Timestamp};
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Renders the key-value load generator into `scope`.
///
/// Two operators are built: a statistics operator (see `render_statistics_operator`) and an
/// async operator that emits a snapshot when the resume offset is `0`, followed by update
/// batches.
///
/// Returns, in order:
/// * a map from export id to the collection holding that export's messages,
/// * the stream of the operator's second output (its handle is unused here; only the
///   associated capability is downgraded),
/// * a stream with one `running` health status per export plus one with `id: None`,
/// * the shutdown buttons of the data operator and of the statistics operator.
///
/// # Panics
///
/// Panics if `output_map` has no `LoadGeneratorOutput::Default` entry, if one of its output
/// indexes is missing from `idx_to_exportid`, or if such an export has no entry in
/// `config.source_resume_uppers` or `config.statistics`.
pub fn render<G: Scope<Timestamp = MzOffset>>(
    key_value: KeyValueLoadGenerator,
    scope: &mut G,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
    idx_to_exportid: BTreeMap<usize, GlobalId>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let stats_button = render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    // Split the single data output into one stream per export, routed by the output index that
    // is carried alongside every update.
    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    // The i-th partitioned stream is paired with the i-th export id.
    // NOTE(review): this presumes output indexes follow the key order of
    // `config.source_exports` -- confirm against the caller.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);

    // Statistics handles of the exports that resume from the minimum offset; these receive the
    // snapshot record counts below.
    let mut snapshot_export_stats = vec![];

    // Statistics handles of every export fed by the default output.
    let mut all_export_stats = vec![];
    // All messages are emitted to every index registered for the default output.
    let output_indexes = output_map
        .get(&LoadGeneratorOutput::Default)
        .expect("default output")
        .clone();
    for export_id in output_indexes.iter().map(|idx| {
        idx_to_exportid
            .get(idx)
            .expect("mapping of output index to export id")
    }) {
        let export_resume_upper = config
            .source_resume_uppers
            .get(export_id)
            .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
            .expect("all source exports must be present in resume uppers");
        tracing::warn!(
            "source_id={} export_id={} worker_id={} resume_upper={:?}",
            config.id,
            export_id,
            config.worker_id,
            export_resume_upper
        );
        let export_stats = config
            .statistics
            .get(export_id)
            .expect("statistics initialized for export")
            .clone();
        if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
            snapshot_export_stats.push(export_stats.clone());
        }
        all_export_stats.push(export_stats);
    }

    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            // One capability per output; presumably in the order the outputs were created
            // (data first, then progress) -- TODO confirm.
            let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
            // Only the worker responsible for `0` reports the known offset during the snapshot.
            let stats_worker = config.responsible_for(0);
            // The resume frontier combined over all exports.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty frontier leaves nothing to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };
            let snapshotting = resume_offset.offset == 0;
            if snapshotting {
                // Every worker reports zeroed snapshot counters, including workers that end up
                // with no partition and return below.
                for stats in all_export_stats.iter() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // One snapshot producer per partition this worker is responsible for.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // This worker owns no partition, so it has no work to do.
            if local_partitions.is_empty() {
                return;
            }

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // Number of snapshot records this worker emits per export: its share of the keys
            // times the number of snapshot rounds.
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Scratch buffer for generated values, reused by every batch.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();
                if stats_worker {
                    for stats in all_export_stats.iter() {
                        stats.set_offset_known(snapshot_rounds);
                    }
                }

                // The progress capability moves to the end of the snapshot right away, while
                // `cap` stays behind until all snapshot data has been emitted.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                for stats in snapshot_export_stats.iter() {
                    stats.set_snapshot_records_known(local_snapshot_size);
                }
                // `max(1)` guards the division below against an empty `output_indexes`.
                let num_outputs = u64::cast_from(output_indexes.len()).max(1);
                // Round-robin over the local partitions, one batch each, until all are done.
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        let mut emitted_all_exports = 0;
                        for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
                            data_output.give_fueled(&cap, u).await;
                            emitted_all_exports += 1;
                        }
                        // Each record is emitted once per output index, so divide to count the
                        // records per export.
                        emitted += emitted_all_exports / num_outputs;
                    }
                    for stats in snapshot_export_stats.iter() {
                        stats.set_snapshot_records_staged(emitted);
                    }
                }
                // The snapshot is complete; the snapshot statistics are not updated again.
                snapshot_export_stats.clear();

                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Replace the snapshot producers with update producers that continue at
            // `upper_offset`.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Once every partition has produced its quick updates, either pace further
                    // batches by the tick interval or stop when no interval is configured.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) =
                            up.produce_batch(&mut value_buffer, &output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            data_output.give_fueled(&cap, u).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }
            // Never complete, so the capabilities owned by this future are not dropped.
            std::future::pending::<()>().await;
        })
    });

    // Report every export, and the `None` id, as running.
    let status = export_ids
        .into_iter()
        .map(Some)
        .chain(std::iter::once(None))
        .map(|id| HealthStatusMessage {
            id,
            namespace: StatusNamespace::Generator,
            update: HealthStatusUpdate::running(),
        })
        .collect::<Vec<_>>()
        .to_stream(scope);
    (
        data_collections,
        progress_stream,
        status,
        vec![button.press_on_drop(), stats_button],
    )
}
274
/// An endless, cyclic iterator over the keys that belong to one partition.
///
/// Consecutive keys are `partitions` apart and wrap around modulo `keys`.
struct PartitionKeyIterator {
    /// The partition whose keys are produced.
    partition: u64,
    /// The total number of partitions; also the stride between consecutive keys.
    partitions: u64,
    /// The size of the key space; produced keys wrap around modulo this value.
    keys: u64,
    /// The key returned by the next call to `Iterator::next`.
    next: u64,
}

impl PartitionKeyIterator {
    /// Creates an iterator for `partition` whose first key is `start_key`.
    ///
    /// # Panics
    ///
    /// Panics if `keys` is not a multiple of `partitions` (or if `partitions` is zero).
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        Self {
            partition,
            partitions,
            keys,
            next: start_key,
        }
    }
}

// `Iterator` is implemented for the mutable reference rather than for the struct itself, so
// that adaptors such as `take` borrow the iterator and it stays usable afterwards.
impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    /// Yields the current key and steps to the following one; never returns `None`.
    fn next(&mut self) -> Option<u64> {
        let upcoming = (self.next + self.partitions) % self.keys;
        Some(std::mem::replace(&mut self.next, upcoming))
    }
}
308
309fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
312 let mut seed = [0; 32];
313 seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
314 seed[8..16].copy_from_slice(&offset.to_le_bytes());
315 seed[16..24].copy_from_slice(&partition.to_le_bytes());
316 StdRng::from_seed(seed)
317}
318
319struct TransactionalSnapshotProducer {
321 pi: PartitionKeyIterator,
323 batch_size: u64,
325 produced_batches: u64,
327 expected_batches: u64,
329 round: u64,
331 snapshot_rounds: u64,
333 rng: Option<StdRng>,
335 seed: u64,
337 include_offset: bool,
339}
340
341impl TransactionalSnapshotProducer {
342 fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
343 let snapshot_rounds = key_value.transactional_snapshot_rounds();
344
345 let KeyValueLoadGenerator {
346 partitions,
347 keys,
348 batch_size,
349 seed,
350 include_offset,
351 ..
352 } = key_value;
353
354 assert_eq!((keys / partitions) % batch_size, 0);
355 let pi = PartitionKeyIterator::new(
356 partition, partitions, keys, partition,
358 );
359 TransactionalSnapshotProducer {
360 pi,
361 batch_size,
362 produced_batches: 0,
363 expected_batches: keys / partitions / batch_size,
364 round: 0,
365 snapshot_rounds,
366 rng: None,
367 seed,
368 include_offset: include_offset.is_some(),
369 }
370 }
371
372 fn finished(&self) -> bool {
374 self.round >= self.snapshot_rounds
375 }
376
377 fn produce_batch<'a>(
382 &'a mut self,
383 value_buffer: &'a mut Vec<u8>,
384 output_indexes: &'a [usize],
385 ) -> impl Iterator<
386 Item = (
387 (usize, Result<SourceMessage, DataflowError>),
388 MzOffset,
389 Diff,
390 ),
391 > + 'a {
392 let finished = self.finished();
393
394 let rng = self
395 .rng
396 .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));
397
398 let partition: u64 = self.pi.partition;
399 let iter_round: u64 = self.round;
400 let include_offset: bool = self.include_offset;
401 let iter = self
402 .pi
403 .take(if finished {
404 0
405 } else {
406 usize::cast_from(self.batch_size)
407 })
408 .flat_map(move |key| {
409 rng.fill_bytes(value_buffer.as_mut_slice());
410 let msg = Ok(SourceMessage {
411 key: Row::pack_slice(&[Datum::UInt64(key)]),
412 value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
413 metadata: if include_offset {
414 Row::pack(&[Datum::UInt64(iter_round)])
415 } else {
416 Row::default()
417 },
418 });
419 output_indexes
420 .iter()
421 .repeat_clone(msg)
422 .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
423 });
424
425 if !finished {
426 self.produced_batches += 1;
427
428 if self.produced_batches == self.expected_batches {
429 self.round += 1;
430 self.produced_batches = 0;
431 }
432 }
433
434 iter
435 }
436}
437
438struct UpdateProducer {
440 pi: PartitionKeyIterator,
442 batch_size: u64,
444 next_offset: u64,
446 seed: u64,
448 expected_quick_offsets: u64,
450 include_offset: bool,
452}
453
454impl UpdateProducer {
455 fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
456 let snapshot_rounds = key_value.transactional_snapshot_rounds();
457 let quick_rounds = key_value.non_transactional_snapshot_rounds();
458 let KeyValueLoadGenerator {
459 partitions,
460 keys,
461 batch_size,
462 seed,
463 include_offset,
464 ..
465 } = key_value;
466
467 let start_key =
472 (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;
473
474 let expected_quick_offsets =
479 ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;
480
481 let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
482 UpdateProducer {
483 pi,
484 batch_size,
485 next_offset,
486 seed,
487 expected_quick_offsets,
488 include_offset: include_offset.is_some(),
489 }
490 }
491
492 fn finished_quick(&self) -> bool {
494 self.next_offset >= self.expected_quick_offsets
495 }
496
497 fn produce_batch<'a>(
502 &'a mut self,
503 value_buffer: &'a mut Vec<u8>,
504 output_indexes: &'a [usize],
505 ) -> (
506 u64,
507 impl Iterator<
508 Item = (
509 (usize, Result<SourceMessage, DataflowError>),
510 MzOffset,
511 Diff,
512 ),
513 > + 'a,
514 ) {
515 let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);
516
517 let partition: u64 = self.pi.partition;
518 let iter_offset: u64 = self.next_offset;
519 let include_offset: bool = self.include_offset;
520 let iter = self
521 .pi
522 .take(usize::cast_from(self.batch_size))
523 .flat_map(move |key| {
524 rng.fill_bytes(value_buffer.as_mut_slice());
525 let msg = Ok(SourceMessage {
526 key: Row::pack_slice(&[Datum::UInt64(key)]),
527 value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
528 metadata: if include_offset {
529 Row::pack(&[Datum::UInt64(iter_offset)])
530 } else {
531 Row::default()
532 },
533 });
534 output_indexes
535 .iter()
536 .repeat_clone(msg)
537 .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
538 });
539
540 self.next_offset += 1;
542 (self.next_offset, iter)
543 }
544}
545
546pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
547 scope: &G,
548 config: &RawSourceCreationConfig,
549 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
550) -> PressOnDropButton {
551 let id = config.id;
552 let builder =
553 AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
554 let offset_worker = config.responsible_for(0);
555 let source_statistics = config.statistics.clone();
556 let button = builder.build(move |caps| async move {
557 drop(caps);
558 if !offset_worker {
559 for stat in source_statistics.values() {
561 stat.set_offset_committed(0);
562 stat.set_offset_known(0);
563 }
564 return;
565 }
566 tokio::pin!(committed_uppers);
567 loop {
568 match committed_uppers.next().await {
569 Some(frontier) => {
570 if let Some(offset) = frontier.as_option() {
571 for stat in source_statistics.values() {
572 stat.set_offset_committed(offset.offset);
573 stat.set_offset_known(offset.offset);
574 }
575 }
576 }
577 None => return,
578 }
579 }
580 });
581 button.press_on_drop()
582}
583
#[cfg(test)]
mod test {
    use super::*;

    /// The load generator configuration shared by the resumption checks: 126 keys over 3
    /// partitions, batches of 2 keys and 2 transactional snapshot rounds.
    fn transactional_loadgen() -> KeyValueLoadGenerator {
        KeyValueLoadGenerator {
            keys: 126,
            snapshot_rounds: 2,
            transactional_snapshot: true,
            value_size: 1234,
            partitions: 3,
            tick_interval: None,
            batch_size: 2,
            seed: 1234,
            include_offset: None,
        }
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        // Offset 5 is update batch index 3 after the 2 snapshot rounds:
        // 3 * batch_size (2) * partitions (3) + partition (1) = 19.
        let resumed = UpdateProducer::new(1, 5, transactional_loadgen());
        assert_eq!(resumed.pi.next, 19);

        // 126 / 2 further offsets advance the key by 3 * 126, which wraps around to the same
        // start key.
        let wrapped = UpdateProducer::new(1, 5 + 126 / 2, transactional_loadgen());
        assert_eq!(wrapped.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let (partition, partitions, keys, start_key) = (1, 3, 126, 1);
        let mut pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // Drain the rest of the partition's keys; the iterator is then back at its first key.
        let remaining = (126 / 3) - 2;
        let _ = pi.take(remaining).count();
        assert_eq!(pi.next, 1);
    }
}