use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, VecCollection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::{UpsertKey, UpsertValue};

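/// Renders the dataflow fragment for a single ingestion: the raw source
/// operator plus, for each export, its decode and envelope pipeline.
///
/// Returns the per-export collections of data and errors, the health streams
/// of all rendered operators, and the tokens that keep them alive.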
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            VecCollection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            VecCollection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Vec<Stream<G, HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
    let mut needed_tokens = Vec::new();

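    // A signal used to delay the raw source until upsert rehydration is done:
    // each export holds a clone of `starter`, and once every clone has been
    // dropped, `recv` returns `None` and `start_signal` resolves.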
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

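    // Render the decode and envelope pipeline for each export of this
    // ingestion, splitting its output into data and error collections.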
    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        health_streams.extend(health_stream.into_iter().map(|s| s.leave()));
    }
    (outputs, health_streams, needed_tokens)
}

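/// Renders the decode and envelope stages for a single source export, pushing
/// any errors produced along the way onto `error_collections`.
///
/// `rehydrated_token` is held until the export no longer needs to delay the
/// raw source: until rehydration finishes when the
/// `DELAY_SOURCES_PAST_REHYDRATION` dyncfg is set, and dropped right away
/// otherwise.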
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: VecCollection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<VecCollection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    VecCollection<G, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<Stream<G, HealthStatusMessage>>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

    let SourceDesc {
        connection: _,
        timestamp_interval: _,
    } = description.desc;

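    // Decode the raw source output. When no encoding is configured, the
    // source already produces `Row`-shaped keys and values, which pass
    // through as successful decode results.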
    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                &ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

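    // Render the envelope, which turns decode results into the final rows
    // (plus any envelope-specific errors).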
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

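            // Derive the time as of which to read back this export's
            // previously written output: one tick before the resume upper.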
            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

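                        // If a byte limit is configured (and a scratch
                        // directory exists whenever `disk_only` requires one),
                        // wire up a feedback loop so that reading the previous
                        // state is throttled by downstream progress.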
                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

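                        // Stream this export's previously persisted output;
                        // it becomes the `previous` input from which upsert
                        // rehydrates its state.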
                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    needed_tokens.push(upsert_token);

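                    // Either hold the raw source back until upsert has
                    // finished rehydrating, or release it immediately by
                    // dropping the token.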
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

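                    // Close the backpressure loop: upsert's snapshot progress
                    // drives the flow control of the persist read above.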
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
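        // The `None` envelope appends metadata columns to each value and
        // prepends the key as dictated by the configured key envelope.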
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}

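/// Computes the `max_inflight_bytes` used for backpressure, preferring a
/// limit derived from the cluster memory limit (scaled by the configured
/// fraction) and falling back to the default. Returns `None`, disabling
/// backpressure, when no default is configured.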
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

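/// Splits a stream of `(Result<O, E>, time, diff)` updates into `Ok` and
/// `Err` variants, preserving each update's timestamp and diff.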
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

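/// A decoded key/value pair, either half of which may be absent or may have
/// failed to decode.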
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

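/// Appends each record's metadata columns to its successfully decoded value,
/// producing the `KV` pairs consumed by the `None` envelope.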
fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: VecCollection<G, DecodeResult<FromTime>, Diff>,
) -> VecCollection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

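/// Turns decode results into upsert commands: an `UpsertKey` plus an optional
/// value, where `None` represents a deletion and `Some(Err(_))` an error that
/// the upsert operator must remember for the key.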
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: VecCollection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

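        // Shape the key to match the key envelope: flattened keys are used
        // as-is, while named keys with more than one column are wrapped in a
        // single record column.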
        let key_row = match upsert_envelope.style {
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

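        // Build the value for the command. For `ValueErrInline`, a value
        // decode error is inlined into the error column (with the remaining
        // value columns nulled out) instead of failing the whole key.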
        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

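/// Flattens the `KV` pairs of the `None` envelope into rows, prepending the
/// key columns according to `key_envelope` and converting decode errors into
/// `DataflowError`s.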
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: VecCollection<G, KV, Diff>,
) -> VecCollection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

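    // Records that arrive without a key are padded with one null per key
    // column.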
    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

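/// Promotes key and value decode errors to `DataflowError`s and pairs each
/// value with its (optional) key. A key without a value is an error for the
/// `None` envelope, while a fully empty message is skipped.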
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}