1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::AccountedStackBuilder;
25use rand::rngs::StdRng;
26use rand::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::ToStream;
29use timely::dataflow::operators::core::Partition;
30use timely::dataflow::{Scope, Stream};
31use timely::progress::{Antichain, Timestamp};
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
38pub fn render<G: Scope<Timestamp = MzOffset>>(
39 key_value: KeyValueLoadGenerator,
40 scope: &mut G,
41 config: RawSourceCreationConfig,
42 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
43 start_signal: impl std::future::Future<Output = ()> + 'static,
44 output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
45 idx_to_exportid: BTreeMap<usize, GlobalId>,
46) -> (
47 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
48 Stream<G, Infallible>,
49 Stream<G, HealthStatusMessage>,
50 Vec<PressOnDropButton>,
51) {
52 let stats_button = render_statistics_operator(scope, &config, committed_uppers);
55
56 let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());
57
58 let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
59 let partition_count = u64::cast_from(config.source_exports.len());
60 let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
61 partition_count,
62 |((output, data), time, diff): &(
63 (usize, Result<SourceMessage, DataflowError>),
64 MzOffset,
65 Diff,
66 )| {
67 let output = u64::cast_from(*output);
68 (output, (data.clone(), time.clone(), diff.clone()))
69 },
70 );
71 let mut data_collections = BTreeMap::new();
72 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
73 data_collections.insert(*id, data_stream.as_collection());
74 }
75
76 let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
77
78 let busy_signal = Arc::clone(&config.busy_signal);
79
80 let mut snapshot_export_stats = vec![];
84
85 let mut all_export_stats = vec![];
89 let output_indexes = output_map
90 .get(&LoadGeneratorOutput::Default)
91 .expect("default output")
92 .clone();
93 for export_id in output_indexes.iter().map(|idx| {
94 idx_to_exportid
95 .get(idx)
96 .expect("mapping of output index to export id")
97 }) {
98 let export_resume_upper = config
99 .source_resume_uppers
100 .get(export_id)
101 .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
102 .expect("all source exports must be present in resume uppers");
103 tracing::warn!(
104 "source_id={} export_id={} worker_id={} resume_upper={:?}",
105 config.id,
106 export_id,
107 config.worker_id,
108 export_resume_upper
109 );
110 let export_stats = config
111 .statistics
112 .get(export_id)
113 .expect("statistics initialized for export")
114 .clone();
115 if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
116 snapshot_export_stats.push(export_stats.clone());
117 }
118 all_export_stats.push(export_stats);
119 }
120
121 let button = builder.build(move |caps| {
122 SignaledFuture::new(busy_signal, async move {
123 let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
124 let stats_worker = config.responsible_for(0);
125 let resume_upper = Antichain::from_iter(
126 config
127 .source_resume_uppers
128 .values()
129 .flat_map(|f| f.iter().map(MzOffset::decode_row)),
130 );
131
132 let Some(resume_offset) = resume_upper.into_option() else {
133 return;
134 };
135 let snapshotting = resume_offset.offset == 0;
136 if snapshotting {
140 for stats in all_export_stats.iter() {
141 stats.set_snapshot_records_known(0);
142 stats.set_snapshot_records_staged(0);
143 }
144 }
145 let mut local_partitions: Vec<_> = (0..key_value.partitions)
146 .filter_map(|p| {
147 config
148 .responsible_for(p)
149 .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
150 })
151 .collect();
152
153 if local_partitions.is_empty() {
155 return;
156 }
157
158 info!(
159 ?config.worker_id,
160 "starting key-value load generator at {}",
161 resume_offset.offset,
162 );
163 cap.downgrade(&resume_offset);
164 progress_cap.downgrade(&resume_offset);
165 start_signal.await;
166 info!(?config.worker_id, "received key-value load generator start signal");
167
168 let local_snapshot_size = (u64::cast_from(local_partitions.len()))
169 * key_value.keys
170 * key_value.transactional_snapshot_rounds()
171 / key_value.partitions;
172
173 let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];
175
176 let mut upper_offset = if snapshotting {
177 let snapshot_rounds = key_value.transactional_snapshot_rounds();
178 if stats_worker {
179 for stats in all_export_stats.iter() {
180 stats.set_offset_known(snapshot_rounds);
181 }
182 }
183
184 progress_cap.downgrade(&MzOffset::from(snapshot_rounds));
186
187 let mut emitted = 0;
188 for stats in snapshot_export_stats.iter() {
189 stats.set_snapshot_records_known(local_snapshot_size);
190 }
191 let num_outputs = u64::cast_from(output_indexes.len()).max(1);
193 while local_partitions.iter().any(|si| !si.finished()) {
194 for sp in local_partitions.iter_mut() {
195 let mut emitted_all_exports = 0;
196 for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
197 data_output.give_fueled(&cap, u).await;
198 emitted_all_exports += 1;
199 }
200 emitted += emitted_all_exports / num_outputs;
202 }
203 for stats in snapshot_export_stats.iter() {
204 stats.set_snapshot_records_staged(emitted);
205 }
206 }
207 snapshot_export_stats.clear();
209
210 cap.downgrade(&MzOffset::from(snapshot_rounds));
211 snapshot_rounds
212 } else {
213 cap.downgrade(&resume_offset);
214 progress_cap.downgrade(&resume_offset);
215 resume_offset.offset
216 };
217
218 let mut local_partitions: Vec<_> = (0..key_value.partitions)
219 .filter_map(|p| {
220 config
221 .responsible_for(p)
222 .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
223 })
224 .collect();
225 if !local_partitions.is_empty()
226 && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
227 {
228 let mut interval = key_value.tick_interval.map(tokio::time::interval);
229
230 loop {
231 if local_partitions.iter().all(|si| si.finished_quick()) {
232 if let Some(interval) = &mut interval {
233 interval.tick().await;
234 } else {
235 break;
236 }
237 }
238
239 for up in local_partitions.iter_mut() {
240 let (new_upper, iter) =
241 up.produce_batch(&mut value_buffer, &output_indexes);
242 upper_offset = new_upper;
243 for u in iter {
244 data_output.give_fueled(&cap, u).await;
245 }
246 }
247 cap.downgrade(&MzOffset::from(upper_offset));
248 progress_cap.downgrade(&MzOffset::from(upper_offset));
249 }
250 }
251 std::future::pending::<()>().await;
252 })
253 });
254
255 let status = [HealthStatusMessage {
256 id: None,
257 namespace: StatusNamespace::Generator,
258 update: HealthStatusUpdate::running(),
259 }]
260 .to_stream(scope);
261 (
262 data_collections,
263 progress_stream,
264 status,
265 vec![button.press_on_drop(), stats_button],
266 )
267}
268
/// Endless iterator over the keys owned by one partition.
///
/// Starting at `start_key`, it steps by `partitions` and wraps around modulo `keys`. It never
/// returns `None`; callers bound it with adapters such as `take`.
struct PartitionKeyIterator {
    /// The partition this iterator belongs to.
    partition: u64,
    /// Total number of partitions, which is also the stride between consecutive keys.
    partitions: u64,
    /// Size of the key space; keys wrap around modulo this value.
    keys: u64,
    /// The key handed out by the next call to `next`.
    next: u64,
}

