use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection, collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::operators::{Concat, ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::internal_control::InternalStorageCommand;
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::UpsertKey;

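/// Renders the dataflow fragment for an ingestion: the raw source operator
/// plus decoding and envelope processing for each source export. Returns the
/// per-export `(ok, err)` collections, a stream of health status updates, and
/// the tokens that must be held to keep the dataflow alive.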
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
    let mut needed_tokens = Vec::new();

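    // The raw source defers reading until this signal resolves, which happens
    // once a message is sent on the channel or every clone of `starter` is
    // dropped. A `starter` clone is handed to each export's envelope pipeline
    // below, so upsert rehydration can delay the source start.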
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    let (exports, mut health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

        let error_collections = vec![err_stream.map(DataflowError::from)];

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, err, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);
        outputs.insert(export_id, (ok, err));

        health = health.concat(&health_stream.leave());
    }
    (outputs, health, needed_tokens)
}

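/// Renders the decode and envelope stages for a single source export, turning
/// raw `SourceOutput` data into final `(ok, err)` collections.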
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    mut error_collections: Vec<Collection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    Collection<G, Row, Diff>,
    Collection<G, DataflowError, Diff>,
    Vec<PressOnDropButton>,
    Stream<G, HealthStatusMessage>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

    let SourceDesc {
        connection: _,
        timestamp_interval: _,
        primary_export: _,
        primary_export_details: _,
    } = description.desc;

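    // Decode the raw stream. Without an encoding the source already produces
    // fully formed rows, so key and value pass through unchanged; with one,
    // run the delimited decoder.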
    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            empty(scope),
        ),
        Some(encoding) => render_decode_delimited(
            &ok_source,
            encoding.key,
            encoding.value,
            dataflow_debug_name.clone(),
            storage_state.metrics.decode_defs.clone(),
            storage_state.storage_configuration.clone(),
        ),
    };

    let (envelope_ok, envelope_err, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
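            // Rehydrate the upsert state by reading this export's previous
            // output back from persist, as of one tick before the resume
            // upper. The read runs in a nested scope so that a feedback edge
            // can backpressure it against the upsert operator's progress.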
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let command_tx = storage_state.internal_cmd_tx.clone();

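                        // Read back the previously written output of this
                        // export. `flow_control` (when backpressure is
                        // enabled) throttles the read; any persist error
                        // suspends and restarts the ingestion via the
                        // internal command channel.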
                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            move |error| {
                                let error = format!("upsert_rehydration: {error}");
                                tracing::info!("{error}");
                                Box::pin(async move {
                                    command_tx.send(InternalStorageCommand::SuspendAndRestart {
                                        id: export_id,
                                        reason: error,
                                    });
                                })
                            },
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    needed_tokens.push(upsert_token);

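                    // When enabled, keep holding the start signal until the
                    // upsert operator reports that snapshot rehydration has
                    // finished; otherwise drop the token now so the raw
                    // source can start reading immediately.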
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

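                    // Close the backpressure loop: the upsert operator's
                    // snapshot progress drives the feedback edge that
                    // throttles the rehydration read above.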
                    let upsert = match feedback_handle {
                        Some(feedback_handle) => {
                            snapshot_progress.connect_loop(feedback_handle);
                            upsert
                        }
                        None => upsert,
                    };

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);

            (
                upsert_ok.as_collection(),
                Some(upsert_err.as_collection()),
                health_update,
            )
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            let errors = errors.as_collection();
            (stream.as_collection(), Some(errors), empty(scope))
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None, empty(scope))
        }
    };

    let (collection, errors, health) = (
        envelope_ok,
        envelope_err,
        decode_health.concat(&envelope_health),
    );

    if let Some(errors) = errors {
        error_collections.push(errors);
    }

    let err_collection = match error_collections.len() {
        0 => Collection::empty(scope),
        1 => error_collections.pop().unwrap(),
        _ => collection::concatenate(scope, error_collections),
    };

    (collection, err_collection, needed_tokens, health)
}

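/// Computes the backpressure limit for upsert rehydration: the configured
/// fraction of the cluster memory limit when both are known, otherwise the
/// static default. Returns `None` (backpressure disabled) when no default is
/// configured.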
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

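/// Splits `(Result<O, E>, time, diff)` updates into `Ok` and `Err` variants,
/// preserving the timestamp and diff.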
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

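/// A decoded key-value pair, where either side may be absent or a decode
/// error.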
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

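/// Appends any source-provided metadata columns to each successfully decoded
/// value row.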
fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

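/// Transforms decoded results into upsert commands: a hashed `UpsertKey`, an
/// optional value (`None` signals a deletion), and the original source
/// timestamp. Key errors, and for some envelope styles value errors, are
/// carried in the value position.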
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: Collection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        let key = match key {
            Ok(key) => key,
            err @ Err(_) => match result.value {
                Some(_) => return (UpsertKey::from_key(err.as_ref()), Some(err), from_time),
                None => return (UpsertKey::from_key(err.as_ref()), None, from_time),
            },
        };

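        // Shape the key row to match how the envelope renders it in the
        // output: flattened styles use the key columns as-is, while named
        // styles wrap a multi-column key in a single record column.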
        let key_row = match upsert_envelope.style {
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    }))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

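/// Produces the `None`-envelope output for each `KV`: the value row,
/// optionally preceded by the key columns (flattened, or wrapped in a named
/// record column), padding a missing key with nulls.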
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        // A single-column key can be used directly; a
                        // multi-column key is packed into one record column.
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

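/// Promotes key or value decode errors to `DataflowError`s and pairs decoded
/// keys with values. A message with neither key nor value is dropped; a value
/// missing for a present key is an envelope error.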
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // 0.5 * 2000 = 1000.
        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}