use std::collections::BTreeMap;
use std::convert::Infallible;
use std::ops::Rem;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{
    Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
    LoadGeneratorSourceConnection,
};
use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::time::{Instant, interval_at};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{
    Probe, ProgressStatisticsUpdate, SignaledFuture, SourceRender, StackedCollection,
};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod auction;
mod clock;
mod counter;
mod datums;
mod key_value;
mod marketing;
mod tpch;

pub use auction::Auction;
pub use clock::Clock;
pub use counter::Counter;
pub use datums::Datums;
pub use tpch::Tpch;

use self::marketing::Marketing;

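/// The two rendering paths for load generators: "simple" generators implement the
/// [`Generator`] trait and are driven by the shared loop in
/// `render_simple_generator`, while the key-value generator carries its own
/// configuration and is rendered by the `key_value` module.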
enum GeneratorKind {
    Simple {
        generator: Box<dyn Generator>,
        tick_micros: Option<u64>,
        as_of: u64,
        up_to: u64,
    },
    KeyValue(KeyValueLoadGenerator),
}

impl GeneratorKind {
    fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
        match g {
            LoadGenerator::Auction => GeneratorKind::Simple {
                generator: Box::new(Auction {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Clock => GeneratorKind::Simple {
                generator: Box::new(Clock {
                    tick_ms: tick_micros
                        .map(Duration::from_micros)
                        .unwrap_or(Duration::from_secs(1))
                        .as_millis()
                        .try_into()
                        .expect("reasonable tick interval"),
                    as_of_ms: as_of,
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
                generator: Box::new(Counter {
                    max_cardinality: max_cardinality.clone(),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Datums => GeneratorKind::Simple {
                generator: Box::new(Datums {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Marketing => GeneratorKind::Simple {
                generator: Box::new(Marketing {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            } => GeneratorKind::Simple {
                generator: Box::new(Tpch {
                    count_supplier: *count_supplier,
                    count_part: *count_part,
                    count_customer: *count_customer,
                    count_orders: *count_orders,
                    count_clerk: *count_clerk,
                    tick: Duration::from_micros(tick_micros.unwrap_or(0)),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
        }
    }

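    /// Renders this generator kind into a dataflow fragment, returning one data
    /// collection per source export plus progress, health, and statistics streams
    /// and the shutdown buttons for the rendered operators.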
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Vec<PressOnDropButton>,
    ) {
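        // Map each load-generator output to the indices of the source exports that
        // want its data. The `Default` output is always present, even if no export
        // currently reads from it.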
        let mut output_map = BTreeMap::new();
        output_map.insert(LoadGeneratorOutput::Default, Vec::new());
        for (idx, (_, export)) in config.source_exports.iter().enumerate() {
            let output_type = match &export.details {
                SourceExportDetails::LoadGenerator(details) => details.output,
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", export.details),
            };
            output_map
                .entry(output_type)
                .or_insert_with(Vec::new)
                .push(idx);
        }

        match self {
            GeneratorKind::Simple {
                tick_micros,
                as_of,
                up_to,
                generator,
            } => render_simple_generator(
                generator,
                tick_micros,
                as_of.into(),
                up_to.into(),
                scope,
                config,
                committed_uppers,
                output_map,
            ),
            GeneratorKind::KeyValue(kv) => key_value::render(
                kv,
                scope,
                config.clone(),
                committed_uppers,
                start_signal,
                output_map,
            ),
        }
    }
}

impl SourceRender for LoadGeneratorSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;

    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        let generator_kind = GeneratorKind::new(
            &self.load_generator,
            self.tick_micros,
            self.as_of,
            self.up_to,
        );
        let (updates, uppers, health, stats, button) =
            generator_kind.render(scope, config, committed_uppers, start_signal);

        (updates, uppers, health, stats, None, button)
    }
}

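/// Renders a [`Generator`]-based load generator: a single worker produces the
/// generator's rows, partitions them across the source exports, and reports
/// progress, health, and statistics on dedicated operator outputs.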
fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: &G,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Stream<G, ProgressStatisticsUpdate>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let partition_count = u64::cast_from(config.source_exports.len());
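    // All rows flow through a single output, tagged with the index of the source
    // export they belong to; partitioning by that index yields one stream (and
    // below, one collection) per export.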
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();
    let (stats_output, stats_stream) = builder.new_output();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
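    // Only one worker is responsible for producing data for this source.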
    let is_active_worker = config.responsible_for(());
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap, stats_cap] = caps.try_into().unwrap();

            let mut health_cap = Some(health_cap);

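            // Inactive workers report zeroed statistics and exit; the active worker
            // below produces all of the data.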
            if !is_active_worker {
                stats_output.give(
                    &stats_cap,
                    ProgressStatisticsUpdate::SteadyState {
                        offset_known: 0,
                        offset_committed: 0,
                    },
                );
                return;
            }

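            // Resume from the minimum offset across all exports' resume uppers. An
            // empty resume frontier means there is nothing left to produce.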
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

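            // Start ticking at the next wall-clock instant that is a whole multiple
            // of the tick interval (or immediately if no positive tick interval is
            // configured).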
            let start_instant = {
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

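            // Drain the generator: data events are routed to the exports that want
            // them, while progress events advance the capabilities and drive the
            // tick and statistics reporting below.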
            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
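                        // Clamp messages at or before the as-of up to the as-of, and
                        // drop anything at or beyond the up-to.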
                        if offset <= as_of {
                            offset = as_of;
                        }

                        if offset >= up_to {
                            continue;
                        }

                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        if resume_offset <= offset {
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
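                    // A progress event: report a healthy status once, advance the
                    // capabilities, and, once past the resume offset, throttle to
                    // the tick interval while reporting statistics.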
                    Event::Progress(Some(offset)) => {
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Generator,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        if offset >= up_to {
                            break;
                        }

                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        if resume_offset < offset {
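                            // Wait out one tick interval, folding in any newly
                            // committed uppers so the statistics below reflect the
                            // latest committed offset.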
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            if let Some(offset_committed) = offset_committed {
                                stats_output.give(
                                    &stats_cap,
                                    ProgressStatisticsUpdate::SteadyState {
                                        offset_known: offset_committed,
                                        offset_committed,
                                    },
                                );
                            }
                        }
                    }
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        stats_stream,
        vec![button.press_on_drop()],
    )
}