// mz_storage/source/generator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::ops::Rem;
13use std::sync::Arc;
14use std::time::Duration;
15
16use differential_dataflow::{AsCollection, Hashable};
17use futures::StreamExt;
18use itertools::Itertools;
19use mz_ore::cast::CastFrom;
20use mz_ore::iter::IteratorExt;
21use mz_ore::now::NowFn;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::errors::DataflowError;
24use mz_storage_types::sources::load_generator::{
25    Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
26    LoadGeneratorSourceConnection,
27};
28use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
29use mz_timely_util::builder_async::{
30    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
31};
32use mz_timely_util::containers::stack::AccountedStackBuilder;
33use timely::container::CapacityContainerBuilder;
34use timely::dataflow::channels::pact::Pipeline;
35use timely::dataflow::operators::core::Partition;
36use timely::dataflow::{Scope, StreamVec};
37use timely::progress::{Antichain, Timestamp};
38use tokio::time::{Instant, interval_at};
39
40use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
41use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
42use crate::source::{RawSourceCreationConfig, SourceMessage};
43
44mod auction;
45mod clock;
46mod counter;
47mod datums;
48mod key_value;
49mod marketing;
50mod tpch;
51
52pub use auction::Auction;
53pub use clock::Clock;
54pub use counter::Counter;
55pub use datums::Datums;
56pub use tpch::Tpch;
57
58use self::marketing::Marketing;
59
/// The two families of load generator implementations.
///
/// Most generators implement the [`Generator`] trait and are rendered by the
/// shared `render_simple_generator` operator; the key-value generator has a
/// dedicated rendering path in the `key_value` module.
enum GeneratorKind {
    /// A generator driven through the [`Generator`] trait.
    Simple {
        /// Trait object that produces this generator's stream of events.
        generator: Box<dyn Generator>,
        /// Interval between ticks, in microseconds. `None` selects the
        /// per-generator default (see `render_simple_generator`).
        tick_micros: Option<u64>,
        /// Offset that any earlier data is fast-forwarded to on emission.
        as_of: u64,
        /// Offset at which data production ceases (exclusive).
        up_to: u64,
    },
    /// The key-value load generator, rendered by `key_value::render`.
    KeyValue(KeyValueLoadGenerator),
}
69
70impl GeneratorKind {
71    fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
72        match g {
73            LoadGenerator::Auction => GeneratorKind::Simple {
74                generator: Box::new(Auction {}),
75                tick_micros,
76                as_of,
77                up_to,
78            },
79            LoadGenerator::Clock => GeneratorKind::Simple {
80                generator: Box::new(Clock {
81                    tick_ms: tick_micros
82                        .map(Duration::from_micros)
83                        .unwrap_or(Duration::from_secs(1))
84                        .as_millis()
85                        .try_into()
86                        .expect("reasonable tick interval"),
87                    as_of_ms: as_of,
88                }),
89                tick_micros,
90                as_of,
91                up_to,
92            },
93            LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
94                generator: Box::new(Counter {
95                    max_cardinality: max_cardinality.clone(),
96                }),
97                tick_micros,
98                as_of,
99                up_to,
100            },
101            LoadGenerator::Datums => GeneratorKind::Simple {
102                generator: Box::new(Datums {}),
103                tick_micros,
104                as_of,
105                up_to,
106            },
107            LoadGenerator::Marketing => GeneratorKind::Simple {
108                generator: Box::new(Marketing {}),
109                tick_micros,
110                as_of,
111                up_to,
112            },
113            LoadGenerator::Tpch {
114                count_supplier,
115                count_part,
116                count_customer,
117                count_orders,
118                count_clerk,
119            } => GeneratorKind::Simple {
120                generator: Box::new(Tpch {
121                    count_supplier: *count_supplier,
122                    count_part: *count_part,
123                    count_customer: *count_customer,
124                    count_orders: *count_orders,
125                    count_clerk: *count_clerk,
126                    // The default tick behavior 1s. For tpch we want to disable ticking
127                    // completely.
128                    tick: Duration::from_micros(tick_micros.unwrap_or(0)),
129                }),
130                tick_micros,
131                as_of,
132                up_to,
133            },
134            LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
135        }
136    }
137
138    fn render<'scope>(
139        self,
140        scope: Scope<'scope, MzOffset>,
141        config: &RawSourceCreationConfig,
142        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
143        start_signal: impl std::future::Future<Output = ()> + 'static,
144    ) -> (
145        BTreeMap<
146            GlobalId,
147            StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
148        >,
149        StreamVec<'scope, MzOffset, Infallible>,
150        StreamVec<'scope, MzOffset, HealthStatusMessage>,
151        Vec<PressOnDropButton>,
152    ) {
153        // figure out which output types from the generator belong to which output indexes
154        let mut output_map = BTreeMap::new();
155        // Maps the output index to export_id for statistics.
156        let mut idx_to_exportid = BTreeMap::new();
157        // Make sure that there's an entry for the default output, even if there are no exports
158        // that need data output. Certain implementations rely on it (at the time of this comment
159        // that includes the key-value load gen source).
160        output_map.insert(LoadGeneratorOutput::Default, Vec::new());
161        for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
162            let output_type = match &export.details {
163                SourceExportDetails::LoadGenerator(details) => details.output,
164                // This is an export that doesn't need any data output to it.
165                SourceExportDetails::None => continue,
166                _ => panic!("unexpected source export details: {:?}", export.details),
167            };
168            output_map
169                .entry(output_type)
170                .or_insert_with(Vec::new)
171                .push(idx);
172            idx_to_exportid.insert(idx, export_id.clone());
173        }
174
175        match self {
176            GeneratorKind::Simple {
177                tick_micros,
178                as_of,
179                up_to,
180                generator,
181            } => render_simple_generator(
182                generator,
183                tick_micros,
184                as_of.into(),
185                up_to.into(),
186                scope,
187                config,
188                committed_uppers,
189                output_map,
190            ),
191            GeneratorKind::KeyValue(kv) => key_value::render(
192                kv,
193                scope,
194                config.clone(),
195                committed_uppers,
196                start_signal,
197                output_map,
198                idx_to_exportid,
199            ),
200        }
201    }
202}
203
204impl SourceRender for LoadGeneratorSourceConnection {
205    type Time = MzOffset;
206
207    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;
208
209    fn render<'scope>(
210        self,
211        scope: Scope<'scope, MzOffset>,
212        config: &RawSourceCreationConfig,
213        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
214        start_signal: impl std::future::Future<Output = ()> + 'static,
215    ) -> (
216        BTreeMap<
217            GlobalId,
218            StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
219        >,
220        StreamVec<'scope, MzOffset, HealthStatusMessage>,
221        StreamVec<'scope, MzOffset, Probe<MzOffset>>,
222        Vec<PressOnDropButton>,
223    ) {
224        let generator_kind = GeneratorKind::new(
225            &self.load_generator,
226            self.tick_micros,
227            self.as_of,
228            self.up_to,
229        );
230        let (updates, progress, health, button) =
231            generator_kind.render(scope, config, committed_uppers, start_signal);
232
233        let probe_stream = synthesize_probes(
234            config.id,
235            progress,
236            config.timestamp_interval,
237            config.now_fn.clone(),
238        );
239
240        (updates, health, probe_stream, button)
241    }
242}
243
/// Renders a [`Generator`]-trait load generator as a single async operator.
///
/// Only the worker responsible for this source produces data; all other
/// workers mark themselves as started and return immediately. Returns a data
/// collection per source export, a progress-only stream, a health status
/// stream, and the operator's shutdown button.
fn render_simple_generator<'scope>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: Scope<'scope, MzOffset>,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Split the single data output into one stream per source export, routed
    // by the output index carried in each emitted record.
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            // We only need this until we reported ourselves as Running.
            let mut health_cap = Some(health_cap);

            if !is_active_worker {
                // Emit 0, to mark this worker as having started up correctly.
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

            // Combine the per-export resume uppers into a single resume
            // frontier for the source.
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume frontier means no further progress is possible,
            // so there is nothing to do.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

            let start_instant = {
                // We want to have our interval start at a nice round number...
                // for example, if our tick interval is one minute, to start at a minute boundary.
                // However, the `Interval` type from tokio can't be "floored" in that way.
                // Instead, figure out the amount we should step forward based on the wall clock,
                // then apply that to our monotonic clock to make things start at approximately the
                // right time.
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            // Default to a 1s tick when no interval was requested.
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            // If we are just starting up, report 0 as our `offset_committed`.
            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        // Fast forward any data before the requested as of.
                        if offset <= as_of {
                            offset = as_of;
                        }

                        // If the load generator produces data at or beyond the
                        // requested `up_to`, drop it. We'll terminate the load
                        // generator when the capability advances to the `up_to`,
                        // but the load generator might produce data far in advance
                        // of its capability.
                        if offset >= up_to {
                            continue;
                        }

                        // Once we see the load generator start producing data for some offset,
                        // we report progress beyond that offset, to ensure that a binding can be
                        // minted for the data and it doesn't accumulate in reclocking.
                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            // We don't have an output index for this output type, so drop it
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        // Some generators always reproduce their TVC from the beginning which can
                        // generate a significant amount of data that will overwhelm the dataflow.
                        // Since those are not required downstream we eagerly ignore them here.
                        if resume_offset <= offset {
                            // The same message may feed several exports; it is
                            // cloned once per target output index.
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
                    Event::Progress(Some(offset)) => {
                        // The first time we pass the resume offset, report the
                        // source as running: once per export id and once for
                        // the source as a whole (`None`).
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            let export_ids = export_ids.iter().copied();
                            for id in export_ids.map(Some).chain(std::iter::once(None)) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id,
                                        namespace: StatusNamespace::Generator,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        // If we've reached the requested maximum offset, cease.
                        if offset >= up_to {
                            break;
                        }

                        // If the offset is at or below the requested `as_of`, don't
                        // downgrade the capability.
                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        // We only sleep if we have surpassed the resume offset so that we can
                        // quickly go over any historical updates that a generator might choose to
                        // emit.
                        // TODO(petrosagg): Remove the sleep below and make generators return an
                        // async stream so that they can drive the rate of production directly
                        if resume_offset < offset {
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            // Offset N means we have committed N offsets (offsets are
                                            // 0-indexed)
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            // TODO(guswynn): generators have various definitions of "snapshot", so
                            // we are not going to implement snapshot progress statistics for them
                            // right now, but will come back to it.
                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    // technically we could have _known_ a larger offset
                                    // than the one that has been committed, but we can
                                    // never recover that known amount on restart, so we
                                    // just advance these in lock step.
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
                    // The generator is exhausted; shut down the operator.
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}
469
/// Synthesizes a probe stream that produces the frontier of the given progress stream at the given
/// interval.
///
/// This is used as a fallback for sources that don't support probing the frontier of the upstream
/// system.
fn synthesize_probes<'scope, T: timely::progress::Timestamp>(
    source_id: GlobalId,
    progress: StreamVec<'scope, T, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> StreamVec<'scope, T, Probe<T>> {
    let scope = progress.scope();

    // Elect a single worker, chosen by hashing the source id, to emit probes.
    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());

        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        // Tracks the most recently observed frontier of the progress input.
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    // The input carries `Infallible` data, so data events
                    // cannot occur.
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // We only report a probe if the source upper frontier is not the minimum frontier.
                // This makes it so the first remap binding corresponds to the snapshot of the
                // source, and because the first binding always maps to the minimum *target*
                // frontier we guarantee that the source will never appear empty.
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

        // The progress input has closed: emit one final probe with the empty
        // frontier to mark the source as complete.
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}