1use std::collections::BTreeMap;
15use std::iter;
16use std::sync::Arc;
17
18use differential_dataflow::{AsCollection, VecCollection};
19use mz_ore::cast::CastLossy;
20use mz_persist_client::operators::shard_source::SnapshotMode;
21use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
22use mz_storage_operators::persist_source;
23use mz_storage_operators::persist_source::Subtime;
24use mz_storage_types::controller::CollectionMetadata;
25use mz_storage_types::dyncfgs;
26use mz_storage_types::errors::{
27 DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
28};
29use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
30use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
31use mz_storage_types::sources::*;
32use mz_timely_util::builder_async::PressOnDropButton;
33use mz_timely_util::operator::CollectionExt;
34use mz_timely_util::order::refine_antichain;
35use serde::{Deserialize, Serialize};
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::StreamVec;
38use timely::dataflow::operators::vec::Map;
39use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, OkErr};
40use timely::dataflow::scope::Scope;
41use timely::progress::{Antichain, Timestamp};
42
43use crate::decode::{render_decode_cdcv2, render_decode_delimited};
44use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
45use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
46use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
47use crate::upsert::{UpsertKey, UpsertSourceTime, UpsertValue};
48
/// Renders a source ingestion: the raw source operator plus, for every export, its decoding and
/// envelope processing.
///
/// Returns:
/// - a map from each export's `GlobalId` to its `(ok, err)` collections;
/// - health status streams, moved out to `root_scope` (the raw source's own health stream first,
///   followed by the per-export decode/envelope health streams);
/// - `PressOnDropButton` tokens gathered from the raw source and from every export's rendering.
///   NOTE(review): the caller presumably has to keep these alive for the operators to keep
///   running — confirm against the caller.
pub fn render_source<'scope, 'root, C>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    root_scope: Scope<'root, ()>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: StreamVec<'scope, mz_repr::Timestamp, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
            VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>,
        ),
    >,
    Vec<StreamVec<'root, (), HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + 'static,
    C::Time: UpsertSourceTime,
{
    let mut needed_tokens = Vec::new();

    // Start signal for the raw source. No code in this function ever sends on the channel.
    // `recv()` resolves with `None` once every `Sender` has been dropped. Each export gets a
    // clone of `starter` (passed to `render_source_stream` as its `rehydrated_token`), and the
    // original `starter` is dropped when this function returns. So the source starts only after
    // every export has released its clone.
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        root_scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    // Capacity is only a hint: each export can contribute zero, one, or two health streams.
    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        // Demux the raw export's `Result`s into separate ok and error collections.
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

        // Filled by `render_source_stream` with errors produced during decoding/envelope steps.
        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

        // Skip the concatenate operator when there are no extra error collections to merge.
        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        // Health streams live in the source scope; move them out to the root scope.
        health_streams.extend(health_stream.into_iter().map(|s| s.leave(root_scope)));
    }
    (outputs, health_streams, needed_tokens)
}
161
/// Renders the decoding and envelope processing for a single source export.
///
/// If an `encoding` is configured, it first decodes `ok_source`; otherwise the raw key and value
/// rows pass through as successful decodes. It then applies the export's envelope:
/// - `Upsert`: turns records into upsert commands and renders an upsert operator inside a nested
///   scope. That operator rehydrates from the export's own persist shard and may use
///   backpressure via a feedback loop.
/// - `None`: appends metadata to values and prepends key columns per the key envelope.
/// - `CdcV2`: delegates to `render_decode_cdcv2`.
///
/// Errors produced here are pushed onto `error_collections` for the caller to merge.
/// `rehydrated_token` is either handed to `crate::upsert::rehydration_finished` (upsert envelope
/// with `DELAY_SOURCES_PAST_REHYDRATION` enabled) or dropped.
///
/// Returns the ok collection, any tokens that must be retained, and the health streams
/// (decode health first, then envelope health).
fn render_source_stream<'scope, FromTime>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: VecCollection<'scope, mz_repr::Timestamp, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<StreamVec<'scope, mz_repr::Timestamp, HealthStatusMessage>>,
)
where
    FromTime: Timestamp + Sync,
    FromTime: UpsertSourceTime,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

    // Exhaustive destructure without `..`: adding a field to `SourceDesc` breaks compilation
    // here, presumably so that new fields get considered by this function.
    let SourceDesc {
        connection: _,
        timestamp_interval: _,
    } = description.desc;

    let (decoded_stream, decode_health) = match encoding {
        // No encoding: the source already produced rows; wrap them as successful decodes.
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let outer_mz_scope = scope.clone();
            // Nested scope that hosts the rehydration read, the upsert operator and, if enabled,
            // the backpressure feedback loop.
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        // Read this export's previously written output as of one tick before the
                        // resume upper, to rehydrate upsert state.
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        // Backpressure needs a configured byte limit, and, when the config is
                        // `disk_only`, a scratch directory as well.
                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    // Feedback edge: upsert snapshot progress is looped back to
                                    // the persist source as its flow-control progress stream.
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

                        let (stream, tok) = persist_source::persist_source_core(
                            outer_mz_scope,
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            // Always `None`. Presumably the `then_some` form exists only to give
                            // the generic closure parameter a concrete type — confirm.
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    // The `ENABLE_UPSERT_V2` dyncfg selects the upsert implementation.
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        if dyncfgs::ENABLE_UPSERT_V2
                            .get(storage_state.storage_configuration.config_set())
                        {
                            crate::upsert::upsert_v2(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                backpressure_metrics,
                            )
                        } else {
                            crate::upsert::upsert(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                &storage_state.instance_context,
                                &storage_state.storage_configuration,
                                &storage_state.dataflow_parameters,
                                backpressure_metrics,
                            )
                        };

                    needed_tokens.push(upsert_token);

                    // Either hold the source's start token until rehydration finishes, or release
                    // it right away so this export does not delay the source.
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            snapshot_progress.clone(),
                        );
                    } else {
                        drop(rehydrated_token)
                    };

                    // Close the backpressure feedback loop, if one was created above.
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(outer_mz_scope),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(outer_mz_scope),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}
434
435fn get_backpressure_max_inflight_bytes(
438 inflight_bytes_config: &StorageMaxInflightBytesConfig,
439 cluster_memory_limit: &Option<usize>,
440) -> Option<usize> {
441 let StorageMaxInflightBytesConfig {
442 max_inflight_bytes_default,
443 max_inflight_bytes_cluster_size_fraction,
444 disk_only: _,
445 } = inflight_bytes_config;
446
447 if max_inflight_bytes_default.is_some() {
449 let current_cluster_max_bytes_limit =
450 cluster_memory_limit.as_ref().and_then(|cluster_memory| {
451 max_inflight_bytes_cluster_size_fraction.map(|fraction| {
452 usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
454 })
455 });
456 current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
457 } else {
458 None
459 }
460}
461
/// Moves the `Result` of an `(data, time, diff)` update triple to the outside.
///
/// `(Ok(o), t, d)` becomes `Ok((o, t, d))` and `(Err(e), t, d)` becomes `Err((e, t, d))`, which
/// is the shape `ok_err` expects when splitting a stream into ok and error halves.
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    let (result, time, diff) = x;
    match result {
        Err(error) => Err((error, time, diff)),
        Ok(value) => Ok((value, time, diff)),
    }
}
470
/// A decoded key/value pair, used by the `NONE` envelope path.
///
/// Each side is `None` when no key or value was produced. NOTE(review): presumably that happens,
/// for example, when no key decoder is configured — confirm in `render_decode_delimited`.
/// Otherwise each side holds the decode result.
///
/// Field order matters: the derived `PartialOrd`/`Ord` compare `key` before `val`, and the serde
/// representation follows declaration order.
#[derive(
    Debug,
    Clone,
    Hash,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize
)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}
487
488fn append_metadata_to_value<'scope, T: Timestamp, FromTime: Timestamp>(
489 results: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
490) -> VecCollection<'scope, T, KV, Diff> {
491 results.map(move |res| {
492 let val = res.value.map(|val_result| {
493 val_result.map(|mut val| {
494 if !res.metadata.is_empty() {
495 RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
496 }
497 val
498 })
499 });
500
501 KV { val, key: res.key }
502 })
503}
504
/// Turns decoded records into upsert commands `(key, value, from_time)`.
///
/// - A key that is missing (`UpsertError::NullKey`) or fails to decode (`UpsertError::KeyDecode`)
///   is keyed by the error itself. The value becomes `Some(Err(..))` carrying that key error if
///   the record had a value, or `None` if it did not.
/// - The key row is used as-is for Debezium and `Flattened` keys. For `Named` keys, a
///   single-column key is used as-is and a multi-column key is packed into one list datum.
///   `KeyEnvelope::None` is treated as unreachable for upsert.
/// - The value row layout depends on the style:
///   - Debezium: the `after_idx` column's record fields, followed by metadata. A NULL `after`
///     column yields `None`.
///   - Default: key columns, value columns, then metadata.
///   - ValueErrInline: key columns, a NULL error column, value columns, then metadata.
/// - A value decode error becomes `UpsertError::Value`, except for ValueErrInline. There it is
///   written inline: key columns, a one-element list containing the error text, NULL padding up
///   to `source_arity`, then metadata.
/// - A missing value yields `None`.
fn upsert_commands<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    // Scratch row reused across records; each output is cloned out of it.
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        // Bad keys short-circuit: the error itself becomes the upsert key.
        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

        let key_row = match upsert_envelope.style {
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                // Single-column keys stay as-is; multi-column keys become one list datum.
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                // Debezium: the `after` column holds the new record, or NULL for a delete.
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The inline error column is NULL when the value decoded successfully.
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        // `count` tracks how many columns have been written so far, so the value
                        // columns can be padded with NULLs up to `source_arity`.
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        // NOTE(review): unchecked `usize` subtraction; this assumes
                        // `source_arity >= count + metadata_len` — confirm upstream.
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}
632
633fn flatten_results_prepend_keys<'scope, T: Timestamp>(
635 none_envelope: &NoneEnvelope,
636 results: VecCollection<'scope, T, KV, Diff>,
637) -> VecCollection<'scope, T, Result<Row, DataflowError>, Diff> {
638 let NoneEnvelope {
639 key_envelope,
640 key_arity,
641 } = none_envelope;
642
643 let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);
644
645 match key_envelope {
646 KeyEnvelope::None => {
647 results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
648 }
649 KeyEnvelope::Flattened => results
650 .flat_map(raise_key_value_errors)
651 .map(move |maybe_kv| {
652 maybe_kv.map(|(key, value)| {
653 let mut key = key.unwrap_or_else(|| null_key_columns.clone());
654 RowPacker::for_existing_row(&mut key).extend_by_row(&value);
655 key
656 })
657 }),
658 KeyEnvelope::Named(_) => {
659 results
660 .flat_map(raise_key_value_errors)
661 .map(move |maybe_kv| {
662 maybe_kv.map(|(key, value)| {
663 let mut key = key.unwrap_or_else(|| null_key_columns.clone());
664 let row = if key.iter().nth(1).is_none() {
667 RowPacker::for_existing_row(&mut key).extend_by_row(&value);
668 key
669 } else {
670 let mut new_row = Row::default();
671 let mut packer = new_row.packer();
672 packer.push_list(key.iter());
673 packer.extend_by_row(&value);
674 new_row
675 };
676 row
677 })
678 })
679 }
680 }
681}
682
683fn raise_key_value_errors(
685 KV { key, val }: KV,
686) -> Option<Result<(Option<Row>, Row), DataflowError>> {
687 match (key, val) {
688 (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
689 (None, Some(Ok(value))) => Some(Ok((None, value))),
690 (_, Some(Err(e))) => Some(Err(e.into())),
692 (Some(Err(e)), _) => Some(Err(e.into())),
693 (None, None) => None,
694 _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
696 "Value not present for message".into(),
697 )))),
698 }
699}
700
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a non-`disk_only` config from the given default limit and cluster-size fraction.
    fn config(
        max_inflight_bytes_default: Option<usize>,
        max_inflight_bytes_cluster_size_fraction: Option<f64>,
    ) -> StorageMaxInflightBytesConfig {
        StorageMaxInflightBytesConfig {
            max_inflight_bytes_default,
            max_inflight_bytes_cluster_size_fraction,
            disk_only: false,
        }
    }

    /// With no default limit, backpressure stays off even when a fraction and memory are known.
    #[mz_ore::test]
    fn test_no_default() {
        let limit = get_backpressure_max_inflight_bytes(&config(None, Some(0.5)), &Some(1000));

        assert_eq!(limit, None)
    }

    /// Without a known cluster memory limit, the default limit is used.
    #[mz_ore::test]
    fn test_no_matching_size() {
        let cfg = config(Some(10000), Some(0.5));

        let limit = get_backpressure_max_inflight_bytes(&cfg, &None);

        assert_eq!(limit, cfg.max_inflight_bytes_default)
    }

    /// With both a memory limit and a fraction, the derived limit replaces the default.
    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let limit =
            get_backpressure_max_inflight_bytes(&config(Some(10000), Some(0.5)), &Some(2000));

        assert_eq!(limit, Some(1000));
    }
}