mz_storage/source/generator/
key_value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::sync::Arc;
13
14use differential_dataflow::AsCollection;
15use futures::stream::StreamExt;
16use itertools::Itertools;
17use mz_ore::cast::CastFrom;
18use mz_ore::iter::IteratorExt;
19use mz_repr::{Datum, Diff, GlobalId, Row};
20use mz_storage_types::errors::DataflowError;
21use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput};
22use mz_storage_types::sources::{MzOffset, SourceTimestamp};
23use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
24use mz_timely_util::containers::stack::AccountedStackBuilder;
25use rand::rngs::StdRng;
26use rand::{RngCore, SeedableRng};
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::operators::core::Partition;
29use timely::dataflow::operators::{Concat, ToStream};
30use timely::dataflow::{Scope, Stream};
31use timely::progress::Antichain;
32use tracing::info;
33
34use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
35use crate::source::types::{ProgressStatisticsUpdate, SignaledFuture, StackedCollection};
36use crate::source::{RawSourceCreationConfig, SourceMessage};
37
/// Renders the key-value load generator as an async operator in `scope`.
///
/// The operator has three outputs: data, progress and statistics. The data output is split
/// into one stream per entry of `config.source_exports`, using the output index carried by
/// every update. Each worker only produces data for the partitions `p` for which
/// `config.responsible_for(p)` holds, and it does not produce anything before `start_signal`
/// has resolved.
///
/// Production happens in two phases:
/// 1. If the resume offset is 0, the transactional snapshot is emitted through
///    [`TransactionalSnapshotProducer`]s.
/// 2. Afterwards, if a tick interval is configured or the snapshot is not transactional,
///    [`UpdateProducer`]s emit one batch per offset.
///
/// Returns, in order:
/// * the data collections keyed by export id,
/// * the progress stream (nothing is ever sent on it; only its capability is downgraded),
/// * a health stream built from a single `running` status message,
/// * the statistics stream, concatenated with the one from [`render_statistics_operator`],
/// * the buttons of both operators.
///
/// # Panics
///
/// The operator's future panics if `output_map` has no entry for
/// [`LoadGeneratorOutput::Default`].
pub fn render<G: Scope<Timestamp = MzOffset>>(
    key_value: KeyValueLoadGenerator,
    scope: &mut G,
    config: RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Stream<G, ProgressStatisticsUpdate>,
    Vec<PressOnDropButton>,
) {
    // Steady-state statistics are driven by `committed_uppers` in a separate operator.
    let (steady_state_stats_stream, stats_button) =
        render_statistics_operator(scope, &config, committed_uppers);

    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Fan the single data output out into one stream per source export. The first element of
    // every update is the index of the stream it is routed to; it is stripped while routing.
    let partition_count = u64::cast_from(config.source_exports.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    // Pair the export ids, in the iteration order of `source_exports.keys()`, with the
    // partitioned streams.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    // The progress output handle is never written to; only `progress_cap` below is used.
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (stats_output, stats_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            // One capability per output, in the order the outputs were created above.
            let [mut cap, mut progress_cap, stats_cap]: [_; 3] = caps.try_into().unwrap();

            // Fold the resume uppers of all exports into a single antichain.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // Without a resume offset there is nothing to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            // The key-value load generator only has one 'output' stream, which is the default output
            // that needs to be emitted to all output indexes.
            let output_indexes = output_map
                .get(&LoadGeneratorOutput::Default)
                .expect("default output");

            info!(
                ?config.worker_id,
                "starting key-value load generator at {}",
                resume_offset.offset,
            );
            // Move both capabilities to the resume point before waiting for the start signal.
            cap.downgrade(&resume_offset);
            progress_cap.downgrade(&resume_offset);
            start_signal.await;
            info!(?config.worker_id, "received key-value load generator start signal");

            // The snapshot phase only runs when resuming from offset 0.
            let snapshotting = resume_offset.offset == 0;

            // One snapshot producer per partition this worker is responsible for.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| TransactionalSnapshotProducer::new(p, key_value.clone()))
                })
                .collect();

            // Only the worker responsible for partition 0 reports steady-state statistics.
            let stats_worker = config.responsible_for(0);

            // A worker without partitions reports an empty snapshot and is done.
            if local_partitions.is_empty() {
                stats_output.give(
                    &stats_cap,
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: 0,
                        records_staged: 0,
                    },
                );
                return;
            }

            // Number of snapshot records this worker emits: its share of the partitions times
            // the key space times the number of transactional snapshot rounds.
            let local_snapshot_size = (u64::cast_from(local_partitions.len()))
                * key_value.keys
                * key_value.transactional_snapshot_rounds()
                / key_value.partitions;

            // Re-usable buffers.
            let mut value_buffer: Vec<u8> = vec![0; usize::cast_from(key_value.value_size)];

            // Snapshot phase. `upper_offset` ends up as the first offset the update phase
            // produces at.
            let mut upper_offset = if snapshotting {
                let snapshot_rounds = key_value.transactional_snapshot_rounds();

                if stats_worker {
                    stats_output.give(
                        &stats_cap,
                        ProgressStatisticsUpdate::SteadyState {
                            offset_known: snapshot_rounds,
                            offset_committed: 0,
                        },
                    );
                };

                // Downgrade to the snapshot frontier.
                progress_cap.downgrade(&MzOffset::from(snapshot_rounds));

                // Count of emitted updates; reported as `records_staged` after every batch.
                let mut emitted = 0;
                stats_output.give(
                    &stats_cap,
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: local_snapshot_size,
                        records_staged: emitted,
                    },
                );
                // Round-robin over the local partitions, one batch each, until all are done.
                while local_partitions.iter().any(|si| !si.finished()) {
                    for sp in local_partitions.iter_mut() {
                        for u in sp.produce_batch(&mut value_buffer, output_indexes) {
                            data_output.give_fueled(&cap, u).await;
                            emitted += 1;
                        }

                        stats_output.give(
                            &stats_cap,
                            ProgressStatisticsUpdate::Snapshot {
                                records_known: local_snapshot_size,
                                records_staged: emitted,
                            },
                        );
                    }
                }

                // The data capability only moves past the snapshot once all of it is emitted.
                cap.downgrade(&MzOffset::from(snapshot_rounds));
                snapshot_rounds
            } else {
                // Both capabilities were already downgraded to `resume_offset` above; this
                // repeats those downgrades.
                cap.downgrade(&resume_offset);
                progress_cap.downgrade(&resume_offset);
                resume_offset.offset
            };

            // Update phase: one update producer per local partition, starting at `upper_offset`.
            let mut local_partitions: Vec<_> = (0..key_value.partitions)
                .filter_map(|p| {
                    config
                        .responsible_for(p)
                        .then(|| UpdateProducer::new(p, upper_offset, key_value.clone()))
                })
                .collect();
            // Updates are only produced when ticking is configured or when the snapshot is
            // not transactional.
            if !local_partitions.is_empty()
                && (key_value.tick_interval.is_some() || !key_value.transactional_snapshot)
            {
                let mut interval = key_value.tick_interval.map(tokio::time::interval);

                loop {
                    // Once every partition is past its quick updates, either pace the
                    // following batches by the tick interval or, without one, stop producing.
                    if local_partitions.iter().all(|si| si.finished_quick()) {
                        if let Some(interval) = &mut interval {
                            interval.tick().await;
                        } else {
                            break;
                        }
                    }

                    // Each producer emits one batch and reports the offset following it.
                    for up in local_partitions.iter_mut() {
                        let (new_upper, iter) = up.produce_batch(&mut value_buffer, output_indexes);
                        upper_offset = new_upper;
                        for u in iter {
                            data_output.give_fueled(&cap, u).await;
                        }
                    }
                    cap.downgrade(&MzOffset::from(upper_offset));
                    progress_cap.downgrade(&MzOffset::from(upper_offset));
                }
            }

            // Park forever instead of returning, so that the locals above (including the
            // capabilities) are not dropped.
            std::future::pending::<()>().await;
        })
    });

    // The generator always reports itself as running.
    let status = [HealthStatusMessage {
        id: None,
        namespace: StatusNamespace::Generator,
        update: HealthStatusUpdate::running(),
    }]
    .to_stream(scope);
    // Merge the snapshot statistics of this operator with the steady-state statistics.
    let stats_stream = stats_stream.concat(&steady_state_stats_stream);

    (
        data_collections,
        progress_stream,
        status,
        stats_stream,
        vec![button.press_on_drop(), stats_button],
    )
}
244
/// Endless iterator over the keys of a single partition.
///
/// Keys advance in steps of `partitions` and wrap around at `keys`, so a partition that starts
/// at its own index keeps yielding exactly the keys congruent to that index.
struct PartitionKeyIterator {
    /// Index of the partition the keys belong to.
    partition: u64,
    /// Total number of partitions; also the distance between two consecutive keys.
    partitions: u64,
    /// Size of the key space; keys wrap around at this value.
    keys: u64,
    /// Key returned by the next call to `next`.
    next: u64,
}

