// mz_storage/source/generator/key_value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::FueledBuilder;
25use rand_8::rngs::StdRng;
26use rand_8::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::core::Partition;
29use timely::dataflow::operators::vec::ToStream;
30use timely::dataflow::{Scope, StreamVec};
31use timely::progress::{Antichain, Timestamp};
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{FuelSize, SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Renders the `KEY VALUE` load generator source as a timely dataflow fragment.
///
/// Returns, per export id, a collection of `SourceMessage`s, a progress
/// ("upper") stream, a health-status stream, and the shutdown buttons for the
/// operators built here. Data is produced in two phases: a deterministic
/// snapshot (offsets `0..snapshot_rounds`) followed by post-snapshot update
/// batches, optionally driven by `tick_interval`.
pub fn render<'scope>(
    key_value: KeyValueLoadGenerator,
    scope: Scope<'scope, MzOffset>,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
    idx_to_exportid: BTreeMap<usize, GlobalId>,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    // known and committed offsets are recorded in the stats operator
    // It's easier to have this operator record the metrics rather than trying to special case it below.
    let stats_button = render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    // Single data output carrying `(output_index, message)` triples; it is
    // partitioned into one stream per source export below.
    let (data_output, stream) = builder.new_output::<FueledBuilder<
        CapacityContainerBuilder<
            Vec<(
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            )>,
        >,
    >>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    // Route each update to the partition matching its output index.
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(output);
            (output, (data, time, diff))
        },
    );
    let mut data_collections = BTreeMap::new();
    // `zip_eq` asserts that `partition` produced exactly one stream per export.
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);

    // The key-value load generator only has one 'output' stream, which is the default output
    // that needs to be emitted to all output indexes.
    // Contains the `SourceStatistics` entries for exports that require a snapshot.
    let mut snapshot_export_stats = vec![];

    // We can't just iterate over config.statistics (which came from StorageState.aggregated_statistics)
    // AggregateStatistics will contain SourceStatistics for all exports of key-value load generator, not
    // just the exports of the config.id.
    let mut all_export_stats = vec![];
    let output_indexes = output_map
        .get(&LoadGeneratorOutput::Default)
        .expect("default output")
        .clone();
    for export_id in output_indexes.iter().map(|idx| {
        idx_to_exportid
            .get(idx)
            .expect("mapping of output index to export id")
    }) {
        let export_resume_upper = config
            .source_resume_uppers
            .get(export_id)
            .map(|rows| Antichain::from_iter(rows.iter().map(MzOffset::decode_row)))
            .expect("all source exports must be present in resume uppers");
        // NOTE(review): this is routine startup logging, but it is emitted at
        // `warn!` while the rest of this operator logs at `info!` — looks like
        // a debugging leftover; confirm the intended level.
        tracing::warn!(
            "source_id={} export_id={} worker_id={} resume_upper={:?}",
            config.id,
            export_id,
            config.worker_id,
            export_resume_upper
        );
        let export_stats = config
            .statistics
            .get(export_id)
            .expect("statistics initialized for export")
            .clone();
        // An export still at the minimum offset has not snapshotted yet.
        if export_resume_upper.as_ref() == &[MzOffset::minimum()] {
            snapshot_export_stats.push(export_stats.clone());
        }
        all_export_stats.push(export_stats);
    }

    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap]: [_; 2] = caps.try_into().unwrap();
            // Exactly one worker (the one responsible for "partition" 0)
            // reports global offset statistics.
            let stats_worker = config.responsible_for(0);
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume frontier means there is nothing left to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };
            let snapshotting = resume_offset.offset == 0;
            // A worker *must* emit a count even if not responsible for snapshotting a table
            // as statistic summarization will return null if any worker hasn't set a value.
            // This will also reset snapshot stats for any exports not snapshotting.
            if snapshotting {
                for stats in all_export_stats.iter() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // worker has no work to do
            if local_partitions.is_empty() {
                return;
            }

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // Number of snapshot records this worker will emit per export.
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Re-usable buffers.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            // Phase 1: the transactional snapshot. Produces the offset at which
            // post-snapshot updates begin.
            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();
                if stats_worker {
                    for stats in all_export_stats.iter() {
                        stats.set_offset_known(snapshot_rounds);
                    }
                }

                // Downgrade to the snapshot frontier.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                let mut emitted = 0;
                for stats in snapshot_export_stats.iter() {
                    stats.set_snapshot_records_known(local_snapshot_size);
                }
                // output_map can contain no outputs, which would leave output_indexes empty
                let num_outputs = u64::cast_from(output_indexes.len()).max(1);
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        let mut emitted_all_exports = 0;
                        for u in sp.produce_batch(&mut value_buffer, &output_indexes) {
                            let size = u.fuel_size();
                            data_output.give_fueled(&cap, u, size).await;
                            emitted_all_exports += 1;
                        }
                        // emitted_all_indexes is going to be some multiple of num_outputs;
                        emitted += emitted_all_exports / num_outputs;
                    }
                    for stats in snapshot_export_stats.iter() {
                        stats.set_snapshot_records_staged(emitted);
                    }
                }
                // snapshotting is completed
                snapshot_export_stats.clear();

                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Phase 2: post-snapshot update batches, optionally paced by
            // `tick_interval`.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Once the "quick" updates are exhausted, either pace
                    // further batches on the tick interval or stop entirely.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) =
                            up.produce_batch(&mut value_buffer, &output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            let size = u.fuel_size();
                            data_output.give_fueled(&cap, u, size).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }
            // Keep the operator (and its capabilities) alive without doing
            // further work.
            std::future::pending::<()>().await;
        })
    });

    // Health: report `running` once per export, plus once for the source
    // overall (the `None` id).
    let status = export_ids
        .into_iter()
        .map(Some)
        .chain(std::iter::once(None))
        .map(|id| HealthStatusMessage {
            id,
            namespace: StatusNamespace::Generator,
            update: HealthStatusUpdate::running(),
        })
        .collect::<Vec<_>>()
        .to_stream(scope);
    (
        data_collections,
        progress_stream,
        status,
        vec![button.press_on_drop(), stats_button],
    )
}
284
/// An iterator that produces keys belonging to a partition.
///
/// Keys are assigned round-robin: partition `p` owns keys `p`,
/// `p + partitions`, `p + 2 * partitions`, … modulo `keys`. The iterator is
/// infinite and wraps around the key space (see the `Iterator` impl below).
struct PartitionKeyIterator {
    /// The partition.
    partition: u64,
    /// The number of partitions. Must evenly divide `keys` (asserted in `new`).
    partitions: u64,
    /// The key space.
    keys: u64,
    /// The next key.
    next: u64,
}
296
297impl PartitionKeyIterator {
298    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
299        assert_eq!(keys % partitions, 0);
300        PartitionKeyIterator {
301            partition,
302            partitions,
303            keys,
304            next: start_key,
305        }
306    }
307}
308
309impl Iterator for &mut PartitionKeyIterator {
310    type Item = u64;
311
312    fn next(&mut self) -> Option<Self::Item> {
313        let ret = self.next;
314        self.next = (self.next + self.partitions) % self.keys;
315        Some(ret)
316    }
317}
318
319/// Create `StdRng` seeded so identical values can be produced during resumption (during
320/// snapshotting or after).
321fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
322    let mut seed = [0; 32];
323    seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
324    seed[8..16].copy_from_slice(&offset.to_le_bytes());
325    seed[16..24].copy_from_slice(&partition.to_le_bytes());
326    StdRng::from_seed(seed)
327}
328
/// A struct that produces batches of data for the snapshotting phase.
///
/// A snapshot consists of `snapshot_rounds` rounds. In each round every key of
/// the partition is emitted once, in batches of `batch_size`, all at the same
/// offset (the round number).
struct TransactionalSnapshotProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The number of batches produced in the current round.
    produced_batches: u64,
    /// The expected number of batches per round.
    expected_batches: u64,
    /// The current round of the offset
    round: u64,
    /// The total number of rounds.
    snapshot_rounds: u64,
    /// The rng for the current round. Created lazily on the first batch.
    rng: Option<StdRng>,
    /// The source-level seed for the rng.
    seed: u64,
    /// Whether to include the offset or not.
    include_offset: bool,
}
350
impl TransactionalSnapshotProducer {
    /// Creates a producer for `partition`'s slice of the snapshot.
    ///
    /// Panics unless the partition's key count is a multiple of `batch_size`.
    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();

        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        assert_eq!((keys / partitions) % batch_size, 0);
        let pi = PartitionKeyIterator::new(
            partition, partitions, keys, // The first start key is the partition.
            partition,
        );
        TransactionalSnapshotProducer {
            pi,
            batch_size,
            produced_batches: 0,
            expected_batches: keys / partitions / batch_size,
            round: 0,
            snapshot_rounds,
            rng: None,
            seed,
            include_offset: include_offset.is_some(),
        }
    }

    /// If this partition is done snapshotting.
    fn finished(&self) -> bool {
        self.round >= self.snapshot_rounds
    }

    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
    /// batch and round counter.
    ///
    /// The output iterator must be consumed fully for this method to be used correctly.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> impl Iterator<
        Item = (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        ),
    > + 'a {
        let finished = self.finished();

        // The rng is created lazily on the first batch (seeded with the round
        // current at that time) and reused for all later batches.
        // NOTE(review): it is never re-seeded when `round` advances — confirm
        // per-round reproducibility is not required here.
        let rng = self
            .rng
            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));

        // Plain copies captured by the closure below so it doesn't borrow `self`.
        let partition: u64 = self.pi.partition;
        let iter_round: u64 = self.round;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            // A finished producer yields an empty batch.
            .take(if finished {
                0
            } else {
                usize::cast_from(self.batch_size)
            })
            .flat_map(move |key| {
                // Fresh random bytes per key, written into the shared buffer.
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_round)])
                    } else {
                        Row::default()
                    },
                });
                // One copy of the message per output index, all at the offset
                // equal to the current round.
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
            });

        // Counters are advanced eagerly here, before the (lazy) iterator above
        // is consumed — hence the requirement that callers drain it fully.
        if !finished {
            self.produced_batches += 1;

            if self.produced_batches == self.expected_batches {
                self.round += 1;
                self.produced_batches = 0;
            }
        }

        iter
    }
}
447
/// A struct that produces batches of data for the post-snapshotting phase.
///
/// Each call to `produce_batch` emits one batch of `batch_size` keys at a
/// single offset (`next_offset`) and then advances that offset.
struct UpdateProducer {
    /// The key iterator for the partition.
    pi: PartitionKeyIterator,
    /// The batch size.
    batch_size: u64,
    /// The next offset to produce updates at.
    next_offset: u64,
    /// The source-level seed for the rng.
    seed: u64,
    /// The number of offsets we expect to be part of the `transactional_snapshot`.
    expected_quick_offsets: u64,
    /// Whether to include the offset or not.
    include_offset: bool,
}
463
impl UpdateProducer {
    /// Creates an update producer for `partition` that resumes at `next_offset`.
    fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
        let snapshot_rounds = key_value.transactional_snapshot_rounds();
        let quick_rounds = key_value.non_transactional_snapshot_rounds();
        let KeyValueLoadGenerator {
            partitions,
            keys,
            batch_size,
            seed,
            include_offset,
            ..
        } = key_value;

