mz_storage/render/sources.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to the creation of dataflow sources.
//!
//! See [`render_source`] for more details.

use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection, collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::operators::{Concat, ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::internal_control::InternalStorageCommand;
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::UpsertKey;

/// _Renders_ complete _differential_ [`Collection`]s
/// that represent the final source and its errors
/// as requested by the original `CREATE SOURCE` statement,
/// encapsulated in the passed `SourceInstanceDesc`.
///
/// The first element in the returned tuple is the pair of [`Collection`]s,
/// the second is a type-erased token that will keep the source
/// alive as long as it is not dropped.
///
/// This function is intended to implement the recipe described here:
/// <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/platform/architecture-storage.md#source-ingestion>
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
    // Tokens that we should return from the method.
    let mut needed_tokens = Vec::new();

    // Note that this `render_source` attaches a single _instance_ of a source
    // to the passed `Scope`, and this instance may be disabled if the
    // source type does not support multiple instances. `render_source`
    // is called on each timely worker as part of
    // [`super::build_storage_dataflow`].

    // A set of channels (1 per worker) used to signal rehydration being finished
    // to raw sources. These are channels and not timely streams because they
    // have to cross a scope boundary.
    //
    // Note that these will be entirely subsumed by full `hydration` backpressure,
    // once that is implemented.
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };

    // Build the _raw_ ok and error sources using `create_raw_source` and the
    // correct `SourceReader` implementations
    let (exports, mut health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);
        // All sources should push their various error streams into this vector,
        // whose contents will be concatenated and inserted alongside the collection.
        // All subsources include the non-definite errors of the ingestion.
        let error_collections = vec![err_stream.map(DataflowError::from)];

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, err, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);
        outputs.insert(export_id, (ok, err));

        health = health.concat(&health_stream.leave());
    }
    (outputs, health, needed_tokens)
}

/// Completes the rendering of a particular source stream by applying decoding and envelope
/// processing as necessary
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    mut error_collections: Vec<Collection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    Collection<G, Row, Diff>,
    Collection<G, DataflowError, Diff>,
    Vec<PressOnDropButton>,
    Stream<G, HealthStatusMessage>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    // Use the envelope and encoding configs for this particular source export
    let SourceExportDataConfig { encoding, envelope } = data_config;

    let SourceDesc {
        connection: _,
        timestamp_interval: _,
        primary_export: _,
        primary_export_details: _,
    } = description.desc;

    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                // This is safe because the current set of sources produce
                // either:
                // 1. Non-nullable keys
                // 2. No keys at all.
                //
                // Please see the comment on `key_envelope_no_encoding` in
                // `mz_sql::plan::statement::ddl` for more details.
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            empty(scope),
        ),
        Some(encoding) => render_decode_delimited(
            &ok_source,
            encoding.key,
            encoding.value,
            dataflow_debug_name.clone(),
            storage_state.metrics.decode_defs.clone(),
            storage_state.storage_configuration.clone(),
        ),
    };

    // render envelopes
    let (envelope_ok, envelope_err, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            // TODO: Get this to work with the as_of.
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    // TODO(guswynn): cleanup
                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let command_tx = storage_state.internal_cmd_tx.clone();

                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            false.then_some(|| unreachable!()),
                            async {},
                            move |error| {
                                let error = format!("upsert_rehydration: {error}");
                                tracing::info!("{error}");
                                Box::pin(async move {
                                    command_tx.send(InternalStorageCommand::SuspendAndRestart {
                                        id: export_id,
                                        reason: error,
                                    });
                                })
                            },
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    // Even though we register the `persist_sink` token at the top level,
                    // which will stop any data from being committed, we also register
                    // a token for the `upsert` operator, which may be in the middle of
                    // rehydration, processing the `persist_source` input above.
                    needed_tokens.push(upsert_token);

                    // If configured, delay raw sources until we rehydrate the upsert
                    // source. Otherwise, drop the token, unblocking the sources at the
                    // end of rendering.
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

                    // If backpressure from persist is enabled, we connect the upsert operator's
                    // snapshot progress to the persist source feedback handle.
                    let upsert = match feedback_handle {
                        Some(feedback_handle) => {
                            snapshot_progress.connect_loop(feedback_handle);
                            upsert
                        }
                        None => upsert,
                    };

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);

            (
                upsert_ok.as_collection(),
                Some(upsert_err.as_collection()),
                health_update,
            )
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            let errors = errors.as_collection();
            (stream.as_collection(), Some(errors), empty(scope))
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None, empty(scope))
        }
    };

    let (collection, errors, health) = (
        envelope_ok,
        envelope_err,
        decode_health.concat(&envelope_health),
    );

    if let Some(errors) = errors {
        error_collections.push(errors);
    }

    // Flatten the error collections.
    let err_collection = match error_collections.len() {
        0 => Collection::empty(scope),
        1 => error_collections.pop().unwrap(),
        _ => collection::concatenate(scope, error_collections),
    };

    // Return the collections and any needed tokens.
    (collection, err_collection, needed_tokens, health)
}

// Returns the maximum limit of inflight bytes for backpressure based on the given config
// and the current cluster size.
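//
// For example (illustrative numbers, mirroring the tests below): with
// `max_inflight_bytes_default = Some(10_000)` and
// `max_inflight_bytes_cluster_size_fraction = Some(0.5)`, a cluster memory limit of
// 2_000 bytes yields a limit of 1_000 bytes; if no cluster memory limit is known,
// the default of 10_000 bytes is used instead.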
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    // Will use backpressure only if the default inflight value is provided
    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    // We just need to be close to the correct % of bytes here, so we just use lossy casts.
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

// TODO: Maybe we should finally move this to some central place and re-use. There seem to be
// enough instances of this by now.
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

/// After handling metadata insertion, we split streams into key/value parts for convenience
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

/// Convert from streams of [`DecodeResult`] to UpsertCommands, inserting the Key according to [`KeyEnvelope`]
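///
/// For example (an illustrative sketch of the key handling below): with
/// `UpsertStyle::Default(KeyEnvelope::Flattened)` the decoded key row is used as-is,
/// while with `KeyEnvelope::Named(_)` a multi-column key is first packed into a single
/// record column before it is turned into an [`UpsertKey`].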
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: Collection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        // If we have a well-formed key we can continue, otherwise we're upserting an error
        let key = match key {
            Ok(key) => key,
            err @ Err(_) => match result.value {
                Some(_) => return (UpsertKey::from_key(err.as_ref()), Some(err), from_time),
                None => return (UpsertKey::from_key(err.as_ref()), None, from_time),
            },
        };

        // We can now apply the key envelope
        let key_row = match upsert_envelope.style {
            // flattened or debezium
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            // named
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The 'error' column is null
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        let mut count = 0;
                        // inline the error in the data output
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        // The 'error' column is a record with a 'description' column
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        // push nulls for all value columns
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    }))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

/// Convert from streams of [`DecodeResult`] to Rows, inserting the Key according to [`KeyEnvelope`]
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        // Named semantics rename a key that is a single column, and encode a
                        // multi-column field as a struct with that name
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

/// Handle possibly missing key or value portions of messages
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        // always prioritize the value error if either or both have an error
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        // TODO(petrosagg): these errors would be better grouped under an EnvelopeError enum
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // the limit should be 50% of 2000 i.e. 1000
        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
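
    // An illustrative extra check (a sketch, assuming the same semantics as the tests
    // above): when no cluster-size fraction is configured, the default limit is used
    // as-is, regardless of the cluster memory limit.
    #[mz_ore::test]
    fn test_default_only() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: None,
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // With no fraction configured, the default applies directly.
        assert_eq!(backpressure_inflight_bytes_limit, Some(10000));
    }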
}