mz_storage/source/generator/key_value.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::stream::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_repr::{Datum, Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
use mz_storage_types::sources::{MzOffset, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use rand::rngs::StdRng;
use rand::{RngCore, SeedableRng};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp};
use tracing::info;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{SignaledFuture, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

pub fn render<G: Scope<Timestamp = MzOffset>>(
    key_value: KeyValueLoadGenerator,
    scope: &mut G,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
    idx_to_exportid: BTreeMap<usize, GlobalId>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    // Known and committed offsets are recorded in the stats operator. It's easier to have that
    // operator record the metrics than to try to special-case it below.
    let stats_button = render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
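    // Split the single physical output into one stream per source export. The `usize` output
    // index attached to each message below is assumed to correspond to the position of the
    // export in `config.source_exports`, which is what lets `zip_eq` pair each partitioned
    // stream with its export id.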
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);

    // The key-value load generator only has one 'output' stream, which is the default output
    // that needs to be emitted to all output indexes.
    // Contains the `SourceStatistics` entries for exports that require a snapshot.
    let mut snapshot_export_stats = vec![];

    // We can't just iterate over `config.statistics` (which came from
    // `StorageState.aggregated_statistics`), because it will contain `SourceStatistics` for all
    // exports of the key-value load generator, not just the exports of `config.id`.
    let mut all_export_stats = vec![];
    let output_indexes = output_map
        .get(&LoadGeneratorOutput::Default)
        .expect("default output")
        .clone();
    for export_id in output_indexes.iter().map(|idx| {
        idx_to_exportid
            .get(idx)
            .expect("mapping of output index to export id")
    }) {
        let export_resume_upper = config
            .source_resume_uppers
            .get(export_id)
            .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
            .expect("all source exports must be present in resume uppers");
        tracing::warn!(
            "source_id={} export_id={} worker_id={} resume_upper={:?}",
            config.id,
            export_id,
            config.worker_id,
            export_resume_upper
        );
        let export_stats = config
            .statistics
            .get(export_id)
            .expect("statistics initialized for export")
            .clone();
        if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
            snapshot_export_stats.push(export_stats.clone());
        }
        all_export_stats.push(export_stats);
    }

    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
            let stats_worker = config.responsible_for(0);
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };
            let snapshotting = resume_offset.offset == 0;
            // A worker *must* emit a count even if it is not responsible for snapshotting a
            // table, as statistics summarization will return null if any worker hasn't set a
            // value. This also resets snapshot stats for any exports that are not snapshotting.
            if snapshotting {
                for stats in all_export_stats.iter() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // This worker has no work to do.
            if local_partitions.is_empty() {
                return;
            }

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;
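            // For example (hypothetical numbers matching the unit-test configuration below): with
            // 126 keys, 3 partitions, 2 transactional snapshot rounds, and 1 partition local to
            // this worker, `local_snapshot_size` is 1 * 126 * 2 / 3 = 84 records.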

            // Re-usable buffers.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();
                if stats_worker {
                    for stats in all_export_stats.iter() {
                        stats.set_offset_known(snapshot_rounds);
                    }
                }

                // Downgrade to the snapshot frontier.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                for stats in snapshot_export_stats.iter() {
                    stats.set_snapshot_records_known(local_snapshot_size);
                }
                // `output_map` can contain no outputs, which would leave `output_indexes` empty.
                let num_outputs = u64::cast_from(output_indexes.len()).max(1);
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        let mut emitted_all_exports = 0;
                        for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
                            data_output.give_fueled(&cap, u).await;
                            emitted_all_exports += 1;
                        }
                        // `emitted_all_exports` is going to be some multiple of `num_outputs`.
                        emitted += emitted_all_exports / num_outputs;
                    }
                    for stats in snapshot_export_stats.iter() {
                        stats.set_snapshot_records_staged(emitted);
                    }
                }
                // Snapshotting is complete.
                snapshot_export_stats.clear();

                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
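            // After the snapshot phase, produce "quick" update batches. This phase runs when a
            // tick interval is configured (continuous updates) or when the snapshot is not fully
            // transactional; in the latter case the remaining non-transactional snapshot rounds
            // are emitted as regular update batches (see `UpdateProducer::finished_quick`).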
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) =
                            up.produce_batch(&mut value_buffer, &output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            data_output.give_fueled(&cap, u).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }
            std::future::pending::<()>().await;
        })
    });

    let status = export_ids
        .into_iter()
        .map(Some)
        .chain(std::iter::once(None))
        .map(|id| HealthStatusMessage {
            id,
            namespace: StatusNamespace::Generator,
            update: HealthStatusUpdate::running(),
        })
        .collect::<Vec<_>>()
        .to_stream(scope);
    (
        data_collections,
        progress_stream,
        status,
        vec![button.press_on_drop(), stats_button],
    )
}

/// An iterator that produces keys belonging to a partition.
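///
/// Keys are assigned to partitions round-robin: starting from a given key, the iterator steps by
/// `partitions` and wraps around at `keys`. For example (matching the unit test below), with
/// `partitions = 3`, `keys = 126`, and start key `1`, it yields `1, 4, 7, ..., 124` and then
/// wraps back to `1`.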
struct PartitionKeyIterator {
    /// The partition.
    partition: u64,
    /// The number of partitions.
    partitions: u64,
    /// The key space.
    keys: u64,
    /// The next key.
    next: u64,
}

impl PartitionKeyIterator {
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        PartitionKeyIterator {
            partition,
            partitions,
            keys,
            next: start_key,
        }
    }
}

impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        let ret = self.next;
        self.next = (self.next + self.partitions) % self.keys;
        Some(ret)
    }
}

/// Create `StdRng` seeded so identical values can be produced during resumption (during
/// snapshotting or after).
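///
/// The 32-byte seed is laid out as `[source_seed (8 bytes LE) | offset (8 bytes LE) |
/// partition (8 bytes LE) | 8 zero bytes]`.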
fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
    let mut seed = [0; 32];
    seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
    seed[8..16].copy_from_slice(&offset.to_le_bytes());
    seed[16..24].copy_from_slice(&partition.to_le_bytes());
    StdRng::from_seed(seed)
}

/// A struct that produces batches of data for the snapshotting phase.
struct TransactionalSnapshotProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The number of batches produced in the current round.
    produced_batches: u64,
    /// The expected number of batches per round.
    expected_batches: u64,
    /// The current round, which is also used as the offset for emitted messages.
    round: u64,
    /// The total number of rounds.
    snapshot_rounds: u64,
    /// The rng for the current round.
    rng: Option<StdRng>,
    /// The source-level seed for the rng.
    seed: u64,
    /// Whether to include the offset or not.
    include_offset: bool,
}

impl TransactionalSnapshotProducer {
    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();

        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        assert_eq!((keys / partitions) % batch_size, 0);
        let pi = PartitionKeyIterator::new(
            partition, partitions, keys, // The first start key is the partition.
            partition,
        );
        TransactionalSnapshotProducer {
            pi,
            batch_size,
            produced_batches: 0,
            expected_batches: keys / partitions / batch_size,
            round: 0,
            snapshot_rounds,
            rng: None,
            seed,
            include_offset: include_offset.is_some(),
        }
    }

    /// If this partition is done snapshotting.
    fn finished(&self) -> bool {
        self.round >= self.snapshot_rounds
    }

    /// Produce a batch of messages of size `batch_size`, using `value_buffer` as scratch space
    /// for the generated values. Advances the current batch and round counters.
    ///
    /// The output iterator must be consumed fully for this method to be used correctly.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> impl Iterator<
        Item = (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        ),
    > + 'a {
        let finished = self.finished();

        let rng = self
            .rng
            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));

        let partition: u64 = self.pi.partition;
        let iter_round: u64 = self.round;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(if finished {
                0
            } else {
                usize::cast_from(self.batch_size)
            })
            .flat_map(move |key| {
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_round)])
                    } else {
                        Row::default()
                    },
                });
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
            });

        if !finished {
            self.produced_batches += 1;

            if self.produced_batches == self.expected_batches {
                self.round += 1;
                self.produced_batches = 0;
            }
        }

        iter
    }
}

/// A struct that produces batches of data for the post-snapshotting phase.
struct UpdateProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The next offset to produce updates at.
    next_offset: u64,
    /// The source-level seed for the rng.
    seed: u64,
    /// The number of offsets we expect to be part of the `transactional_snapshot`.
    expected_quick_offsets: u64,
    /// Whether to include the offset or not.
    include_offset: bool,
}

impl UpdateProducer {
    fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();
        let quick_rounds = key_value.non_transactional_snapshot_rounds();
        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        // Each snapshot _round_ is associated with an offset, starting at 0. Therefore
        // `next_offset - snapshot_rounds` is the index of the next non-snapshot update
        // batch we must produce. The start key for that batch is that index
        // multiplied by the batch size and the number of partitions.
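        // For example (the numbers from the unit test below): with `next_offset = 5`,
        // `snapshot_rounds = 2`, `batch_size = 2`, `partitions = 3`, `keys = 126`, and
        // partition 1, the start key is ((5 - 2) * 2 * 3 + 1) % 126 = 19.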
        let start_key =
            (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;

        // We expect to emit 1 batch per offset. A round emits the full set of keys in the
        // partition, so the number of keys divided by the batch size (and by the partition
        // count, since the keys are partitioned) is the number of offsets in each _round_. We
        // also add the number of snapshot rounds, which is also the number of offsets in the
        // snapshot.
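        // For example, with `keys = 126`, `partitions = 3`, and `batch_size = 2` (the unit-test
        // configuration), each round spans 126 / 3 / 2 = 21 offsets.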
        let expected_quick_offsets =
            ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;

        let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
        UpdateProducer {
            pi,
            batch_size,
            next_offset,
            seed,
            expected_quick_offsets,
            include_offset: include_offset.is_some(),
        }
    }

    /// If this partition is done producing quick updates.
    fn finished_quick(&self) -> bool {
        self.next_offset >= self.expected_quick_offsets
    }

    /// Produce a batch of messages of size `batch_size`, using `value_buffer` as scratch space
    /// for the generated values. Advances the offset counter and also returns the frontier
    /// after the batch.
    ///
    /// The output iterator must be consumed fully for this method to be used correctly.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> (
        u64,
        impl Iterator<
            Item = (
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            ),
        > + 'a,
    ) {
        let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);

        let partition: u64 = self.pi.partition;
        let iter_offset: u64 = self.next_offset;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(usize::cast_from(self.batch_size))
            .flat_map(move |key| {
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_offset)])
                    } else {
                        Row::default()
                    },
                });
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
            });

        // Advance to the next offset.
        self.next_offset += 1;
        (self.next_offset, iter)
    }
}

pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
    scope: &G,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
) -> PressOnDropButton {
    let id = config.id;
    let builder =
        AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
    let offset_worker = config.responsible_for(0);
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| async move {
        drop(caps);
        if !offset_worker {
            // Emit 0, to mark this worker as having started up correctly.
            for stat in source_statistics.values() {
                stat.set_offset_committed(0);
                stat.set_offset_known(0);
            }
            return;
        }
        tokio::pin!(committed_uppers);
        loop {
            match committed_uppers.next().await {
                Some(frontier) => {
                    if let Some(offset) = frontier.as_option() {
                        for stat in source_statistics.values() {
                            stat.set_offset_committed(offset.offset);
                            stat.set_offset_known(offset.offset);
                        }
                    }
                }
                None => return,
            }
        }
    });
    button.press_on_drop()
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        let up = UpdateProducer::new(
            1, // partition 1
            5, // resume upper of 5
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // The first key 3 offsets after the 2 snapshot rounds would be the 7th key produced for
        // partition 1 (3 batches of 2 beforehand), so 19.
        assert_eq!(up.pi.next, 19);

        let up = UpdateProducer::new(
            1,           // partition 1
            5 + 126 / 2, // resume upper of 5 after a full set of keys has been produced.
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        assert_eq!(up.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut pi = PartitionKeyIterator::new(
            1,   // partition 1
            3,   // of 3 partitions
            126, // 126 keys.
            1,   // Start at the beginning
        );

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // After a full round, ensure we wrap around correctly.
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}