1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::ops::Rem;
13use std::sync::Arc;
14use std::time::Duration;
15
16use differential_dataflow::{AsCollection, Hashable};
17use futures::StreamExt;
18use itertools::Itertools;
19use mz_ore::cast::CastFrom;
20use mz_ore::iter::IteratorExt;
21use mz_ore::now::NowFn;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::errors::DataflowError;
24use mz_storage_types::sources::load_generator::{
25 Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
26 LoadGeneratorSourceConnection,
27};
28use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
29use mz_timely_util::builder_async::{
30 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
31};
32use mz_timely_util::containers::stack::AccountedStackBuilder;
33use timely::container::CapacityContainerBuilder;
34use timely::dataflow::channels::pact::Pipeline;
35use timely::dataflow::operators::core::Partition;
36use timely::dataflow::{Scope, StreamVec};
37use timely::progress::{Antichain, Timestamp};
38use tokio::time::{Instant, interval_at};
39
40use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
41use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
42use crate::source::{RawSourceCreationConfig, SourceMessage};
43
44mod auction;
45mod clock;
46mod counter;
47mod datums;
48mod key_value;
49mod marketing;
50mod tpch;
51
52pub use auction::Auction;
53pub use clock::Clock;
54pub use counter::Counter;
55pub use datums::Datums;
56pub use tpch::Tpch;
57
58use self::marketing::Marketing;
59
/// The kind of load generator backing a source, dispatched on at render time.
enum GeneratorKind {
    /// A generator driven by the row-producing [`Generator`] trait, rendered
    /// by `render_simple_generator`.
    Simple {
        /// Produces the rows emitted by this source.
        generator: Box<dyn Generator>,
        /// Interval between ticks, in microseconds; `None` means the
        /// generator-specific default cadence.
        tick_micros: Option<u64>,
        /// Offset up to which data is fast-forwarded: messages at or before
        /// this offset are emitted at exactly this offset.
        as_of: u64,
        /// Exclusive upper bound: messages at or beyond this offset are
        /// dropped and progress stops here.
        up_to: u64,
    },
    /// The key-value load generator, rendered by the dedicated `key_value`
    /// module.
    KeyValue(KeyValueLoadGenerator),
}
69
70impl GeneratorKind {
71 fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
72 match g {
73 LoadGenerator::Auction => GeneratorKind::Simple {
74 generator: Box::new(Auction {}),
75 tick_micros,
76 as_of,
77 up_to,
78 },
79 LoadGenerator::Clock => GeneratorKind::Simple {
80 generator: Box::new(Clock {
81 tick_ms: tick_micros
82 .map(Duration::from_micros)
83 .unwrap_or(Duration::from_secs(1))
84 .as_millis()
85 .try_into()
86 .expect("reasonable tick interval"),
87 as_of_ms: as_of,
88 }),
89 tick_micros,
90 as_of,
91 up_to,
92 },
93 LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
94 generator: Box::new(Counter {
95 max_cardinality: max_cardinality.clone(),
96 }),
97 tick_micros,
98 as_of,
99 up_to,
100 },
101 LoadGenerator::Datums => GeneratorKind::Simple {
102 generator: Box::new(Datums {}),
103 tick_micros,
104 as_of,
105 up_to,
106 },
107 LoadGenerator::Marketing => GeneratorKind::Simple {
108 generator: Box::new(Marketing {}),
109 tick_micros,
110 as_of,
111 up_to,
112 },
113 LoadGenerator::Tpch {
114 count_supplier,
115 count_part,
116 count_customer,
117 count_orders,
118 count_clerk,
119 } => GeneratorKind::Simple {
120 generator: Box::new(Tpch {
121 count_supplier: *count_supplier,
122 count_part: *count_part,
123 count_customer: *count_customer,
124 count_orders: *count_orders,
125 count_clerk: *count_clerk,
126 tick: Duration::from_micros(tick_micros.unwrap_or(0)),
129 }),
130 tick_micros,
131 as_of,
132 up_to,
133 },
134 LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
135 }
136 }
137
138 fn render<'scope>(
139 self,
140 scope: Scope<'scope, MzOffset>,
141 config: &RawSourceCreationConfig,
142 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
143 start_signal: impl std::future::Future<Output = ()> + 'static,
144 ) -> (
145 BTreeMap<
146 GlobalId,
147 StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
148 >,
149 StreamVec<'scope, MzOffset, Infallible>,
150 StreamVec<'scope, MzOffset, HealthStatusMessage>,
151 Vec<PressOnDropButton>,
152 ) {
153 let mut output_map = BTreeMap::new();
155 let mut idx_to_exportid = BTreeMap::new();
157 output_map.insert(LoadGeneratorOutput::Default, Vec::new());
161 for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
162 let output_type = match &export.details {
163 SourceExportDetails::LoadGenerator(details) => details.output,
164 SourceExportDetails::None => continue,
166 _ => panic!("unexpected source export details: {:?}", export.details),
167 };
168 output_map
169 .entry(output_type)
170 .or_insert_with(Vec::new)
171 .push(idx);
172 idx_to_exportid.insert(idx, export_id.clone());
173 }
174
175 match self {
176 GeneratorKind::Simple {
177 tick_micros,
178 as_of,
179 up_to,
180 generator,
181 } => render_simple_generator(
182 generator,
183 tick_micros,
184 as_of.into(),
185 up_to.into(),
186 scope,
187 config,
188 committed_uppers,
189 output_map,
190 ),
191 GeneratorKind::KeyValue(kv) => key_value::render(
192 kv,
193 scope,
194 config.clone(),
195 committed_uppers,
196 start_signal,
197 output_map,
198 idx_to_exportid,
199 ),
200 }
201 }
202}
203
impl SourceRender for LoadGeneratorSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;

    /// Renders this load-generator source into the given timely scope.
    ///
    /// Dispatches to the appropriate generator implementation via
    /// [`GeneratorKind`] and attaches a probe stream synthesized from the
    /// generator's own progress output.
    fn render<'scope>(
        self,
        scope: Scope<'scope, MzOffset>,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, MzOffset, HealthStatusMessage>,
        StreamVec<'scope, MzOffset, Probe<MzOffset>>,
        Vec<PressOnDropButton>,
    ) {
        let generator_kind = GeneratorKind::new(
            &self.load_generator,
            self.tick_micros,
            self.as_of,
            self.up_to,
        );
        let (updates, progress, health, button) =
            generator_kind.render(scope, config, committed_uppers, start_signal);

        // There is no external system to probe for a load generator, so we
        // derive probes from the progress stream the generator reports.
        let probe_stream = synthesize_probes(
            config.id,
            progress,
            config.timestamp_interval,
            config.now_fn.clone(),
        );

        (updates, health, probe_stream, button)
    }
}
243
/// Renders a [`Generator`]-driven load generator as an async timely operator.
///
/// Emits the generator's rows — partitioned into one collection per source
/// export — on the data output, reports upstream progress on a second output,
/// and emits health status on a third. Only the single worker responsible for
/// this source produces data; all other workers zero out their statistics and
/// return immediately.
fn render_simple_generator<'scope>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: Scope<'scope, MzOffset>,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Split the single data output into one stream per source export, keyed by
    // the export's position in `config.source_exports`.
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
    // All data is produced by a single worker: the one responsible for the
    // unit key.
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            // Wrapped in `Option` so it can be dropped after the one-time
            // health status emission below.
            let mut health_cap = Some(health_cap);

            if !is_active_worker {
                // Inactive workers report zeroed statistics so aggregates
                // stay well defined, then exit.
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

            // Combine the per-export resume uppers into a single frontier.
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume upper means there is nothing left to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

            // Align the first tick to the next tick-interval boundary of wall
            // clock time, so restarts resume on the same cadence.
            let start_instant = {
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            // Default to a one-second tick when no interval is configured.
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            // When starting from scratch, report a committed offset of 0
            // until `committed_uppers` tells us otherwise.
            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        // Fast-forward any data at or before the as-of up to
                        // the as-of itself.
                        if offset <= as_of {
                            offset = as_of;
                        }

                        // Drop data at or beyond the (exclusive) up-to.
                        if offset >= up_to {
                            continue;
                        }

                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        // Route the message to the exports subscribed to this
                        // output type; skip it when there are none.
                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        // Only emit data at or after the resume offset;
                        // earlier data has already been recorded downstream.
                        if resume_offset <= offset {
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
                    Event::Progress(Some(offset)) => {
                        // Once progress reaches the resume offset, emit a
                        // one-time "running" status for each export and (via
                        // `None`) for the source as a whole, then drop the
                        // health capability.
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            let export_ids = export_ids.iter().copied();
                            for id in export_ids.map(Some).chain(std::iter::once(None)) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id,
                                        namespace: StatusNamespace::Generator,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        // Reached the up-to: the source is complete.
                        if offset >= up_to {
                            break;
                        }

                        // Progress at or before the as-of carries no new
                        // information, since all data is fast-forwarded to
                        // the as-of.
                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        // Once past the resume offset, throttle to the tick
                        // interval, draining committed-upper updates while
                        // waiting.
                        if resume_offset < offset {
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
                    // The generator is exhausted; shut down the operator.
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}
469
/// Synthesizes a probe stream for a load-generator source from its progress
/// stream.
///
/// A load generator has no external system to probe, so instead we emit, on
/// the `interval` cadence, a [`Probe`] carrying the latest frontier observed
/// on `progress`. When the progress stream terminates, one final probe with
/// the empty frontier is emitted. Only a single worker — chosen by hashing
/// `source_id` — produces probes.
fn synthesize_probes<'scope, T: timely::progress::Timestamp>(
    source_id: GlobalId,
    progress: StreamVec<'scope, T, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> StreamVec<'scope, T, Probe<T>> {
    let scope = progress.scope();

    // Elect a single worker, based on the source id, to emit probes.
    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());

        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    // The input's data type is `Infallible`, so no data
                    // events can ever arrive.
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // Suppress probes until the input has advanced past the
                // minimum frontier.
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

        // The input is complete; emit a final probe with the empty frontier
        // to mark the source as finished.
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}