1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::ops::Rem;
13use std::sync::Arc;
14use std::time::Duration;
15
16use differential_dataflow::{AsCollection, Hashable};
17use futures::StreamExt;
18use itertools::Itertools;
19use mz_ore::cast::CastFrom;
20use mz_ore::iter::IteratorExt;
21use mz_ore::now::NowFn;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::errors::DataflowError;
24use mz_storage_types::sources::load_generator::{
25 Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
26 LoadGeneratorSourceConnection,
27};
28use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
29use mz_timely_util::builder_async::{
30 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
31};
32use mz_timely_util::containers::stack::FueledBuilder;
33use timely::container::CapacityContainerBuilder;
34use timely::dataflow::channels::pact::Pipeline;
35use timely::dataflow::operators::core::Partition;
36use timely::dataflow::{Scope, StreamVec};
37use timely::progress::{Antichain, Timestamp};
38use tokio::time::{Instant, interval_at};
39
40use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
41use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
42use crate::source::{RawSourceCreationConfig, SourceMessage};
43
44mod auction;
45mod clock;
46mod counter;
47mod datums;
48mod key_value;
49mod marketing;
50mod tpch;
51
52pub use auction::Auction;
53pub use clock::Clock;
54pub use counter::Counter;
55pub use datums::Datums;
56pub use tpch::Tpch;
57
58use self::marketing::Marketing;
59
/// The rendering strategy used for a particular load generator.
enum GeneratorKind {
    /// A generator implemented via the [`Generator`] trait, driven by a
    /// periodic tick and rendered by `render_simple_generator`.
    Simple {
        /// The boxed generator that produces the rows.
        generator: Box<dyn Generator>,
        /// Interval between ticks, in microseconds; `None` leaves the choice
        /// to the rendering code's per-generator defaults.
        tick_micros: Option<u64>,
        /// Snapshot boundary: messages at or below this offset are emitted at
        /// exactly this offset.
        as_of: u64,
        /// Exclusive upper bound on emitted offsets; messages at or beyond it
        /// are dropped.
        up_to: u64,
    },
    /// The key-value generator, which has its own specialized rendering path
    /// (see the `key_value` module).
    KeyValue(KeyValueLoadGenerator),
}
69
70impl GeneratorKind {
71 fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
72 match g {
73 LoadGenerator::Auction => GeneratorKind::Simple {
74 generator: Box::new(Auction {}),
75 tick_micros,
76 as_of,
77 up_to,
78 },
79 LoadGenerator::Clock => GeneratorKind::Simple {
80 generator: Box::new(Clock {
81 tick_ms: tick_micros
82 .map(Duration::from_micros)
83 .unwrap_or(Duration::from_secs(1))
84 .as_millis()
85 .try_into()
86 .expect("reasonable tick interval"),
87 as_of_ms: as_of,
88 }),
89 tick_micros,
90 as_of,
91 up_to,
92 },
93 LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
94 generator: Box::new(Counter {
95 max_cardinality: max_cardinality.clone(),
96 }),
97 tick_micros,
98 as_of,
99 up_to,
100 },
101 LoadGenerator::Datums => GeneratorKind::Simple {
102 generator: Box::new(Datums {}),
103 tick_micros,
104 as_of,
105 up_to,
106 },
107 LoadGenerator::Marketing => GeneratorKind::Simple {
108 generator: Box::new(Marketing {}),
109 tick_micros,
110 as_of,
111 up_to,
112 },
113 LoadGenerator::Tpch {
114 count_supplier,
115 count_part,
116 count_customer,
117 count_orders,
118 count_clerk,
119 } => GeneratorKind::Simple {
120 generator: Box::new(Tpch {
121 count_supplier: *count_supplier,
122 count_part: *count_part,
123 count_customer: *count_customer,
124 count_orders: *count_orders,
125 count_clerk: *count_clerk,
126 tick: Duration::from_micros(tick_micros.unwrap_or(0)),
129 }),
130 tick_micros,
131 as_of,
132 up_to,
133 },
134 LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
135 }
136 }
137
138 fn render<'scope>(
139 self,
140 scope: Scope<'scope, MzOffset>,
141 config: &RawSourceCreationConfig,
142 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
143 start_signal: impl std::future::Future<Output = ()> + 'static,
144 ) -> (
145 BTreeMap<
146 GlobalId,
147 StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
148 >,
149 StreamVec<'scope, MzOffset, Infallible>,
150 StreamVec<'scope, MzOffset, HealthStatusMessage>,
151 Vec<PressOnDropButton>,
152 ) {
153 let mut output_map = BTreeMap::new();
155 let mut idx_to_exportid = BTreeMap::new();
157 output_map.insert(LoadGeneratorOutput::Default, Vec::new());
161 for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
162 let output_type = match &export.details {
163 SourceExportDetails::LoadGenerator(details) => details.output,
164 SourceExportDetails::None => continue,
166 _ => panic!("unexpected source export details: {:?}", export.details),
167 };
168 output_map
169 .entry(output_type)
170 .or_insert_with(Vec::new)
171 .push(idx);
172 idx_to_exportid.insert(idx, export_id.clone());
173 }
174
175 match self {
176 GeneratorKind::Simple {
177 tick_micros,
178 as_of,
179 up_to,
180 generator,
181 } => render_simple_generator(
182 generator,
183 tick_micros,
184 as_of.into(),
185 up_to.into(),
186 scope,
187 config,
188 committed_uppers,
189 output_map,
190 ),
191 GeneratorKind::KeyValue(kv) => key_value::render(
192 kv,
193 scope,
194 config.clone(),
195 committed_uppers,
196 start_signal,
197 output_map,
198 idx_to_exportid,
199 ),
200 }
201 }
202}
203
204impl SourceRender for LoadGeneratorSourceConnection {
205 type Time = MzOffset;
206
207 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;
208
209 fn render<'scope>(
210 self,
211 scope: Scope<'scope, MzOffset>,
212 config: &RawSourceCreationConfig,
213 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
214 start_signal: impl std::future::Future<Output = ()> + 'static,
215 ) -> (
216 BTreeMap<
217 GlobalId,
218 StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
219 >,
220 StreamVec<'scope, MzOffset, HealthStatusMessage>,
221 StreamVec<'scope, MzOffset, Probe<MzOffset>>,
222 Vec<PressOnDropButton>,
223 ) {
224 let generator_kind = GeneratorKind::new(
225 &self.load_generator,
226 self.tick_micros,
227 self.as_of,
228 self.up_to,
229 );
230 let (updates, progress, health, button) =
231 generator_kind.render(scope, config, committed_uppers, start_signal);
232
233 let probe_stream = synthesize_probes(
234 config.id,
235 progress,
236 config.timestamp_interval,
237 config.now_fn.clone(),
238 );
239
240 (updates, health, probe_stream, button)
241 }
242}
243
/// Renders a trait-object [`Generator`] as a single async timely operator.
///
/// Only one worker (chosen via `config.responsible_for`) actually runs the
/// generator; the others report zeroed statistics and exit immediately. The
/// operator produces:
/// * one data collection per source export (fanned out via `partition`),
/// * a progress-only stream (no data) whose frontier tracks the generator,
/// * a health stream that reports "running" once, when output begins.
fn render_simple_generator<'scope>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: Scope<'scope, MzOffset>,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    // A single fueled data output carrying `((export index, message), time,
    // diff)` updates; it is split into one stream per export below.
    let (data_output, stream) = builder.new_output::<FueledBuilder<
        CapacityContainerBuilder<
            Vec<(
                (usize, Result<SourceMessage, DataflowError>),
                MzOffset,
                Diff,
            )>,
        >,
    >>();
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    // Route each update to the partition given by its export index.
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): (
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(output);
            (output, (data, time, diff))
        },
    );
    // Pair up partitions with export ids; `zip_eq` asserts the counts match.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            // Wrapped in `Option` so the health capability can be dropped
            // after its single use, closing the health output.
            let mut health_cap = Some(health_cap);

            // Only the elected worker generates data; everyone else reports
            // zeroed statistics and drops their capabilities by returning.
            if !is_active_worker {
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

            // Combine the per-export resume uppers into a single antichain.
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume upper means there is nothing left to produce.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

            // Align the first tick to the next wall-clock multiple of the
            // tick interval, so restarts tick on the same schedule.
            let start_instant = {
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            // Default to a one-second tick when none was configured.
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            // NOTE(review): the `None` here presumably selects a default
            // seed — confirm against `Generator::by_seed`.
            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            // When starting from scratch, nothing has been committed yet,
            // which we report as offset 0; otherwise unknown until the first
            // committed-upper update arrives.
            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        // Fast-forward any data before the as_of to the
                        // as_of itself (snapshot consolidation).
                        if offset <= as_of {
                            offset = as_of;
                        }

                        // `up_to` is exclusive: drop data at or beyond it.
                        if offset >= up_to {
                            continue;
                        }

                        // Advance the progress stream past this offset;
                        // failure just means it is already further along.
                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        // Drop data for output types no export subscribes to.
                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        // Only emit data at or past the resume offset;
                        // earlier data was already durably recorded.
                        if resume_offset <= offset {
                            // Fan the message out to each subscribed export,
                            // cloning it only as many times as needed.
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                let update = ((output, message), offset, diff);
                                let size = update.fuel_size();
                                data_output.give_fueled(&cap, update, size).await;
                            }
                        }
                    }
                    Event::Progress(Some(offset)) => {
                        // On the first progress past the resume offset, mark
                        // the source (id `None`) and every export as running.
                        // Dropping the capability makes this a one-shot.
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            let export_ids = export_ids.iter().copied();
                            for id in export_ids.map(Some).chain(std::iter::once(None)) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id,
                                        namespace: StatusNamespace::Generator,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        // Everything at or beyond `up_to` is excluded, so the
                        // generator's work is done.
                        if offset >= up_to {
                            break;
                        }

                        // Progress within the snapshot does not advance the
                        // output frontier.
                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        // Past the resume point we throttle to the tick
                        // interval, draining committed-upper updates while we
                        // wait so statistics stay fresh.
                        if resume_offset < offset {
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            // The generator produces data on demand, so the
                            // known offset is reported equal to the
                            // committed offset.
                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
                    // The generator is exhausted; returning drops all
                    // capabilities and closes the outputs.
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}
477
/// Synthesizes a probe stream for a load generator source.
///
/// Load generators have no upstream system whose frontier could be probed,
/// so this operator periodically snapshots the frontier of the generator's
/// own `progress` stream and emits it as a [`Probe`]. Only a single worker,
/// elected by hashing `source_id`, produces probes.
fn synthesize_probes<'scope, T: timely::progress::Timestamp>(
    source_id: GlobalId,
    progress: StreamVec<'scope, T, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> StreamVec<'scope, T, Probe<T>> {
    let scope = progress.scope();

    // Elect a single worker so downstream sees one coherent probe sequence.
    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        // Non-elected workers return immediately, dropping `caps` so the
        // output frontier can advance.
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());

        // Track the input frontier. Probes are suppressed (via the select
        // guard below) until it has moved off the minimum timestamp.
        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    // The input carries `Infallible` data, so data events
                    // cannot occur.
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

        // The input closed: emit one final probe advertising the empty
        // frontier, signalling that the source is complete.
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}