impl PartitionKeyIterator {
    /// Creates an iterator for `partition` whose first key is `start_key`.
    ///
    /// # Panics
    ///
    /// Panics if `keys` is not a multiple of `partitions`.
    fn new(partition: u64, partitions: u64, keys: u64, start_key: u64) -> Self {
        assert_eq!(keys % partitions, 0);
        Self {
            next: start_key,
            partition,
            partitions,
            keys,
        }
    }
}

// Implemented for the mutable reference so that adaptors such as `take` can be applied to a
// borrowed iterator without consuming it.
impl Iterator for &mut PartitionKeyIterator {
    type Item = u64;

    /// Yields the current key and steps to the following one; never returns `None`.
    fn next(&mut self) -> Option<u64> {
        let stride = self.partitions;
        let successor = (self.next + stride) % self.keys;
        Some(std::mem::replace(&mut self.next, successor))
    }
}
278
279/// Create `StdRng` seeded so identical values can be produced during resumption (during
280/// snapshotting or after).
281fn create_consistent_rng(source_seed: u64, offset: u64, partition: u64) -> StdRng {
282    let mut seed = [0; 32];
283    seed[0..8].copy_from_slice(&source_seed.to_le_bytes());
284    seed[8..16].copy_from_slice(&offset.to_le_bytes());
285    seed[16..24].copy_from_slice(&partition.to_le_bytes());
286    StdRng::from_seed(seed)
287}
288
289/// A struct that produces batches of data for the snapshotting phase.
290struct TransactionalSnapshotProducer {
291    /// The key iterator for the partition.
292    pi: PartitionKeyIterator,
293    /// The batch size.
294    batch_size: u64,
295    /// The number of batches produced in the current round.
296    produced_batches: u64,
297    /// The expected number of batches per round.
298    expected_batches: u64,
299    /// The current round of the offset
300    round: u64,
301    /// The total number of rounds.
302    snapshot_rounds: u64,
303    /// The rng for the current round.
304    rng: Option<StdRng>,
305    /// The source-level seed for the rng.
306    seed: u64,
307    /// Whether to include the offset or not.
308    include_offset: bool,
309}
310
311impl TransactionalSnapshotProducer {
312    fn new(partition: u64, key_value: KeyValueLoadGenerator) -> Self {
313        let snapshot_rounds = key_value.transactional_snapshot_rounds();
314
315        let KeyValueLoadGenerator {
316            partitions,
317            keys,
318            batch_size,
319            seed,
320            include_offset,
321            ..
322        } = key_value;
323
324        assert_eq!((keys / partitions) % batch_size, 0);
325        let pi = PartitionKeyIterator::new(
326            partition, partitions, keys, // The first start key is the partition.
327            partition,
328        );
329        TransactionalSnapshotProducer {
330            pi,
331            batch_size,
332            produced_batches: 0,
333            expected_batches: keys / partitions / batch_size,
334            round: 0,
335            snapshot_rounds,
336            rng: None,
337            seed,
338            include_offset: include_offset.is_some(),
339        }
340    }
341
342    /// If this partition is done snapshotting.
343    fn finished(&self) -> bool {
344        self.round >= self.snapshot_rounds
345    }
346
347    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
348    /// batch and round counter.
349    ///
350    /// The output iterator must be consumed fully for this method to be used correctly.
351    fn produce_batch<'a>(
352        &'a mut self,
353        value_buffer: &'a mut Vec<u8>,
354        output_indexes: &'a [usize],
355    ) -> impl Iterator<
356        Item = (
357            (usize, Result<SourceMessage, DataflowError>),
358            MzOffset,
359            Diff,
360        ),
361    > + 'a {
362        let finished = self.finished();
363
364        let rng = self
365            .rng
366            .get_or_insert_with(|| create_consistent_rng(self.seed, self.round, self.pi.partition));
367
368        let partition: u64 = self.pi.partition;
369        let iter_round: u64 = self.round;
370        let include_offset: bool = self.include_offset;
371        let iter = self
372            .pi
373            .take(if finished {
374                0
375            } else {
376                usize::cast_from(self.batch_size)
377            })
378            .flat_map(move |key| {
379                rng.fill_bytes(value_buffer.as_mut_slice());
380                let msg = Ok(SourceMessage {
381                    key: Row::pack_slice(&[Datum::UInt64(key)]),
382                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
383                    metadata: if include_offset {
384                        Row::pack(&[Datum::UInt64(iter_round)])
385                    } else {
386                        Row::default()
387                    },
388                });
389                output_indexes
390                    .iter()
391                    .repeat_clone(msg)
392                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_round), Diff::ONE))
393            });
394
395        if !finished {
396            self.produced_batches += 1;
397
398            if self.produced_batches == self.expected_batches {
399                self.round += 1;
400                self.produced_batches = 0;
401            }
402        }
403
404        iter
405    }
406}
407
408/// A struct that produces batches of data for the post-snapshotting phase.
409struct UpdateProducer {
410    /// The key iterator for the partition.
411    pi: PartitionKeyIterator,
412    /// The batch size.
413    batch_size: u64,
414    /// The next offset to produce updates at.
415    next_offset: u64,
416    /// The source-level seed for the rng.
417    seed: u64,
418    /// The number of offsets we expect to be part of the `transactional_snapshot`.
419    expected_quick_offsets: u64,
420    /// Whether to include the offset or not.
421    include_offset: bool,
422}
423
424impl UpdateProducer {
425    fn new(partition: u64, next_offset: u64, key_value: KeyValueLoadGenerator) -> Self {
426        let snapshot_rounds = key_value.transactional_snapshot_rounds();
427        let quick_rounds = key_value.non_transactional_snapshot_rounds();
428        let KeyValueLoadGenerator {
429            partitions,
430            keys,
431            batch_size,
432            seed,
433            include_offset,
434            ..
435        } = key_value;
436
437        // Each snapshot _round_ is associated with an offset, starting at 0. Therefore
438        // the `next_offset` - `snapshot_rounds` is the index of the next non-snapshot update
439        // batches we must produce. The start key for the that batch is that index
440        // multiplied by the batch size and the number of partitions.
441        let start_key =
442            (((next_offset - snapshot_rounds) * batch_size * partitions) + partition) % keys;
443
444        // We expect to emit 1 batch per offset. A round is emitting the full set of keys (in the
445        // partition. The number of keys divided by the batch size (and partitioned, so divided by
446        // the partition count) is the number of offsets in each _round_. We also add the number of
447        // snapshot rounds, which is also the number of offsets in the snapshot.
448        let expected_quick_offsets =
449            ((keys / partitions / batch_size) * quick_rounds) + snapshot_rounds;
450
451        let pi = PartitionKeyIterator::new(partition, partitions, keys, start_key);
452        UpdateProducer {
453            pi,
454            batch_size,
455            next_offset,
456            seed,
457            expected_quick_offsets,
458            include_offset: include_offset.is_some(),
459        }
460    }
461
462    /// If this partition is done producing quick updates.
463    fn finished_quick(&self) -> bool {
464        self.next_offset >= self.expected_quick_offsets
465    }
466
467    /// Produce a batch of message into `buffer`, of size `batch_size`. Advances the current
468    /// batch and round counter. Also returns the frontier after the batch.
469    ///
470    /// The output iterator must be consumed fully for this method to be used correctly.
471    fn produce_batch<'a>(
472        &'a mut self,
473        value_buffer: &'a mut Vec<u8>,
474        output_indexes: &'a [usize],
475    ) -> (
476        u64,
477        impl Iterator<
478            Item = (
479                (usize, Result<SourceMessage, DataflowError>),
480                MzOffset,
481                Diff,
482            ),
483        > + 'a,
484    ) {
485        let mut rng = create_consistent_rng(self.seed, self.next_offset, self.pi.partition);
486
487        let partition: u64 = self.pi.partition;
488        let iter_offset: u64 = self.next_offset;
489        let include_offset: bool = self.include_offset;
490        let iter = self
491            .pi
492            .take(usize::cast_from(self.batch_size))
493            .flat_map(move |key| {
494                rng.fill_bytes(value_buffer.as_mut_slice());
495                let msg = Ok(SourceMessage {
496                    key: Row::pack_slice(&[Datum::UInt64(key)]),
497                    value: Row::pack_slice(&[Datum::UInt64(partition), Datum::Bytes(value_buffer)]),
498                    metadata: if include_offset {
499                        Row::pack(&[Datum::UInt64(iter_offset)])
500                    } else {
501                        Row::default()
502                    },
503                });
504                output_indexes
505                    .iter()
506                    .repeat_clone(msg)
507                    .map(move |(idx, msg)| ((*idx, msg), MzOffset::from(iter_offset), Diff::ONE))
508            });
509
510        // Advance to the next offset.
511        self.next_offset += 1;
512        (self.next_offset, iter)
513    }
514}
515
516pub fn render_statistics_operator<G: Scope<Timestamp = MzOffset>>(
517    scope: &G,
518    config: &RawSourceCreationConfig,
519    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
520) -> (Stream<G, ProgressStatisticsUpdate>, PressOnDropButton) {
521    let id = config.id;
522    let mut builder =
523        AsyncOperatorBuilder::new(format!("key_value_loadgen_statistics:{id}"), scope.clone());
524
525    let (stats_output, stats_stream) = builder.new_output();
526
527    let offset_worker = config.responsible_for(0);
528
529    let button = builder.build(move |caps| async move {
530        let [stats_cap]: [_; 1] = caps.try_into().unwrap();
531
532        if !offset_worker {
533            // Emit 0, to mark this worker as having started up correctly.
534            stats_output.give(
535                &stats_cap,
536                ProgressStatisticsUpdate::SteadyState {
537                    offset_known: 0,
538                    offset_committed: 0,
539                },
540            );
541            return;
542        }
543
544        tokio::pin!(committed_uppers);
545        loop {
546            match committed_uppers.next().await {
547                Some(frontier) => {
548                    if let Some(offset) = frontier.as_option() {
549                        stats_output.give(
550                            &stats_cap,
551                            ProgressStatisticsUpdate::SteadyState {
552                                offset_known: offset.offset,
553                                offset_committed: offset.offset,
554                            },
555                        );
556                    }
557                }
558                None => return,
559            }
560        }
561    });
562
563    (stats_stream, button.press_on_drop())
564}
565
/// Unit tests for the key iteration and resumption arithmetic of the generator.
#[cfg(test)]
mod test {
    use super::*;

