mz_storage/source/generator.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::ops::Rem;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{
    Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
    LoadGeneratorSourceConnection,
};
use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::time::{Instant, interval_at};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod auction;
mod clock;
mod counter;
mod datums;
mod key_value;
mod marketing;
mod tpch;

pub use auction::Auction;
pub use clock::Clock;
pub use counter::Counter;
pub use datums::Datums;
pub use tpch::Tpch;

use self::marketing::Marketing;

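/// The kind of load generator to render. `Simple` generators are driven by the shared
/// operator in this module (see `render_simple_generator`), while the key-value generator
/// has its own rendering logic in the `key_value` module.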
enum GeneratorKind {
    Simple {
        generator: Box<dyn Generator>,
        tick_micros: Option<u64>,
        as_of: u64,
        up_to: u64,
    },
    KeyValue(KeyValueLoadGenerator),
}

impl GeneratorKind {
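    /// Builds the `GeneratorKind` for the given load generator variant, carrying along the
    /// optional tick interval and the `as_of`/`up_to` offsets used by simple generators.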
    fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
        match g {
            LoadGenerator::Auction => GeneratorKind::Simple {
                generator: Box::new(Auction {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Clock => GeneratorKind::Simple {
                generator: Box::new(Clock {
                    tick_ms: tick_micros
                        .map(Duration::from_micros)
                        .unwrap_or(Duration::from_secs(1))
                        .as_millis()
                        .try_into()
                        .expect("reasonable tick interval"),
                    as_of_ms: as_of,
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
                generator: Box::new(Counter {
                    max_cardinality: max_cardinality.clone(),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Datums => GeneratorKind::Simple {
                generator: Box::new(Datums {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Marketing => GeneratorKind::Simple {
                generator: Box::new(Marketing {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            } => GeneratorKind::Simple {
                generator: Box::new(Tpch {
                    count_supplier: *count_supplier,
                    count_part: *count_part,
                    count_customer: *count_customer,
                    count_orders: *count_orders,
                    count_clerk: *count_clerk,
                    // The default tick behavior is 1s, but for TPC-H we want to disable
                    // ticking completely unless an interval was explicitly configured.
                    tick: Duration::from_micros(tick_micros.unwrap_or(0)),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
        }
    }

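    /// Renders this generator as a source dataflow, returning one data collection per source
    /// export, a progress stream, a health stream, and `PressOnDropButton` tokens that shut
    /// the source down when dropped.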
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        // figure out which output types from the generator belong to which output indexes
        let mut output_map = BTreeMap::new();
        // Maps the output index to export_id for statistics.
        let mut idx_to_exportid = BTreeMap::new();
        // Make sure that there's an entry for the default output, even if there are no exports
        // that need data output. Certain implementations rely on it (at the time of this comment
        // that includes the key-value load gen source).
        output_map.insert(LoadGeneratorOutput::Default, Vec::new());
        for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
            let output_type = match &export.details {
                SourceExportDetails::LoadGenerator(details) => details.output,
                // This is an export that doesn't need any data output to it.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", export.details),
            };
            output_map
                .entry(output_type)
                .or_insert_with(Vec::new)
                .push(idx);
            idx_to_exportid.insert(idx, export_id.clone());
        }

        match self {
            GeneratorKind::Simple {
                tick_micros,
                as_of,
                up_to,
                generator,
            } => render_simple_generator(
                generator,
                tick_micros,
                as_of.into(),
                up_to.into(),
                scope,
                config,
                committed_uppers,
                output_map,
            ),
            GeneratorKind::KeyValue(kv) => key_value::render(
                kv,
                scope,
                config.clone(),
                committed_uppers,
                start_signal,
                output_map,
                idx_to_exportid,
            ),
        }
    }
}

impl SourceRender for LoadGeneratorSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;

    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        let generator_kind = GeneratorKind::new(
            &self.load_generator,
            self.tick_micros,
            self.as_of,
            self.up_to,
        );
        let (updates, uppers, health, button) =
            generator_kind.render(scope, config, committed_uppers, start_signal);

        (updates, uppers, health, None, button)
    }
}

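/// Renders a simple (iterator-based) generator as a single async operator. Only the worker
/// responsible for this source produces data: it emits messages for offsets between `as_of`
/// and `up_to`, downgrades its capabilities as the generator reports progress, and paces
/// itself with the configured tick interval once it has passed the resume offset.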
fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: &G,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
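    // Partition the single data output into one stream per source export, routing each update
    // by the output index it is tagged with.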
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
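    // Load generators are not parallelized: a single worker is responsible for producing all
    // of the data.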
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            // We only need this until we have reported ourselves as Running.
            let mut health_cap = Some(health_cap);

            if !is_active_worker {
                // Emit 0, to mark this worker as having started up correctly.
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

            let start_instant = {
                // We want to have our interval start at a nice round number...
                // for example, if our tick interval is one minute, to start at a minute boundary.
                // However, the `Interval` type from tokio can't be "floored" in that way.
                // Instead, figure out the amount we should step forward based on the wall clock,
                // then apply that to our monotonic clock to make things start at approximately the
                // right time.
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
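            // Default to ticking once per second if no tick interval was configured.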
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

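            // Instantiate the generator's event iterator, resuming from the resume offset.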
            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            // If we are just starting up, report 0 as our `offset_committed`.
            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        // Fast forward any data before the requested as of.
                        if offset <= as_of {
                            offset = as_of;
                        }

                        // If the load generator produces data at or beyond the
                        // requested `up_to`, drop it. We'll terminate the load
                        // generator when the capability advances to the `up_to`,
                        // but the load generator might produce data far in advance
                        // of its capability.
                        if offset >= up_to {
                            continue;
                        }

                        // Once we see the load generator start producing data for some offset,
                        // we report progress beyond that offset, to ensure that a binding can be
                        // minted for the data and it doesn't accumulate in reclocking.
                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            // We don't have an output index for this output type, so drop it
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        // Some generators always reproduce their TVC from the beginning which can
                        // generate a significant amount of data that will overwhelm the dataflow.
                        // Since those are not required downstream we eagerly ignore them here.
                        if resume_offset <= offset {
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
                    Event::Progress(Some(offset)) => {
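                        // Once progress reaches the resume offset, report the source and each
                        // of its exports as running. This only needs to happen once, so the
                        // health capability is dropped afterwards.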
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            let export_ids = export_ids.iter().copied();
                            for id in export_ids.map(Some).chain(std::iter::once(None)) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id,
                                        namespace: StatusNamespace::Generator,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        // If we've reached the requested maximum offset, cease.
                        if offset >= up_to {
                            break;
                        }

                        // If the offset is at or below the requested `as_of`, don't
                        // downgrade the capability.
                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        // We only sleep if we have surpassed the resume offset so that we can
                        // quickly go over any historical updates that a generator might choose to
                        // emit.
                        // TODO(petrosagg): Remove the sleep below and make generators return an
                        // async stream so that they can drive the rate of production directly
                        if resume_offset < offset {
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            // Offset N means we have committed N offsets (offsets are
                                            // 0-indexed)
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            // TODO(guswynn): generators have various definitions of "snapshot", so
                            // we are not going to implement snapshot progress statistics for them
                            // right now, but will come back to it.
                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    // technically we could have _known_ a larger offset
                                    // than the one that has been committed, but we can
                                    // never recover that known amount on restart, so we
                                    // just advance these in lock step.
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
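                    // The generator will not make any further progress; shut the operator down.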
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}