use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, VecCollection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::StreamVec;
use timely::dataflow::operators::vec::Map;
use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, OkErr};
use timely::dataflow::scope::Scope;
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::{UpsertKey, UpsertValue};

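/// Renders an ingestion dataflow: creates the raw source, demuxes each export
/// into ok/err streams, and runs decoding and envelope processing per export.
/// Returns the rendered collections keyed by export id, the health streams,
/// and the tokens that must be held to keep the source running.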
pub fn render_source<'scope, 'root, C>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    root_scope: Scope<'root, ()>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: StreamVec<'scope, mz_repr::Timestamp, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
            VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>,
        ),
    >,
    Vec<StreamVec<'root, (), HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + 'static,
{
    let mut needed_tokens = Vec::new();

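    // A channel whose sender is cloned into every export rendered below. The raw
    // source waits on `start_signal`, which resolves once `starter` and all of its
    // per-export clones have been dropped, i.e. once each export has finished (or
    // skipped) rehydration.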
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        root_scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

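    // Render the decode/envelope pipeline for each export of this ingestion and
    // collect its output collections, error streams, health streams, and tokens.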
    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        health_streams.extend(health_stream.into_iter().map(|s| s.leave(root_scope)));
    }
    (outputs, health_streams, needed_tokens)
}

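/// Renders the decode and envelope (None / Upsert / CdcV2) pipeline for a single
/// source export. Returns the resulting ok collection, the tokens that must be
/// kept alive, and the health streams produced along the way; errors are pushed
/// into `error_collections`.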
fn render_source_stream<'scope, FromTime>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: VecCollection<'scope, mz_repr::Timestamp, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<StreamVec<'scope, mz_repr::Timestamp, HealthStatusMessage>>,
)
where
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

    let SourceDesc {
        connection: _,
        timestamp_interval: _,
    } = description.desc;

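    // With no encoding configured the source already produces Rows, so key and
    // value are passed through as-is; otherwise run the delimited decoder.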
    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

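    // Render the envelope. The upsert envelope runs inside a nested scope so it
    // can rehydrate its state from the export's previous output, optionally
    // applying backpressure to the persist read.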
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let outer_mz_scope = scope.clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
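                    // Read back this export's previously committed output from persist,
                    // as of one tick before the resume upper, so the upsert operator can
                    // rehydrate its state. Reads are optionally flow-controlled via the
                    // backpressure feedback loop set up here.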
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

                        let (stream, tok) = persist_source::persist_source_core(
                            outer_mz_scope,
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
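                    // Pick the upsert implementation: the ENABLE_UPSERT_V2 dyncfg selects
                    // the newer operator, otherwise fall back to the classic one.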
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        if dyncfgs::ENABLE_UPSERT_V2
                            .get(storage_state.storage_configuration.config_set())
                        {
                            crate::upsert::upsert_v2(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                backpressure_metrics,
                            )
                        } else {
                            crate::upsert::upsert(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                &storage_state.instance_context,
                                &storage_state.storage_configuration,
                                &storage_state.dataflow_parameters,
                                backpressure_metrics,
                            )
                        };

                    needed_tokens.push(upsert_token);

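                    // When delaying sources past rehydration, hand the rehydration token
                    // to `rehydration_finished`, which releases it once the upsert
                    // snapshot has caught up to the resume frontier; this unblocks the
                    // raw source's start signal. Otherwise release the token right away.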
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            snapshot_progress.clone(),
                        );
                    } else {
                        drop(rehydrated_token)
                    };

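                    // Close the backpressure loop: feed the upsert operator's snapshot
                    // progress back into the flow-controlled persist source.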
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(outer_mz_scope),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(outer_mz_scope),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}

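/// Computes the limit on in-flight bytes used for backpressure during upsert
/// rehydration. Returns `None` when no default is configured; otherwise the
/// cluster-memory-based limit (memory limit × configured fraction) takes
/// precedence over the configured default.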
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

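/// Splits a `(result, time, diff)` triple into ok and err triples so the two
/// can be routed into separate streams.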
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

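/// A decoded key/value pair, where either side may be absent or may have failed
/// to decode.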
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

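/// Appends the source-provided metadata columns to each successfully decoded
/// value, leaving keys and decode errors untouched.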
fn append_metadata_to_value<'scope, T: Timestamp, FromTime: Timestamp>(
    results: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
) -> VecCollection<'scope, T, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

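/// Converts decoded results into upsert commands: a hashed key, an optional
/// value (`None` indicates a deletion), and the original `FromTime`. Key decode
/// failures become erroneous values when a value was present, or deletions when
/// it was not.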
fn upsert_commands<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

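        // Shape the key according to the key envelope: flattened and Debezium keys
        // are used as-is, while named keys with more than one column are wrapped in
        // a single record column. A `None` key envelope is impossible for upsert.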
        let key_row = match upsert_envelope.style {
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

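        // Build the output value for the chosen style: Debezium uses the `after`
        // field, Default prepends the key, and ValueErrInline additionally reserves
        // an error column (null for good values, populated when decoding failed).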
        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

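/// Flattens the key/value pairs of a `None`-envelope source into single rows,
/// prepending the key columns as dictated by the key envelope.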
fn flatten_results_prepend_keys<'scope, T: Timestamp>(
    none_envelope: &NoneEnvelope,
    results: VecCollection<'scope, T, KV, Diff>,
) -> VecCollection<'scope, T, Result<Row, DataflowError>, Diff> {
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

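/// Raises decode errors from either the key or the value, and drops messages
/// that have neither. A message with a key but no value is itself reported as
/// an error for the `None` envelope.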
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}