use std::collections::BTreeMap;
use std::convert::Infallible;
use std::ops::Rem;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{
    Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
    LoadGeneratorSourceConnection,
};
use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::time::{Instant, interval_at};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod auction;
mod clock;
mod counter;
mod datums;
mod key_value;
mod marketing;
mod tpch;

pub use auction::Auction;
pub use clock::Clock;
pub use counter::Counter;
pub use datums::Datums;
pub use tpch::Tpch;

use self::marketing::Marketing;

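/// The two flavors of load generator rendering: "simple" generators that are
/// driven through the shared `render_simple_generator` operator, and the
/// key-value generator, which has its own dedicated implementation.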
enum GeneratorKind {
    Simple {
        generator: Box<dyn Generator>,
        tick_micros: Option<u64>,
        as_of: u64,
        up_to: u64,
    },
    KeyValue(KeyValueLoadGenerator),
}

impl GeneratorKind {
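    /// Constructs the appropriate `GeneratorKind` for the given `LoadGenerator`
    /// variant, forwarding the tick interval, `as_of`, and `up_to` offsets.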
    fn new(g: &LoadGenerator, tick_micros: Option<u64>, as_of: u64, up_to: u64) -> Self {
        match g {
            LoadGenerator::Auction => GeneratorKind::Simple {
                generator: Box::new(Auction {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Clock => GeneratorKind::Simple {
                generator: Box::new(Clock {
                    tick_ms: tick_micros
                        .map(Duration::from_micros)
                        .unwrap_or(Duration::from_secs(1))
                        .as_millis()
                        .try_into()
                        .expect("reasonable tick interval"),
                    as_of_ms: as_of,
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Counter { max_cardinality } => GeneratorKind::Simple {
                generator: Box::new(Counter {
                    max_cardinality: max_cardinality.clone(),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Datums => GeneratorKind::Simple {
                generator: Box::new(Datums {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Marketing => GeneratorKind::Simple {
                generator: Box::new(Marketing {}),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            } => GeneratorKind::Simple {
                generator: Box::new(Tpch {
                    count_supplier: *count_supplier,
                    count_part: *count_part,
                    count_customer: *count_customer,
                    count_orders: *count_orders,
                    count_clerk: *count_clerk,
                    tick: Duration::from_micros(tick_micros.unwrap_or(0)),
                }),
                tick_micros,
                as_of,
                up_to,
            },
            LoadGenerator::KeyValue(kv) => GeneratorKind::KeyValue(kv.clone()),
        }
    }

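    /// Renders this generator as a timely operator, returning a data
    /// collection per source export along with progress and health streams
    /// and the press-on-drop buttons of the operators involved.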
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
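        // Map each output type to the indexes of the source exports that read
        // from it, and record which export id each index refers to (used by
        // the key-value generator).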
        let mut output_map = BTreeMap::new();
        let mut idx_to_exportid = BTreeMap::new();
        output_map.insert(LoadGeneratorOutput::Default, Vec::new());
        for (idx, (export_id, export)) in config.source_exports.iter().enumerate() {
            let output_type = match &export.details {
                SourceExportDetails::LoadGenerator(details) => details.output,
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", export.details),
            };
            output_map
                .entry(output_type)
                .or_insert_with(Vec::new)
                .push(idx);
            idx_to_exportid.insert(idx, export_id.clone());
        }

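        // Dispatch to the appropriate rendering path for this generator kind.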
        match self {
            GeneratorKind::Simple {
                tick_micros,
                as_of,
                up_to,
                generator,
            } => render_simple_generator(
                generator,
                tick_micros,
                as_of.into(),
                up_to.into(),
                scope,
                config,
                committed_uppers,
                output_map,
            ),
            GeneratorKind::KeyValue(kv) => key_value::render(
                kv,
                scope,
                config.clone(),
                committed_uppers,
                start_signal,
                output_map,
                idx_to_exportid,
            ),
        }
    }
}

impl SourceRender for LoadGeneratorSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator;

    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        let generator_kind = GeneratorKind::new(
            &self.load_generator,
            self.tick_micros,
            self.as_of,
            self.up_to,
        );
        let (updates, uppers, health, button) =
            generator_kind.render(scope, config, committed_uppers, start_signal);

        (updates, uppers, health, None, button)
    }
}

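/// Renders the operator for "simple" load generators: a single active worker
/// iterates the generator's events, routes data to the configured source
/// exports, and reports progress and health.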
fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
    generator: Box<dyn Generator>,
    tick_micros: Option<u64>,
    as_of: MzOffset,
    up_to: MzOffset,
    scope: &G,
    config: &RawSourceCreationConfig,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    output_map: BTreeMap<LoadGeneratorOutput, Vec<usize>>,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
) {
    let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());

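    // All exports are emitted on a single output, tagged with the export
    // index; partition that stream into one stream (and collection) per
    // source export.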
    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let partition_count = u64::cast_from(config.source_exports.len());
    let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
        partition_count,
        |((output, data), time, diff): &(
            (usize, Result<SourceMessage, DataflowError>),
            MzOffset,
            Diff,
        )| {
            let output = u64::cast_from(*output);
            (output, (data.clone(), time.clone(), diff.clone()))
        },
    );
    let mut data_collections = BTreeMap::new();
    for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
        data_collections.insert(*id, data_stream.as_collection());
    }

    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();

    let busy_signal = Arc::clone(&config.busy_signal);
    let source_resume_uppers = config.source_resume_uppers.clone();
    let is_active_worker = config.responsible_for(());
    let source_statistics = config.statistics.clone();
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            let mut health_cap = Some(health_cap);

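            // Only one worker produces data for this source; the others report
            // zeroed statistics and exit immediately.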
            if !is_active_worker {
                for stats in source_statistics.values() {
                    stats.set_offset_known(0);
                    stats.set_offset_committed(0);
                }
                return;
            }

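            // Compute the offset to resume from out of the per-export resume
            // uppers; an empty frontier means the source is already complete.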
            let resume_upper = Antichain::from_iter(
                source_resume_uppers
                    .values()
                    .flat_map(|f| f.iter().map(MzOffset::decode_row)),
            );

            let Some(resume_offset) = resume_upper.into_option() else {
                return;
            };

            let now_fn = mz_ore::now::SYSTEM_TIME.clone();

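            // Delay the first tick so that ticks fire on multiples of the tick
            // interval in wall-clock time.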
            let start_instant = {
                let now_millis = now_fn();
                let now_instant = Instant::now();
                let delay_millis = tick_micros
                    .map(|tick_micros| tick_micros / 1000)
                    .filter(|tick_millis| *tick_millis > 0)
                    .map(|tick_millis| tick_millis - now_millis.rem(tick_millis))
                    .unwrap_or(0);
                now_instant + Duration::from_millis(delay_millis)
            };
            let tick = Duration::from_micros(tick_micros.unwrap_or(1_000_000));
            let mut tick_interval = interval_at(start_instant, tick);

            let mut rows = generator.by_seed(now_fn, None, resume_offset);

            let mut committed_uppers = std::pin::pin!(committed_uppers);

            let mut offset_committed = if resume_offset.offset == 0 {
                Some(0)
            } else {
                None
            };

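            // Drain the generator: data events are routed to the matching
            // exports, while progress events downgrade the capabilities and
            // pace emission against the tick interval.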
            while let Some((output_type, event)) = rows.next() {
                match event {
                    Event::Message(mut offset, (value, diff)) => {
                        if offset <= as_of {
                            offset = as_of;
                        }

                        if offset >= up_to {
                            continue;
                        }

                        let _ = progress_cap.try_downgrade(&(offset + 1));

                        let outputs = match output_map.get(&output_type) {
                            Some(outputs) => outputs,
                            None => continue,
                        };

                        let message: Result<SourceMessage, DataflowError> = Ok(SourceMessage {
                            key: Row::default(),
                            value,
                            metadata: Row::default(),
                        });

                        if resume_offset <= offset {
                            for (&output, message) in outputs.iter().repeat_clone(message) {
                                data_output
                                    .give_fueled(&cap, ((output, message), offset, diff))
                                    .await;
                            }
                        }
                    }
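                    // A progress event guarantees that no further data will be
                    // produced for offsets below `offset`.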
                    Event::Progress(Some(offset)) => {
                        if resume_offset <= offset && health_cap.is_some() {
                            let health_cap = health_cap.take().expect("known to exist");
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Generator,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        if offset >= up_to {
                            break;
                        }

                        if offset <= as_of {
                            continue;
                        }

                        cap.downgrade(&offset);
                        let _ = progress_cap.try_downgrade(&offset);

                        if resume_offset < offset {
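                            // Wait for the next tick, opportunistically
                            // draining committed-upper updates so the source
                            // statistics stay current.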
                            loop {
                                tokio::select! {
                                    _tick = tick_interval.tick() => {
                                        break;
                                    }
                                    Some(frontier) = committed_uppers.next() => {
                                        if let Some(offset) = frontier.as_option() {
                                            offset_committed = Some(offset.offset);
                                        }
                                    }
                                }
                            }

                            if let Some(offset_committed) = offset_committed {
                                for stats in source_statistics.values() {
                                    stats.set_offset_committed(offset_committed);
                                    stats.set_offset_known(offset_committed);
                                }
                            }
                        }
                    }
                    Event::Progress(None) => return,
                }
            }
        })
    });

    (
        data_collections,
        progress_stream,
        health_stream,
        vec![button.press_on_drop()],
    )
}