        // Each snapshot _round_ is associated with an offset, starting at 0. Therefore
        // the `next_offset` - `snapshot_rounds` is the index of the next non-snapshot update
        // batches we must produce. The start key for that batch is that index
        // multiplied by the batch size and the number of partitions.
        let start_key =
            (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;

        // We expect to emit 1 batch per offset. A round is emitting the full set of keys (in the
        // partition). The number of keys divided by the batch size (and partitioned, so divided by
        // the partition count) is the number of offsets in each _round_. We also add the number of
        // snapshot rounds, which is also the number of offsets in the snapshot.
        let expected_quick_offsets =
            ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;

        let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
        UpdateProducer {
            pi,
            batch_size,
            next_offset,
            seed,
            expected_quick_offsets,
            include_offset: include_offset.is_some(),
        }
    }

    /// If this partition is done producing quick updates.
    fn finished_quick(&self) -> bool {
        self.next_offset >= self.expected_quick_offsets
    }

    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
    /// batch and round counter. Also returns the frontier after the batch.
    ///
    /// The output iterator must be consumed fully for this method to be used correctly.
    fn produce_batch<'a>(
        &'a mut self,
        value_buffer: &'a mut Vec<u8>,
        output_indexes: &'a [usize],
    ) -> (
        u64,
        impl Iterator<
            Item = (
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            ),
        > + 'a,
    ) {
        // Unlike the snapshot phase, the rng is re-seeded per offset so a
        // resumed source reproduces identical values for this offset.
        let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);

        // Plain copies captured by the closure below so it doesn't borrow `self`.
        let partition: u64 = self.pi.partition;
        let iter_offset: u64 = self.next_offset;
        let include_offset: bool = self.include_offset;
        let iter = self
            .pi
            .take(usize::cast_from(self.batch_size))
            .flat_map(move |key| {
                // Fresh random bytes per key, written into the shared buffer.
                rng.fill_bytes(value_buffer.as_mut_slice());
                let msg = Ok(SourceMessage {
                    key: Row::pack_slice(&[Datum::UInt64(key)]),
                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
                    metadata: if include_offset {
                        Row::pack(&[Datum::UInt64(iter_offset)])
                    } else {
                        Row::default()
                    },
                });
                // One copy of the message per output index.
                output_indexes
                    .iter()
                    .repeat_clone(msg)
                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
            });

        // Advance to the next offset. Messages above carry the pre-increment
        // offset; the returned frontier is the post-increment one.
        self.next_offset += 1;
        (self.next_offset, iter)
    }
}
555
556pub fn render_statistics_operator<'scope>(
557    scope: Scope<'scope, MzOffset>,
558    config: &RawSourceCreationConfig,
559    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
560) -> PressOnDropButton {
561    let id = config.id;
562    let builder =
563        AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
564    let offset_worker = config.responsible_for(0);
565    let source_statistics = config.statistics.clone();
566    let button = builder.build(move |caps| async move {
567        drop(caps);
568        if !offset_worker {
569            // Emit 0, to mark this worker as having started up correctly.
570            for stat in source_statistics.values() {
571                stat.set_offset_committed(0);
572                stat.set_offset_known(0);
573            }
574            return;
575        }
576        tokio::pin!(committed_uppers);
577        loop {
578            match committed_uppers.next().await {
579                Some(frontier) => {
580                    if let Some(offset) = frontier.as_option() {
581                        for stat in source_statistics.values() {
582                            stat.set_offset_committed(offset.offset);
583                            stat.set_offset_known(offset.offset);
584                        }
585                    }
586                }
587                None => return,
588            }
589        }
590    });
591    button.press_on_drop()
592}
593
#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        let up = UpdateProducer::new(
            1, // partition 1
            5, // resume upper of 5
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // Offsets 2, 3, and 4 (3 batches of 2 keys each) have already been
        // produced after the 2 snapshot rounds, so the next key is the 7th key
        // of partition 1 (1, 4, 7, 10, 13, 16, 19): 19.
        assert_eq!(up.pi.next, 19);

        let up = UpdateProducer::new(
            1,           // partition 1
            5 + 126 / 2, // resume upper of 5 after a full set of keys has been produced.
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // The 63 additional offsets consume 126 keys — a whole multiple of the
        // partition's 42 keys — so the start key wraps back around to 19.
        assert_eq!(up.pi.next, 19);
    }

    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut pi = PartitionKeyIterator::new(
            1,   // partition 1
            3,   // of 3 partitions
            126, // 126 keys.
            1,   // Start at the beginning
        );

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // After a full round, ensure we wrap around correctly;
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}
655}