1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::ops::Rem;
13use std::sync::Arc;
14use std::time::Duration;
15
16use differential_dataflow::{AsCollection, Hashable};
17use futures::StreamExt;
18use itertools::Itertools;
19use mz_ore::cast::CastFrom;
20use mz_ore::iter::IteratorExt;
21use mz_ore::now::NowFn;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::errors::DataflowError;
24use mz_storage_types::sources::load_generator::{
25 Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
26 LoadGeneratorSourceConnection,
27};
28use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
29use mz_timely_util::builder_async::{
30 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
31};
32use mz_timely_util::containers::stack::AccountedStackBuilder;
33use timely::container::CapacityContainerBuilder;
34use timely::dataflow::channels::pact::Pipeline;
35use timely::dataflow::operators::core::Partition;
36use timely::dataflow::{Scope, StreamVec};
37use timely::progress::{Antichain, Timestamp};
38use tokio::time::{Instant, interval_at};
39
40use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
41use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
42use crate::source::{RawSourceCreationConfig, SourceMessage};
43
44mod auction;
45mod clock;
46mod counter;
47mod datums;
48mod key_value;
49mod marketing;
50mod tpch;
51
52pub use auction::Auction;
53pub use clock::Clock;
54pub use counter::Counter;
55pub use datums::Datums;
56pub use tpch::Tpch;
57
58use self::marketing::Marketing;
59
/// The two rendering strategies for load-generator sources, dispatched on in
/// [`GeneratorKind::render`].
enum GeneratorKind {
    /// A generator implementing the [`Generator`] trait, rendered by
    /// `render_simple_generator`: its event stream is replayed, clamped to the
    /// half-open offset range `[as_of, up_to)`.
    Simple {
        /// The generator producing `(output, event)` pairs.
        generator: Box<dyn Generator>,
        /// Interval between emitted ticks, in microseconds. `None` falls back
        /// to a generator-specific default at render time.
        tick_micros: Option<u64>,
        /// Offset below which data is fast-forwarded (clamped) to.
        as_of: u64,
        /// Offset at which the generator stops emitting (exclusive).
        up_to: u64,
    },
    /// The key-value load generator, rendered by the dedicated
    /// `key_value::render` operator.
    KeyValue(KeyValueLoadGenerator),
}
69
impl GeneratorKind {
    /// Builds a `GeneratorKind` from the catalog-level [`LoadGenerator`]
    /// variant, threading through the tick interval and the `[as_of, up_to)`
    /// offset range used by the simple generators.
    fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
        match g {
            LoadGenerator::Auction => GeneratorKind::Simple {
                generator: Box::new(Auction {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Clock => GeneratorKind::Simple {
                generator: Box::new(Clock {
                    // The clock generator works in milliseconds; default to a
                    // 1s tick when no interval was configured.
                    tick_ms: tick_micros
                        .map(Duration::from_micros)
                        .unwrap_or(Duration::from_secs(1))
                        .as_millis()
                        .try_into()
                        .expect("reasonable tick interval"),
                    as_of_ms: as_of,
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
                generator: Box::new(Counter {
                    max_cardinality: max_cardinality.clone(),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Datums => GeneratorKind::Simple {
                generator: Box::new(Datums {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Marketing => GeneratorKind::Simple {
                generator: Box::new(Marketing {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            } => GeneratorKind::Simple {
                generator: Box::new(Tpch {
                    count_supplier: *count_supplier,
                    count_part: *count_part,
                    count_customer: *count_customer,
                    count_orders: *count_orders,
                    count_clerk: *count_clerk,
                    // A zero tick means "no ticking" for TPCH — TODO confirm
                    // against the `Tpch` generator's handling of `tick`.
                    tick: Duration::from_micros(tick_micros.unwrap_or(0)),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
        }
    }

    /// Renders this generator into the given timely `scope`, returning:
    /// per-export data collections keyed by `GlobalId`, a progress-only stream
    /// (no data, hence `Infallible`), a health-status stream, and shutdown
    /// buttons for the created operator(s).
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        StreamVec<G, Infallible>,
        StreamVec<G, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        // Map each generator output type to the indexes of the source exports
        // that consume it, and remember which export id sits at each index.
        let mut output_map = BTreeMap::new();
        let mut idx_to_exportid = BTreeMap::new();
        // Ensure the `Default` output always has an entry, even when no export
        // references it.
        output_map.insert(LoadGeneratorOutput::Default, Vec::new());
        for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
            let output_type = match &export.details {
                SourceExportDetails::LoadGenerator(details) => details.output,
                // `None` details indicate an export that takes no data from
                // this source (e.g. a progress-only export); skip it.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", export.details),
            };
            output_map
                .entry(output_type)
                .or_insert_with(Vec::new)
                .push(idx);
            idx_to_exportid.insert(idx, export_id.clone());
        }

        match self {
            GeneratorKind::Simple {
                tick_micros,
                as_of,
                up_to,
                generator,
            } => render_simple_generator(
                generator,
                tick_micros,
                as_of.into(),
                up_to.into(),
                scope,
                config,
                committed_uppers,
                output_map,
            ),
            // The key-value generator has its own operator implementation;
            // note it is the only consumer of `start_signal`.
            GeneratorKind::KeyValue(kv) => key_value::render(
                kv,
                scope,
                config.clone(),
                committed_uppers,
                start_signal,
                output_map,
                idx_to_exportid,
            ),
        }
    }
}
200
impl SourceRender for LoadGeneratorSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;

    /// Renders the load generator, then derives a probe stream from the
    /// generator's progress-only output so the rest of the system can observe
    /// the upstream frontier. Returns the data collections, health stream,
    /// probe stream, and operator shutdown buttons.
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        StreamVec<G, HealthStatusMessage>,
        StreamVec<G, Probe<MzOffset>>,
        Vec<PressOnDropButton>,
    ) {
        let generator_kind = GeneratorKind::new(
            &self.load_generator,
            self.tick_micros,
            self.as_of,
            self.up_to,
        );
        let (updates, progress, health, button) =
            generator_kind.render(scope, config, committed_uppers, start_signal);

        // Load generators have no external upstream to probe, so probes are
        // synthesized from the operator's own progress stream.
        let probe_stream = synthesize_probes(
            config.id,
            progress,
            config.timestamp_interval,
            config.now_fn.clone(),
        );

        (updates, health, probe_stream, button)
    }
}
237
/// Renders a [`Generator`]-trait load generator as a single async timely
/// operator.
///
/// Only one worker (the one `config.responsible_for(())` elects) produces
/// data; all other workers report zeroed statistics and exit immediately. The
/// operator's single data output is partitioned into one stream per source
/// export, and the generator's event offsets are clamped to `[as_of, up_to)`.
fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: &G,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    StreamVec<G, Infallible>,
    StreamVec<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Fan the single data output out into one stream per export, routed by the
    // export index embedded in each record.
    let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
    let partition_count = u64::cast_from(export_ids.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    // `zip_eq` asserts there is exactly one partitioned stream per export id.
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    // A data-less output whose only purpose is carrying the operator's
    // frontier (used downstream for probe synthesis).
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            // Wrapped in `Option` so the "running" health update below is
            // emitted at most once, after which the capability is dropped.
            let mut health_cap = Some(health_cap);

            if !is_active_worker {
                // Non-elected workers contribute nothing; zero their stats so
                // aggregated statistics stay meaningful.
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

            // The combined resume frontier across all exports: the lowest
            // offset any export still needs.
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            // An empty resume frontier means all exports are complete.
            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

            // Align the first tick to the next wall-clock multiple of the
            // tick interval, so ticks land on round timestamps.
            let start_instant = {
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            // Default tick: 1 second.
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            // Tracks the highest offset known committed downstream. When
            // starting from scratch, 0 is trivially committed; otherwise we
            // wait for the first frontier from `committed_uppers`.
            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        // Fast-forward any data before the as-of to the as-of
                        // itself so it is not lost, merely compacted.
                        if offset <= as_of {
                            offset = as_of;
                        }

                        // Data at or beyond the up-to is dropped entirely.
                        if offset >= up_to {
                            continue;
                        }

                        // The progress output can advance past the message's
                        // own offset, since messages carry explicit times.
                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        // Skip output types no export is interested in.
                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        // Only emit data the exports have not already durably
                        // recorded.
                        if resume_offset <= offset {
                            // `repeat_clone` clones the message once per
                            // interested export index.
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
                    Event::Progress(Some(offset)) => {
                        // Once progress reaches the resume point, report the
                        // source (and every export) as running — exactly once.
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            let export_ids = export_ids.iter().copied();
                            // `None` is the status entry for the source as a
                            // whole, alongside one per export.
                            for id in export_ids.map(Some).chain(std::iter::once(None)) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id,
                                        namespace: StatusNamespace::Generator,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        // Reaching the up-to finishes the source; dropping the
                        // capabilities (by returning) advances downstream
                        // frontiers to the empty antichain.
                        if offset >= up_to {
                            break;
                        }

                        // Progress at or below the as-of is not reportable:
                        // data there was fast-forwarded to the as-of.
                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        // Past the resume point we are producing new data, so
                        // pace generation to the tick interval. While waiting,
                        // keep draining committed-upper updates for stats.
                        if resume_offset < offset {
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    // For load generators the known offset
                                    // tracks the committed offset — there is
                                    // no external upstream to be ahead of us.
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
                    // `Progress(None)` signals the generator is exhausted.
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}
463
/// Synthesizes a [`Probe`] stream from a source's progress-only stream.
///
/// Load generators have no external upstream to probe, so this operator
/// reports the source's own frontier as the "upstream" frontier, on the given
/// `interval`. A single worker — elected by hashing `source_id` — does all the
/// work; the others emit nothing.
fn synthesize_probes<G>(
    source_id: GlobalId,
    progress: StreamVec<G, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> StreamVec<G, Probe<G::Timestamp>>
where
    G: Scope,
{
    let scope = progress.scope();

    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());

        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    // The input stream carries `Infallible` items, so data
                    // events cannot occur.
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // Suppress probes until the frontier has moved past the
                // minimum, i.e. until the source has made some progress.
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

        // The input closed: emit one final probe with the empty frontier to
        // signal that the source is complete.
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}