use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::{UpsertKey, UpsertValue};

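/// Renders the dataflow for a single ingestion: the raw source operator
/// followed by decoding and envelope processing for each source export.
///
/// Returns, per export, the final `(ok, err)` pair of collections, along with
/// the health streams of all operators and the tokens that must be held to
/// keep the dataflow alive.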
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Vec<Stream<G, HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
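    // Tokens that we must return to keep the source's operators alive.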
    let mut needed_tokens = Vec::new();

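    // A channel that gates the start of raw source ingestion. A clone of the
    // sender is handed to each export's rendering as a "rehydration" token;
    // once every clone has been dropped (e.g. after upsert rehydration
    // completes), the signal resolves and the source may start.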
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

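    // Render the raw, undecoded source; its exports are processed one by one
    // below.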
    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);

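        // Additional error streams for this export (e.g. envelope errors) are
        // pushed here and concatenated with the source errors below.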
        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

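        // Attach any export-specific errors to this export's error stream.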
        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        health_streams.extend(health_stream.into_iter().map(|s| s.leave()));
    }
    (outputs, health_streams, needed_tokens)
}

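/// Renders decoding and envelope processing for a single export of a source,
/// yielding the final `Row` collection, tokens that keep its operators alive,
/// and the export's health streams. Error collections produced along the way
/// are pushed onto `error_collections`.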
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<Collection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    Collection<G, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<Stream<G, HealthStatusMessage>>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    let SourceExportDataConfig { encoding, envelope } = data_config;

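    // Exhaustively destructure the source description; this function needs
    // none of its fields, but new ones should be considered here when added.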
    let SourceDesc {
        connection: _,
        timestamp_interval: _,
    } = description.desc;

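    // Decode the raw stream. Exports without an encoding already produce
    // `Row`s, which pass through unchanged.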
    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                &ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

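    // Render the configured envelope, producing the final row collection and
    // pushing any envelope errors onto `error_collections`.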
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

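            // Rehydrate the upsert state from the previously written output,
            // reading it back as of one tick before the resume upper.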
            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

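                        // Optionally set up a feedback loop that flow-controls
                        // the persist source below. When the configuration is
                        // disk-only, backpressure is only applied on clusters
                        // that have a scratch directory.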
                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

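                        // Read back this export's previously written output;
                        // this is the state the upsert operator rehydrates
                        // from.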
                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    needed_tokens.push(upsert_token);

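                    // If configured, hold back the raw source until the upsert
                    // state has been rehydrated; otherwise drop the token now,
                    // unblocking the source immediately.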
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

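                    // Feed the upsert operator's snapshot progress back into
                    // the flow-control input of the persist source above.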
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}

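/// Computes the backpressure limit on inflight bytes, preferring a limit
/// derived from the cluster memory limit (scaled by the configured fraction)
/// and falling back to the configured default. Returns `None` if no default
/// is configured, which disables backpressure.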
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

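/// Splits a stream of `(Result, time, diff)` updates into `Ok` and `Err`
/// variants, preserving the time and diff.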
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

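/// A decoded key/value pair, where each part may be absent or may have failed
/// to decode.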
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

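/// Appends the source's metadata columns to each successfully decoded value.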
fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

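/// Converts a stream of decoded key/value pairs into upsert commands: an
/// [`UpsertKey`] derived from the key, an optional new value (`None` requests
/// a deletion), and the original source timestamp.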
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: Collection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

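        // A key error with an accompanying value is forwarded as an errored
        // upsert for that key; a key error with no value is forwarded as a
        // deletion for the errored key.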
        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

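        // Shape the key row according to the key envelope: flattened keys are
        // used as-is, while named keys of more than one column are wrapped in
        // a single record column.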
        let key_row = match upsert_envelope.style {
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

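        // Build the new value, if any: Debezium unpacks the `after` field,
        // the default style prepends the key columns, and `ValueErrInline`
        // additionally carries an inline error column.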
        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The error column is null for values that decoded successfully.
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
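                        // Inline the error message in the error column and pad
                        // the remaining value columns with nulls.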
                        let mut count = 0;
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

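/// Flattens the key/value pairs of a `None`-envelope source into single rows,
/// prepending the key columns as dictated by the configured [`KeyEnvelope`].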
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        // A single-column key is extended in place; a
                        // multi-column key is first wrapped in a single record
                        // column.
                        if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        }
                    })
                })
        }
    }
}

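/// Promotes key/value decode errors to [`DataflowError`]s and pairs each
/// value with its (optional) key. A message with a key but no value produces
/// an [`EnvelopeError::Flat`].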
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}
725}