use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::{UpsertKey, UpsertValue};

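/// Renders the dataflow for a single ingestion: the raw source operator
/// itself plus, for each source export, decoding and envelope processing.
///
/// Returns, per export, a pair of [`Collection`]s carrying the rendered rows
/// and any [`DataflowError`]s, along with the health streams of all operators
/// and the tokens that keep the source alive.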
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Vec<Stream<G, HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
    // Tokens that we should return from the method.
    let mut needed_tokens = Vec::new();

    // A signal that delays the raw source from reading its data: the sender
    // side is cloned into each export's rendering as a rehydration token, and
    // the signal fires once every clone has been dropped (e.g. once upsert
    // rehydration has finished).
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

        // Envelope processing below pushes any additional error streams for
        // this export into this vector; they are merged further down.
        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

        // Flatten the error collections.
        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        health_streams.extend(health_stream.into_iter().map(|s| s.leave()));
    }
    (outputs, health_streams, needed_tokens)
}

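/// Renders the decode and envelope stages for a single source export,
/// pushing any extra error collections into `error_collections`.
///
/// For the upsert envelope this also rehydrates the operator's state from the
/// export's persisted output (optionally under backpressure) and, when
/// `DELAY_SOURCES_PAST_REHYDRATION` is enabled, holds back the raw source via
/// `rehydrated_token` until rehydration has finished.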
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<Collection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    Collection<G, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<Stream<G, HealthStatusMessage>>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

    // Exhaustively destructure the source description so that compilation
    // fails when new fields are added that might need handling here.
    let SourceDesc {
        connection: _,
        timestamp_interval: _,
        primary_export: _,
        primary_export_details: _,
    } = description.desc;

    let (decoded_stream, decode_health) = match encoding {
        // Sources without an encoding produce already-decoded keys and values.
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                &ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

    // Render the envelope-specific processing on top of the decoded stream.
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        // Read the export's previous output back from persist
                        // just before the resume upper, to rehydrate the
                        // upsert operator's state.
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                // Backpressure is skipped when it is configured
                                // as disk-only but no scratch directory exists.
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    needed_tokens.push(upsert_token);

                    // If configured, hold the raw source back from reading
                    // until upsert rehydration has finished; otherwise release
                    // the start signal immediately by dropping the token.
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

                    // Wire the upsert operator's progress back into the
                    // backpressured persist source, if backpressure is active.
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}

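/// Computes the per-source `max_inflight_bytes` limit used for backpressure.
///
/// A limit derived from the cluster memory limit (`cluster_memory * fraction`)
/// takes precedence over the configured default. If no default is configured,
/// backpressure is disabled and `None` is returned.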
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    // Backpressure is only enabled when a default limit is configured.
    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    // The lossy cast truncates the fractional part, which is
                    // acceptable for a byte limit.
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

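/// Splits `(Result<O, E>, time, diff)` updates into `Ok` and `Err` updates,
/// preserving each update's time and diff.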
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

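/// A decoded key and value, either of which may be absent or may have failed
/// to decode.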
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

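/// Appends a record's metadata columns to each successfully decoded value;
/// keys and decode errors pass through unchanged.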
fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

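/// Converts decoded results into upsert commands: an [`UpsertKey`] derived
/// from the decoded key, an optional [`UpsertValue`] (where `None` requests
/// deletion of the key), and the original `FromTime`. Under
/// `UpsertStyle::ValueErrInline`, value decode errors are inlined into a
/// dedicated error column instead of failing the update.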
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: Collection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        // If we have a well-formed key we can continue, otherwise we're
        // upserting an error.
        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

        // Pack the key into a row according to the key envelope.
        let key_row = match upsert_envelope.style {
            // The key is flattened into the value, so no repacking is needed.
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            // A named key with a single column is used as-is; multiple key
            // columns are nested into a record.
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The error column is null for well-formed values.
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        // Inline the error in the error column and null out
                        // the remaining value columns.
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        // The error column is a record with a single string
                        // field describing the error.
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

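/// Builds output rows for the `None` envelope by combining each record's key
/// and value according to `key_envelope`. Absent keys are replaced by
/// `key_arity` null columns.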
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        // A single-column key is prepended as-is; a
                        // multi-column key is nested as a record at the start
                        // of the row.
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

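/// Promotes key/value decode errors to [`DataflowError`]s and pairs each
/// value with its optional key; messages with neither key nor value are
/// dropped.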
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        // Always prioritize the value error over the key error.
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        // A message with a key but no value is malformed for this envelope.
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // The cluster limit takes precedence: 2000 * 0.5 = 1000.
        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}
727}