mz_storage/source/generator/
key_value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::AccountedStackBuilder;
25use rand::rngs::StdRng;
26use rand::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::ToStream;
29use timely::dataflow::operators::core::Partition;
30use timely::dataflow::{Scope, Stream};
31use timely::progress::{Antichain, Timestamp};
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Renders the key-value load generator source into `scope`.
///
/// Two operators are built:
/// * a statistics operator (via `render_statistics_operator`) that records offsets from
///   `committed_uppers`, and
/// * an async data operator that, when resuming from offset 0, first emits the transactional
///   snapshot and afterwards emits update batches, paced by `key_value.tick_interval` once the
///   quick updates are exhausted.
///
/// Every message is emitted once per entry of the `LoadGeneratorOutput::Default` output
/// indexes found in `output_map`; `idx_to_exportid` maps those indexes to export ids so the
/// matching statistics and resume uppers can be looked up.
///
/// Returns, in order: the per-export data collections, the progress stream, a health stream
/// carrying a single `running` update, and the shutdown buttons of both operators.
///
/// # Panics
///
/// Panics if `output_map` has no default output, if an output index has no export id, or if an
/// export is missing from the configured resume uppers or statistics.
pub fn render<G: Scope<Timestamp = MzOffset>>(
    key_value: KeyValueLoadGenerator,
    scope: &mut G,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
    idx_to_exportid: BTreeMap<usize, GlobalId>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    // Known and committed offsets are recorded in the stats operator.
    // It's easier to have this operator record the metrics rather than trying to special case it below.
    let stats_button = render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    // The operator has a single data output; it is split into one stream per source export,
    // routed by the output index carried in each update.
    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let partition_count = u64::cast_from(config.source_exports.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    // Pair each export id with its partitioned stream, in `source_exports` key order.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    // Nothing is ever written to the progress output; only its capability is downgraded below.
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);

    // Contains the `SourceStatistics` entries for exports that require a snapshot.
    let mut snapshot_export_stats = vec![];

    // We can't just iterate over config.statistics (which came from StorageState.aggregated_statistics)
    // AggregateStatistics will contain SourceStatistics for all exports of key-value load generator, not
    // just the exports of the config.id.
    let mut all_export_stats = vec![];
    // The key-value load generator only has one 'output' stream, which is the default output
    // that needs to be emitted to all output indexes.
    let output_indexes = output_map
        .get(&LoadGeneratorOutput::Default)
        .expect("default output")
        .clone();
    for export_id in output_indexes.iter().map(|idx| {
        idx_to_exportid
            .get(idx)
            .expect("mapping of output index to export id")
    }) {
        let export_resume_upper = config
            .source_resume_uppers
            .get(export_id)
            .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
            .expect("all source exports must be present in resume uppers");
        // NOTE(review): this is logged at `warn` although it reads as routine information;
        // confirm the level is intended.
        tracing::warn!(
            "source_id={} export_id={} worker_id={} resume_upper={:?}",
            config.id,
            export_id,
            config.worker_id,
            export_resume_upper
        );
        let export_stats = config
            .statistics
            .get(export_id)
            .expect("statistics initialized for export")
            .clone();
        // An export whose resume upper is still the minimum offset has not ingested anything
        // yet, so it needs the snapshot.
        if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
            snapshot_export_stats.push(export_stats.clone());
        }
        all_export_stats.push(export_stats);
    }

    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            // One capability per output, in creation order: data first, then progress.
            let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
            let stats_worker = config.responsible_for(0);
            // Combine the resume uppers of all exports into a single frontier.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume frontier leaves nothing to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };
            let snapshotting = resume_offset.offset == 0;
            // A worker *must* emit a count even if not responsible for snapshotting a table
            // as statistic summarization will return null if any worker hasn't set a value.
            // This will also reset snapshot stats for any exports not snapshotting.
            if snapshotting {
                for stats in all_export_stats.iter() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // One snapshot producer per partition this worker is responsible for.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // This worker has no work to do; returning drops its capabilities.
            if local_partitions.is_empty() {
                return;
            }

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // Number of snapshot records this worker will emit per export: its share of the
            // partitions, times the keys per partition, times the snapshot rounds.
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Re-usable buffers.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();
                // Only the worker responsible for partition 0 reports the known offset.
                if stats_worker {
                    for stats in all_export_stats.iter() {
                        stats.set_offset_known(snapshot_rounds);
                    }
                }

                // Downgrade to the snapshot frontier.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                for stats in snapshot_export_stats.iter() {
                    stats.set_snapshot_records_known(local_snapshot_size);
                }
                // output_map can contain no outputs, which would leave output_indexes empty
                let num_outputs = u64::cast_from(output_indexes.len()).max(1);
                // Round-robin over the local partitions, one batch each, until all are done.
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        let mut emitted_all_exports = 0;
                        for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
                            data_output.give_fueled(&cap, u).await;
                            emitted_all_exports += 1;
                        }
                        // `emitted_all_exports` is going to be some multiple of `num_outputs`;
                        // dividing yields the per-export record count.
                        emitted += emitted_all_exports / num_outputs;
                    }
                    for stats in snapshot_export_stats.iter() {
                        stats.set_snapshot_records_staged(emitted);
                    }
                }
                // snapshotting is completed
                snapshot_export_stats.clear();

                // The data capability was held at the resume offset for the whole snapshot;
                // only now does it advance past the snapshot offsets.
                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Switch from snapshot producers to update producers, starting at `upper_offset`.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Quick updates are emitted back-to-back. Once every partition has finished
                    // them, either wait for the next tick or, without a tick interval, stop.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) =
                            up.produce_batch(&mut value_buffer, &output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            data_output.give_fueled(&cap, u).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }
            // Never complete, so `cap` and `progress_cap` stay alive instead of being dropped
            // (presumably to keep the source's frontier from advancing to empty — confirm).
            std::future::pending::<()>().await;
        })
    });

    // The generator is always reported as running.
    let status = [HealthStatusMessage {
        id: None,
        namespace: StatusNamespace::Generator,
        update: HealthStatusUpdate::running(),
    }]
    .to_stream(scope);
    (
        data_collections,
        progress_stream,
        status,
        vec![button.press_on_drop(), stats_button],
    )
}
268
/// An endless iterator that produces keys belonging to a partition.
///
/// Starting from `next`, each step advances by `partitions` and wraps around modulo `keys`,
/// so a partition started at its own index visits `partition, partition + partitions, ...`
/// and returns to its first key after `keys / partitions` steps.
struct PartitionKeyIterator {
    /// The partition.
    partition: u64,
    /// The number of partitions.
    partitions: u64,
    /// The key space.
    keys: u64,
    /// The next key.
    next: u64,
}