impl PartitionKeyIterator {
    /// Creates an iterator for `partition` whose first key is `start_key`.
    ///
    /// # Panics
    ///
    /// Panics if `keys` is not a multiple of `partitions` (or if `partitions` is zero).
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        Self {
            next: start_key,
            keys,
            partitions,
            partition,
        }
    }
}

/// Implemented on `&mut PartitionKeyIterator` so that adapters like `take` borrow the iterator
/// instead of consuming it, letting the position carry over between batches.
impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let current = self.next;
        self.next = (current + self.partitions) % self.keys;
        Some(current)
    }
}
302
303fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
306 let mut seed = [0; 32];
307 seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
308 seed[8..16].copy_from_slice(&offset.to_le_bytes());
309 seed[16..24].copy_from_slice(&partition.to_le_bytes());
310 StdRng::from_seed(seed)
311}
312
313struct TransactionalSnapshotProducer {
315 pi: PartitionKeyIterator,
317 batch_size: u64,
319 produced_batches: u64,
321 expected_batches: u64,
323 round: u64,
325 snapshot_rounds: u64,
327 rng: Option<StdRng>,
329 seed: u64,
331 include_offset: bool,
333}
334
335impl TransactionalSnapshotProducer {
336 fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
337 let snapshot_rounds = key_value.transactional_snapshot_rounds();
338
339 let KeyValueLoadGenerator {
340 partitions,
341 keys,
342 batch_size,
343 seed,
344 include_offset,
345 ..
346 } = key_value;
347
348 assert_eq!((keys / partitions) % batch_size, 0);
349 let pi = PartitionKeyIterator::new(
350 partition, partitions, keys, partition,
352 );
353 TransactionalSnapshotProducer {
354 pi,
355 batch_size,
356 produced_batches: 0,
357 expected_batches: keys / partitions / batch_size,
358 round: 0,
359 snapshot_rounds,
360 rng: None,
361 seed,
362 include_offset: include_offset.is_some(),
363 }
364 }
365
366 fn finished(&self) -> bool {
368 self.round >= self.snapshot_rounds
369 }
370
371 fn produce_batch<'a>(
376 &'a mut self,
377 value_buffer: &'a mut Vec<u8>,
378 output_indexes: &'a [usize],
379 ) -> impl Iterator<
380 Item = (
381 (usize, Result<SourceMessage, DataflowError>),
382 MzOffset,
383 Diff,
384 ),
385 > + 'a {
386 let finished = self.finished();
387
388 let rng = self
389 .rng
390 .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));
391
392 let partition: u64 = self.pi.partition;
393 let iter_round: u64 = self.round;
394 let include_offset: bool = self.include_offset;
395 let iter = self
396 .pi
397 .take(if finished {
398 0
399 } else {
400 usize::cast_from(self.batch_size)
401 })
402 .flat_map(move |key| {
403 rng.fill_bytes(value_buffer.as_mut_slice());
404 let msg = Ok(SourceMessage {
405 key: Row::pack_slice(&[Datum::UInt64(key)]),
406 value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
407 metadata: if include_offset {
408 Row::pack(&[Datum::UInt64(iter_round)])
409 } else {
410 Row::default()
411 },
412 });
413 output_indexes
414 .iter()
415 .repeat_clone(msg)
416 .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
417 });
418
419 if !finished {
420 self.produced_batches += 1;
421
422 if self.produced_batches == self.expected_batches {
423 self.round += 1;
424 self.produced_batches = 0;
425 }
426 }
427
428 iter
429 }
430}
431
432struct UpdateProducer {
434 pi: PartitionKeyIterator,
436 batch_size: u64,
438 next_offset: u64,
440 seed: u64,
442 expected_quick_offsets: u64,
444 include_offset: bool,
446}
447
448impl UpdateProducer {
449 fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
450 let snapshot_rounds = key_value.transactional_snapshot_rounds();
451 let quick_rounds = key_value.non_transactional_snapshot_rounds();
452 let KeyValueLoadGenerator {
453 partitions,
454 keys,
455 batch_size,
456 seed,
457 include_offset,
458 ..
459 } = key_value;
460
461 let start_key =
466 (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;
467
468 let expected_quick_offsets =
473 ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;
474
475 let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
476 UpdateProducer {
477 pi,
478 batch_size,
479 next_offset,
480 seed,
481 expected_quick_offsets,
482 include_offset: include_offset.is_some(),
483 }
484 }
485
486 fn finished_quick(&self) -> bool {
488 self.next_offset >= self.expected_quick_offsets
489 }
490
491 fn produce_batch<'a>(
496 &'a mut self,
497 value_buffer: &'a mut Vec<u8>,
498 output_indexes: &'a [usize],
499 ) -> (
500 u64,
501 impl Iterator<
502 Item = (
503 (usize, Result<SourceMessage, DataflowError>),
504 MzOffset,
505 Diff,
506 ),
507 > + 'a,
508 ) {
509 let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);
510
511 let partition: u64 = self.pi.partition;
512 let iter_offset: u64 = self.next_offset;
513 let include_offset: bool = self.include_offset;
514 let iter = self
515 .pi
516 .take(usize::cast_from(self.batch_size))
517 .flat_map(move |key| {
518 rng.fill_bytes(value_buffer.as_mut_slice());
519 let msg = Ok(SourceMessage {
520 key: Row::pack_slice(&[Datum::UInt64(key)]),
521 value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
522 metadata: if include_offset {
523 Row::pack(&[Datum::UInt64(iter_offset)])
524 } else {
525 Row::default()
526 },
527 });
528 output_indexes
529 .iter()
530 .repeat_clone(msg)
531 .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
532 });
533
534 self.next_offset += 1;
536 (self.next_offset, iter)
537 }
538}
539
540pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
541 scope: &G,
542 config: &RawSourceCreationConfig,
543 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
544) -> PressOnDropButton {
545 let id = config.id;
546 let builder =
547 AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
548 let offset_worker = config.responsible_for(0);
549 let source_statistics = config.statistics.clone();
550 let button = builder.build(move |caps| async move {
551 drop(caps);
552 if !offset_worker {
553 for stat in source_statistics.values() {
555 stat.set_offset_committed(0);
556 stat.set_offset_known(0);
557 }
558 return;
559 }
560 tokio::pin!(committed_uppers);
561 loop {
562 match committed_uppers.next().await {
563 Some(frontier) => {
564 if let Some(offset) = frontier.as_option() {
565 for stat in source_statistics.values() {
566 stat.set_offset_committed(offset.offset);
567 stat.set_offset_known(offset.offset);
568 }
569 }
570 }
571 None => return,
572 }
573 }
574 });
575 button.press_on_drop()
576}
577
#[cfg(test)]
mod test {
    use super::*;

