mz_storage/render/sources.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to the creation of dataflow sources.
//!
//! See [`render_source`] for more details.

use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Stream;
use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::progress::{Antichain, Timestamp};

use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::{UpsertKey, UpsertValue};
/// _Renders_ complete _differential_ [`Collection`]s
/// that represent the final source and its errors
/// as requested by the original `CREATE SOURCE` statement,
/// encapsulated in the passed [`IngestionDescription`].
///
/// The returned tuple contains a map from each export to its ok and error
/// [`Collection`]s, the health status streams, and type-erased tokens that
/// keep the source alive as long as they are not dropped.
///
/// This function is intended to implement the recipe described here:
/// <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/platform/architecture-storage.md#source-ingestion>
pub fn render_source<'g, G, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    connection: C,
    description: IngestionDescription<CollectionMetadata>,
    resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: RawSourceCreationConfig,
) -> (
    BTreeMap<
        GlobalId,
        (
            Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
            Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
        ),
    >,
    Vec<Stream<G, HealthStatusMessage>>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = ()>,
    C: SourceConnection + SourceRender + 'static,
{
    // Tokens that we should return from the method.
    let mut needed_tokens = Vec::new();

    // Note that this `render_source` attaches a single _instance_ of a source
    // to the passed `Scope`, and this instance may be disabled if the
    // source type does not support multiple instances. `render_source`
    // is called on each timely worker as part of
    // [`super::build_storage_dataflow`].

    // A set of channels (1 per worker) used to signal rehydration being finished
    // to raw sources. These are channels and not timely streams because they
    // have to cross a scope boundary.
    //
    // Note that these will be entirely subsumed by full `hydration` backpressure,
    // once that is implemented.
    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
    let start_signal = async move {
        let _ = start_signal.recv().await;
    };
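    // This future resolves once `starter` sends a message or all senders are
    // dropped; raw sources wait on it before starting to produce data.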

    // Build the _raw_ ok and error sources using `create_raw_source` and the
    // correct `SourceReader` implementations
    let (exports, health, source_tokens) = source::create_raw_source(
        scope,
        storage_state,
        resume_stream,
        &base_source_config,
        connection,
        start_signal,
    );

    needed_tokens.extend(source_tokens);

    let mut health_streams = Vec::with_capacity(exports.len() + 1);
    health_streams.push(health);

    let mut outputs = BTreeMap::new();
    for (export_id, export) in exports {
        type CB<C> = CapacityContainerBuilder<C>;
        let (ok_stream, err_stream) =
            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);
        // All sources should push their various error streams into this vector,
        // whose contents will be concatenated and inserted alongside the ok
        // collection. All subsources include the non-definite errors of the
        // ingestion.
        let mut error_collections = Vec::new();

        let data_config = base_source_config.source_exports[&export_id]
            .data_config
            .clone();
        let (ok, extra_tokens, health_stream) = render_source_stream(
            scope,
            dataflow_debug_name,
            export_id,
            ok_stream,
            data_config,
            &description,
            &mut error_collections,
            storage_state,
            &base_source_config,
            starter.clone(),
        );
        needed_tokens.extend(extra_tokens);

        // Flatten the error collections.
        let err_collection = match error_collections.len() {
            0 => err_stream,
            _ => err_stream.concatenate(error_collections),
        };

        outputs.insert(export_id, (ok, err_collection));

        health_streams.extend(health_stream.into_iter().map(|s| s.leave()));
    }
    (outputs, health_streams, needed_tokens)
}

/// Completes the rendering of a particular source stream by applying decoding
/// and envelope processing as necessary.
fn render_source_stream<G, FromTime>(
    scope: &mut G,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<Collection<G, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    Collection<G, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<Stream<G, HealthStatusMessage>>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: Timestamp + Sync,
{
    let mut needed_tokens = vec![];

    // Use the envelope and encoding configs for this particular source export
    let SourceExportDataConfig { encoding, envelope } = data_config;

    let SourceDesc {
        connection: _,
        timestamp_interval: _,
        primary_export: _,
        primary_export_details: _,
    } = description.desc;

    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                // This is safe because the current set of sources produces
                // either:
                // 1. Non-nullable keys
                // 2. No keys at all.
                //
                // Please see the comment on `key_envelope_no_encoding` in
                // `mz_sql::plan::statement::ddl` for more details.
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                &ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

    // render envelopes
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            // TODO: Get this to work with the as_of.
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
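                    // Read back our own previous output from persist (at one tick
                    // before the resume upper) so the upsert operator can rehydrate
                    // its state from it, optionally under flow-controlled
                    // backpressure.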
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    // TODO(guswynn): cleanup
                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

                        let (stream, tok) = persist_source::persist_source_core(
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
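                            // Always `None`: `false.then_some(..)` never yields a
                            // value; presumably spelled this way to pin the closure
                            // type of this optional argument.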
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        crate::upsert::upsert(
                            &upsert_input.enter(scope),
                            upsert_envelope.clone(),
                            refine_antichain(&resume_upper),
                            previous,
                            previous_token,
                            export_config,
                            &storage_state.instance_context,
                            &storage_state.storage_configuration,
                            &storage_state.dataflow_parameters,
                            backpressure_metrics,
                        );

                    // Even though we register the `persist_sink` token at the top
                    // level, which will stop any data from being committed, we also
                    // register a token for the `upsert` operator, which may be in
                    // the middle of rehydration, processing the `persist_source`
                    // input above.
                    needed_tokens.push(upsert_token);

                    // If configured, delay raw sources until we rehydrate the upsert
                    // source. Otherwise, drop the token, unblocking the sources at
                    // the end of rendering.
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            &snapshot_progress,
                        );
                    } else {
                        drop(rehydrated_token)
                    };

                    // If backpressure from persist is enabled, we connect the upsert operator's
                    // snapshot progress to the persist source feedback handle.
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(),
                    )
                },
            );

            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    // Return the collections and any needed tokens.
    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}

// Returns the maximum limit of inflight bytes for backpressure, based on the
// given config and the current cluster size.
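// For example, with a default of `Some(10000)`, a cluster size fraction of `0.5`,
// and a cluster memory limit of 2000 bytes, the computed limit is `Some(1000)`
// (see the tests at the bottom of this file).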
fn get_backpressure_max_inflight_bytes(
    inflight_bytes_config: &StorageMaxInflightBytesConfig,
    cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
    let StorageMaxInflightBytesConfig {
        max_inflight_bytes_default,
        max_inflight_bytes_cluster_size_fraction,
        disk_only: _,
    } = inflight_bytes_config;

    // Will use backpressure only if the default inflight value is provided
    if max_inflight_bytes_default.is_some() {
        let current_cluster_max_bytes_limit =
            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
                    // We only need to get close to the correct % of bytes here, so
                    // lossy casts are fine.
                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
                })
            });
        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
    } else {
        None
    }
}

// TODO: Maybe we should finally move this to some central place and re-use. There seem to be
// enough instances of this by now.
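// For example, `(Ok(row), ts, diff)` becomes `Ok((row, ts, diff))` and
// `(Err(err), ts, diff)` becomes `Err((err, ts, diff))`, so that `ok_err` can
// route the two cases into separate ok/err streams.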
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    match x {
        (Ok(ok), ts, diff) => Ok((ok, ts, diff)),
        (Err(err), ts, diff) => Err((err, ts, diff)),
    }
}

/// After handling metadata insertion, we split streams into key/value parts for convenience
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
    key: Option<Result<Row, DecodeError>>,
    val: Option<Result<Row, DecodeError>>,
}

fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
    results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
    results.map(move |res| {
        let val = res.value.map(|val_result| {
            val_result.map(|mut val| {
                if !res.metadata.is_empty() {
                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
                }
                val
            })
        });

        KV { val, key: res.key }
    })
}

/// Convert from streams of [`DecodeResult`] to upsert commands, inserting the
/// key according to [`KeyEnvelope`].
fn upsert_commands<G: Scope, FromTime: Timestamp>(
    input: Collection<G, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        // If we have a well-formed key we can continue, otherwise we're upserting an error
        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

        // We can now apply the key envelope
        let key_row = match upsert_envelope.style {
            // flattened or debezium
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            // named
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
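                // A single-column key is used as-is; a multi-column key is encoded
                // as a single record column, per `KeyEnvelope::Named` semantics.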
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The 'error' column is null
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        let mut count = 0;
                        // inline the error in the data output
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        // The 'error' column is a record with a 'description' column
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        // push nulls for all value columns
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}

/// Convert from streams of [`DecodeResult`] to [`Row`]s, inserting the key
/// according to [`KeyEnvelope`].
fn flatten_results_prepend_keys<G>(
    none_envelope: &NoneEnvelope,
    results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
    G: Scope,
{
    let NoneEnvelope {
        key_envelope,
        key_arity,
    } = none_envelope;

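    // A fully-null key row, used to pad out messages that arrive without a key.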
    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);

    match key_envelope {
        KeyEnvelope::None => {
            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
        }
        KeyEnvelope::Flattened => results
            .flat_map(raise_key_value_errors)
            .map(move |maybe_kv| {
                maybe_kv.map(|(key, value)| {
                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                    key
                })
            }),
        KeyEnvelope::Named(_) => {
            results
                .flat_map(raise_key_value_errors)
                .map(move |maybe_kv| {
                    maybe_kv.map(|(key, value)| {
                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
                        // Named semantics rename a key that is a single column, and encode a
                        // multi-column field as a struct with that name
                        let row = if key.iter().nth(1).is_none() {
                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
                            key
                        } else {
                            let mut new_row = Row::default();
                            let mut packer = new_row.packer();
                            packer.push_list(key.iter());
                            packer.extend_by_row(&value);
                            new_row
                        };
                        row
                    })
                })
        }
    }
}

/// Handle possibly missing key or value portions of messages
fn raise_key_value_errors(
    KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
    match (key, val) {
        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
        (None, Some(Ok(value))) => Some(Ok((None, value))),
        // always prioritize the value error if either or both have an error
        (_, Some(Err(e))) => Some(Err(e.into())),
        (Some(Err(e)), _) => Some(Err(e.into())),
        (None, None) => None,
        // TODO(petrosagg): these errors would be better grouped under an EnvelopeError enum
        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
            "Value not present for message".into(),
        )))),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }
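
    // With no cluster-size fraction configured, there is no cluster-derived
    // limit and the default applies directly.
    #[mz_ore::test]
    fn test_default_without_fraction() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: None,
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // No fraction means no cluster-derived limit, so we fall back to the default.
        assert_eq!(backpressure_inflight_bytes_limit, Some(10000));
    }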

    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // the limit should be 50% of 2000, i.e. 1000
        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
    }
}
727}