impl PartitionKeyIterator {
    /// Creates an iterator for `partition` (of `partitions`) over a key space of size `keys`,
    /// whose first yielded key is `start_key`.
    ///
    /// # Panics
    ///
    /// Panics if `partitions` or `keys` is zero, or if `keys` is not a multiple of
    /// `partitions`. Rejecting an empty key space here avoids a later remainder-by-zero panic
    /// on the first call to `next`.
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert!(partitions > 0, "partition count must be non-zero");
        assert!(keys > 0, "key space must be non-empty");
        assert_eq!(
            keys % partitions,
            0,
            "key space ({keys}) must be a multiple of the partition count ({partitions})"
        );
        PartitionKeyIterator {
            partition,
            partitions,
            keys,
            next: start_key,
        }
    }
}

// Implemented on `&mut PartitionKeyIterator` so that adaptors such as `take` borrow the
// iterator instead of consuming it, letting the owner resume from where a batch stopped.
impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    /// Yields the current key and advances to the partition's next key, wrapping at `keys`.
    /// Never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let ret = self.next;
        self.next = (self.next + self.partitions) % self.keys;
        Some(ret)
    }
}
302
303/// Create `StdRng` seeded so identical values can be produced during resumption (during
304/// snapshotting or after).
305fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
306    let mut seed = [0; 32];
307    seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
308    seed[8..16].copy_from_slice(&offset.to_le_bytes());
309    seed[16..24].copy_from_slice(&partition.to_le_bytes());
310    StdRng::from_seed(seed)
311}
312
/// A struct that produces batches of data for the snapshotting phase.
///
/// Each call to `produce_batch` emits up to `batch_size` keys of one partition, timestamped
/// with the current `round`; the round advances after `expected_batches` batches.
struct TransactionalSnapshotProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The number of batches produced in the current round.
    produced_batches: u64,
    /// The expected number of batches per round.
    expected_batches: u64,
    /// The current round, which is also the offset its messages are emitted at.
    round: u64,
    /// The total number of rounds.
    snapshot_rounds: u64,
    /// The rng used to fill values. Created lazily by the first `produce_batch` call, seeded
    /// with the round current at that time; it is not re-created when `round` advances.
    rng: Option<StdRng>,
    /// The source-level seed for the rng.
    seed: u64,
    /// Whether to include the offset (the round) as message metadata or not.
    include_offset: bool,
}
334
335impl TransactionalSnapshotProducer {
336    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
337        let snapshot_rounds = key_value.transactional_snapshot_rounds();
338
339        let KeyValueLoadGenerator {
340            partitions,
341            keys,
342            batch_size,
343            seed,
344            include_offset,
345            ..
346        } = key_value;
347
348        assert_eq!((keys / partitions) % batch_size, 0);
349        let pi = PartitionKeyIterator::new(
350            partition, partitions, keys, // The first start key is the partition.
351            partition,
352        );
353        TransactionalSnapshotProducer {
354            pi,
355            batch_size,
356            produced_batches: 0,
357            expected_batches: keys / partitions / batch_size,
358            round: 0,
359            snapshot_rounds,
360            rng: None,
361            seed,
362            include_offset: include_offset.is_some(),
363        }
364    }
365
366    /// If this partition is done snapshotting.
367    fn finished(&self) -> bool {
368        self.round >= self.snapshot_rounds
369    }
370
371    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
372    /// batch and round counter.
373    ///
374    /// The output iterator must be consumed fully for this method to be used correctly.
375    fn produce_batch<'a>(
376        &'a mut self,
377        value_buffer: &'a mut Vec<u8>,
378        output_indexes: &'a [usize],
379    ) -> impl Iterator<
380        Item = (
381            (usize, Result<SourceMessage, DataflowError>),
382            MzOffset,
383            Diff,
384        ),
385    > + 'a {
386        let finished = self.finished();
387
388        let rng = self
389            .rng
390            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));
391
392        let partition: u64 = self.pi.partition;
393        let iter_round: u64 = self.round;
394        let include_offset: bool = self.include_offset;
395        let iter = self
396            .pi
397            .take(if finished {
398                0
399            } else {
400                usize::cast_from(self.batch_size)
401            })
402            .flat_map(move |key| {
403                rng.fill_bytes(value_buffer.as_mut_slice());
404                let msg = Ok(SourceMessage {
405                    key: Row::pack_slice(&[Datum::UInt64(key)]),
406                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
407                    metadata: if include_offset {
408                        Row::pack(&[Datum::UInt64(iter_round)])
409                    } else {
410                        Row::default()
411                    },
412                });
413                output_indexes
414                    .iter()
415                    .repeat_clone(msg)
416                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
417            });
418
419        if !finished {
420            self.produced_batches += 1;
421
422            if self.produced_batches == self.expected_batches {
423                self.round += 1;
424                self.produced_batches = 0;
425            }
426        }
427
428        iter
429    }
430}
431
/// A struct that produces batches of data for the post-snapshotting phase.
///
/// Each call to `produce_batch` emits `batch_size` keys of one partition at `next_offset` and
/// then advances `next_offset` by one.
struct UpdateProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The next offset to produce updates at.
    next_offset: u64,
    /// The source-level seed for the rng.
    seed: u64,
    /// The offset at which the quick updates end: the number of snapshot offsets plus one
    /// offset per batch of the non-transactional snapshot rounds.
    expected_quick_offsets: u64,
    /// Whether to include the offset as message metadata or not.
    include_offset: bool,
}
447
448impl UpdateProducer {
449    fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
450        let snapshot_rounds = key_value.transactional_snapshot_rounds();
451        let quick_rounds = key_value.non_transactional_snapshot_rounds();
452        let KeyValueLoadGenerator {
453            partitions,
454            keys,
455            batch_size,
456            seed,
457            include_offset,
458            ..
459        } = key_value;
460
461        // Each snapshot _round_ is associated with an offset, starting at 0. Therefore
462        // the `next_offset` - `snapshot_rounds` is the index of the next non-snapshot update
463        // batches we must produce. The start key for the that batch is that index
464        // multiplied by the batch size and the number of partitions.
465        let start_key =
466            (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;
467
468        // We expect to emit 1 batch per offset. A round is emitting the full set of keys (in the
469        // partition. The number of keys divided by the batch size (and partitioned, so divided by
470        // the partition count) is the number of offsets in each _round_. We also add the number of
471        // snapshot rounds, which is also the number of offsets in the snapshot.
472        let expected_quick_offsets =
473            ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;
474
475        let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
476        UpdateProducer {
477            pi,
478            batch_size,
479            next_offset,
480            seed,
481            expected_quick_offsets,
482            include_offset: include_offset.is_some(),
483        }
484    }
485
486    /// If this partition is done producing quick updates.
487    fn finished_quick(&self) -> bool {
488        self.next_offset >= self.expected_quick_offsets
489    }
490
491    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
492    /// batch and round counter. Also returns the frontier after the batch.
493    ///
494    /// The output iterator must be consumed fully for this method to be used correctly.
495    fn produce_batch<'a>(
496        &'a mut self,
497        value_buffer: &'a mut Vec<u8>,
498        output_indexes: &'a [usize],
499    ) -> (
500        u64,
501        impl Iterator<
502            Item = (
503                (usize, Result<SourceMessage, DataflowError>),
504                MzOffset,
505                Diff,
506            ),
507        > + 'a,
508    ) {
509        let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);
510
511        let partition: u64 = self.pi.partition;
512        let iter_offset: u64 = self.next_offset;
513        let include_offset: bool = self.include_offset;
514        let iter = self
515            .pi
516            .take(usize::cast_from(self.batch_size))
517            .flat_map(move |key| {
518                rng.fill_bytes(value_buffer.as_mut_slice());
519                let msg = Ok(SourceMessage {
520                    key: Row::pack_slice(&[Datum::UInt64(key)]),
521                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
522                    metadata: if include_offset {
523                        Row::pack(&[Datum::UInt64(iter_offset)])
524                    } else {
525                        Row::default()
526                    },
527                });
528                output_indexes
529                    .iter()
530                    .repeat_clone(msg)
531                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
532            });
533
534        // Advance to the next offset.
535        self.next_offset += 1;
536        (self.next_offset, iter)
537    }
538}
539
540pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
541    scope: &G,
542    config: &RawSourceCreationConfig,
543    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
544) -> PressOnDropButton {
545    let id = config.id;
546    let builder =
547        AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
548    let offset_worker = config.responsible_for(0);
549    let source_statistics = config.statistics.clone();
550    let button = builder.build(move |caps| async move {
551        drop(caps);
552        if !offset_worker {
553            // Emit 0, to mark this worker as having started up correctly.
554            for stat in source_statistics.values() {
555                stat.set_offset_committed(0);
556                stat.set_offset_known(0);
557            }
558            return;
559        }
560        tokio::pin!(committed_uppers);
561        loop {
562            match committed_uppers.next().await {
563                Some(frontier) => {
564                    if let Some(offset) = frontier.as_option() {
565                        for stat in source_statistics.values() {
566                            stat.set_offset_committed(offset.offset);
567                            stat.set_offset_known(offset.offset);
568                        }
569                    }
570                }
571                None => return,
572            }
573        }
574    });
575    button.press_on_drop()
576}
577
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that an `UpdateProducer` resumed at a given offset starts at the key it would
    /// have reached had it produced every earlier batch.
    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        let up = UpdateProducer::new(
            1, // partition 1
            5, // resume upper of 5
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // Resuming at offset 5 means 3 update batches were produced after the 2 snapshot
        // rounds. With 2 keys per batch, 6 keys of partition 1 (1, 4, 7, 10, 13, 16) came
        // before, so the first key is the 7th one: 19.
        assert_eq!(up.pi.next, 19);

        let up = UpdateProducer::new(
            1,           // partition 1
            5 + 126 / 2, // resume upper of 5 after a full set of keys has been produced.
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // The additional offsets cover whole passes over the key space, so the start key
        // wraps around to the same value.
        assert_eq!(up.pi.next, 19);
    }

    /// Checks that `PartitionKeyIterator` steps by the partition count and wraps around to
    /// its first key after a full round.
    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut pi = PartitionKeyIterator::new(
            1,   // partition 1
            3,   // of 3 partitions
            126, // 126 keys.
            1,   // Start at the beginning
        );

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // After a full round (126 / 3 keys, 2 of which were consumed above), ensure we wrap
        // around correctly.
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}