    /// 126 keys over 3 partitions in batches of 2, with a 2-round transactional snapshot.
    fn generator() -> KeyValueLoadGenerator {
        KeyValueLoadGenerator {
            keys: 126,
            snapshot_rounds: 2,
            transactional_snapshot: true,
            value_size: 1234,
            partitions: 3,
            tick_interval: None,
            batch_size: 2,
            seed: 1234,
            include_offset: None,
        }
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        // Resuming at offset 5 means 3 post-snapshot offsets have passed: 3 * 2 * 3 = 18 keys
        // were consumed, so partition 1 continues at key 18 + 1 = 19.
        let producer = UpdateProducer::new(1, 5, generator());
        assert_eq!(producer.pi.next, 19);

        // 63 more offsets consume 63 * 2 * 3 = 378 = 3 * 126 keys, a whole number of passes
        // over the key space, so the resume key is unchanged.
        let producer = UpdateProducer::new(1, 5 + 126 / 2, generator());
        assert_eq!(producer.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut keys = PartitionKeyIterator::new(1, 3, 126, 1);

        assert_eq!(1, Iterator::next(&mut &mut keys).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut keys).unwrap());

        // Consuming the rest of the partition's 42 keys wraps back to the first one.
        let _ = keys.take((126 / 3) - 2).count();
        assert_eq!(keys.next, 1);
    }
}
639}