    /// An `UpdateProducer` created at a resume upper must start at the key it would have
    /// reached had it produced every earlier batch itself.
    #[mz_ore::test]
    fn test_key_value_loadgen_resume_upper() {
        let up = UpdateProducer::new(
            1, // partition 1
            5, // resume upper of 5
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // A resume upper of 5 is 3 offsets past the 2 snapshot rounds, so 3 batches of 2 keys
        // were produced beforehand for partition 1 (1, 4, 7, 10, 13 and 16). The next key is
        // the 7th one, so 19.
        assert_eq!(up.pi.next, 19);

        let up = UpdateProducer::new(
            1,           // partition 1
            5 + 126 / 2, // resume upper of 5 after a full set of keys has been produced.
            KeyValueLoadGenerator {
                keys: 126,
                snapshot_rounds: 2,
                transactional_snapshot: true,
                value_size: 1234,
                partitions: 3,
                tick_interval: None,
                batch_size: 2,
                seed: 1234,
                include_offset: None,
            },
        );

        // The 63 additional offsets are 126 more keys, i.e. three full passes over the 42 keys
        // of this partition, so the start key is the same as before.
        assert_eq!(up.pi.next, 19);
    }

    /// The key iterator steps by the partition count and wraps around the key space.
    #[mz_ore::test]
    fn test_key_value_loadgen_part_iter() {
        let mut pi = PartitionKeyIterator::new(
            1,   // partition 1
            3,   // of 3 partitions
            126, // 126 keys.
            1,   // Start at the beginning
        );

        assert_eq!(1, Iterator::next(&mut &mut pi).unwrap());
        assert_eq!(4, Iterator::next(&mut &mut pi).unwrap());

        // After a full round (2 keys taken above plus the remaining 40), ensure we wrap around
        // correctly.
        let _ = pi.take((126 / 3) - 2).count();
        assert_eq!(pi.next, 1);
    }
}