Skip to main content

mz_storage/render/
sources.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to the creation of dataflow sources.
11//!
12//! See [`render_source`] for more details.
13
14use std::collections::BTreeMap;
15use std::iter;
16use std::sync::Arc;
17
18use differential_dataflow::{AsCollection, VecCollection};
19use mz_ore::cast::CastLossy;
20use mz_persist_client::operators::shard_source::SnapshotMode;
21use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
22use mz_storage_operators::persist_source;
23use mz_storage_operators::persist_source::Subtime;
24use mz_storage_types::controller::CollectionMetadata;
25use mz_storage_types::dyncfgs;
26use mz_storage_types::errors::{
27    DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
28};
29use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
30use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
31use mz_storage_types::sources::*;
32use mz_timely_util::builder_async::PressOnDropButton;
33use mz_timely_util::operator::CollectionExt;
34use mz_timely_util::order::refine_antichain;
35use serde::{Deserialize, Serialize};
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::StreamVec;
38use timely::dataflow::operators::vec::Map;
39use timely::dataflow::operators::{ConnectLoop, Feedback, Leave, OkErr};
40use timely::dataflow::scope::Scope;
41use timely::progress::{Antichain, Timestamp};
42
43use crate::decode::{render_decode_cdcv2, render_decode_delimited};
44use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
45use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
46use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
47use crate::upsert::{UpsertKey, UpsertSourceTime, UpsertValue};
48
49/// _Renders_ complete _differential_ collections
50/// that represent the final source and its errors
51/// as requested by the original `CREATE SOURCE` statement,
52/// encapsulated in the passed `SourceInstanceDesc`.
53///
54/// The first element in the returned tuple is the pair of Collections,
55/// the second is a type-erased token that will keep the source
56/// alive as long as it is not dropped.
57///
58/// This function is intended to implement the recipe described here:
59/// <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/platform/architecture-storage.md#source-ingestion>
60pub fn render_source<'scope, 'root, C>(
61    scope: Scope<'scope, mz_repr::Timestamp>,
62    root_scope: Scope<'root, ()>,
63    dataflow_debug_name: &String,
64    connection: C,
65    description: IngestionDescription<CollectionMetadata>,
66    resume_stream: StreamVec<'scope, mz_repr::Timestamp, ()>,
67    storage_state: &crate::storage_state::StorageState,
68    base_source_config: RawSourceCreationConfig,
69) -> (
70    BTreeMap<
71        GlobalId,
72        (
73            VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
74            VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>,
75        ),
76    >,
77    Vec<StreamVec<'root, (), HealthStatusMessage>>,
78    Vec<PressOnDropButton>,
79)
80where
81    C: SourceConnection + SourceRender + 'static,
82    C::Time: UpsertSourceTime,
83{
84    // Tokens that we should return from the method.
85    let mut needed_tokens = Vec::new();
86
87    // Note that this `render_source` attaches a single _instance_ of a source
88    // to the passed `Scope`, and this instance may be disabled if the
89    // source type does not support multiple instances. `render_source`
90    // is called on each timely worker as part of
91    // [`super::build_storage_dataflow`].
92
93    // A set of channels (1 per worker) used to signal rehydration being finished
94    // to raw sources. These are channels and not timely streams because they
95    // have to cross a scope boundary.
96    //
97    // Note that these will be entirely subsumed by full `hydration` backpressure,
98    // once that is implemented.
99    let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
100    let start_signal = async move {
101        let _ = start_signal.recv().await;
102    };
103
104    // Build the _raw_ ok and error sources using `create_raw_source` and the
105    // correct `SourceReader` implementations
106    let (exports, health, source_tokens) = source::create_raw_source(
107        scope,
108        root_scope,
109        storage_state,
110        resume_stream,
111        &base_source_config,
112        connection,
113        start_signal,
114    );
115
116    needed_tokens.extend(source_tokens);
117
118    let mut health_streams = Vec::with_capacity(exports.len() + 1);
119    health_streams.push(health);
120
121    let mut outputs = BTreeMap::new();
122    for (export_id, export) in exports {
123        type CB<C> = CapacityContainerBuilder<C>;
124        let (ok_stream, err_stream) =
125            export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);
126
127        // All sources should push their various error streams into this vector,
128        // whose contents will be concatenated and inserted along the collection.
129        // All subsources include the non-definite errors of the ingestion
130        let mut error_collections = Vec::new();
131
132        let data_config = base_source_config.source_exports[&export_id]
133            .data_config
134            .clone();
135        let (ok, extra_tokens, health_stream) = render_source_stream(
136            scope,
137            dataflow_debug_name,
138            export_id,
139            ok_stream,
140            data_config,
141            &description,
142            &mut error_collections,
143            storage_state,
144            &base_source_config,
145            starter.clone(),
146        );
147        needed_tokens.extend(extra_tokens);
148
149        // Flatten the error collections.
150        let err_collection = match error_collections.len() {
151            0 => err_stream,
152            _ => err_stream.concatenate(error_collections),
153        };
154
155        outputs.insert(export_id, (ok, err_collection));
156
157        health_streams.extend(health_stream.into_iter().map(|s| s.leave(root_scope)));
158    }
159    (outputs, health_streams, needed_tokens)
160}
161
/// Completes the rendering of a particular source stream by applying decoding and envelope
/// processing as necessary.
///
/// Decoding follows `data_config.encoding`: with no encoding, each `SourceOutput` is passed
/// through as an already-decoded key/value; otherwise `render_decode_delimited` is used.
/// The envelope follows `data_config.envelope`:
/// - `Upsert`: messages become upsert commands (see `upsert_commands`) and are fed to the
///   upsert operator in a child scope, together with this export's previously persisted
///   data (read back via `persist_source_core`) for rehydration, and optional
///   `persist_source` flow control driven by the operator's snapshot progress.
/// - `None`: metadata is appended to values and keys are prepended according to the
///   `NoneEnvelope` (see `flatten_results_prepend_keys`).
/// - `CdcV2`: delegates to `render_decode_cdcv2`.
///
/// Errors produced by the `Upsert` and `None` envelopes are pushed onto `error_collections`.
///
/// `rehydrated_token` is handed to `rehydration_finished` only when the `Upsert` envelope
/// is used and `DELAY_SOURCES_PAST_REHYDRATION` is enabled; otherwise it is dropped by the
/// time this function returns.
///
/// Returns the ok collection, tokens that must be kept alive, and the health streams
/// (decode health first, if any, then envelope health, if any).
///
/// # Panics
///
/// For the `Upsert` envelope: if `export_id` is missing from
/// `base_source_config.resume_uppers` or `description.source_exports`, if the resume upper
/// is empty ("resuming an already finished ingestion"), or if no aggregated statistics
/// exist for `export_id`.
fn render_source_stream<'scope, FromTime>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    dataflow_debug_name: &String,
    export_id: GlobalId,
    ok_source: VecCollection<'scope, mz_repr::Timestamp, SourceOutput<FromTime>, Diff>,
    data_config: SourceExportDataConfig,
    description: &IngestionDescription<CollectionMetadata>,
    error_collections: &mut Vec<VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>>,
    storage_state: &crate::storage_state::StorageState,
    base_source_config: &RawSourceCreationConfig,
    rehydrated_token: impl std::any::Any + 'static,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    Vec<PressOnDropButton>,
    Vec<StreamVec<'scope, mz_repr::Timestamp, HealthStatusMessage>>,
)
where
    FromTime: Timestamp + Sync,
    FromTime: UpsertSourceTime,
{
    let mut needed_tokens = vec![];

    // Use the envelope and encoding configs for this particular source export
    let SourceExportDataConfig { encoding, envelope } = data_config;

    // Binds nothing. Because the pattern has no `..`, adding a field to `SourceDesc`
    // will fail to compile here until this function is revisited.
    let SourceDesc {
        connection: _,
        timestamp_interval: _,
    } = description.desc;

    let (decoded_stream, decode_health) = match encoding {
        None => (
            ok_source.map(|r| DecodeResult {
                // This is safe because the current set of sources produce
                // either:
                // 1. Non-nullable keys
                // 2. No keys at all.
                //
                // Please see the comment on `key_envelope_no_encoding` in
                // `mz_sql::plan::statement::ddl` for more details.
                key: Some(Ok(r.key)),
                value: Some(Ok(r.value)),
                metadata: r.metadata,
                from_time: r.from_time,
            }),
            // No decoding operator, hence no decode health stream.
            None,
        ),
        Some(encoding) => {
            let (decoded_stream, decode_health) = render_decode_delimited(
                ok_source,
                encoding.key,
                encoding.value,
                dataflow_debug_name.clone(),
                storage_state.metrics.decode_defs.clone(),
                storage_state.storage_configuration.clone(),
            );
            (decoded_stream, Some(decode_health))
        }
    };

    // render envelopes
    let (envelope_ok, envelope_health) = match &envelope {
        SourceEnvelope::Upsert(upsert_envelope) => {
            // Turn each decoded message into an upsert command: an `UpsertKey`, an
            // optional value (or error), and the message's `from_time`.
            let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());

            let persist_clients = Arc::clone(&storage_state.persist_clients);
            // TODO: Get this to work with the as_of.
            let resume_upper = base_source_config.resume_uppers[&export_id].clone();

            // An empty resume upper would mean the ingestion has already finished.
            let upper_ts = resume_upper
                .as_option()
                .expect("resuming an already finished ingestion")
                .clone();
            let outer_mz_scope = scope.clone();
            // The upsert operator, its rehydration input and the optional backpressure
            // feedback loop are rendered in a child scope; only the upsert output and its
            // health updates leave it.
            let (upsert, health_update) = scope.scoped(
                &format!("upsert_rehydration_backpressure({})", export_id),
                |scope| {
                    let (previous, previous_token, feedback_handle, backpressure_metrics) = {
                        // Read this export's previously persisted data as of the timestamp
                        // just before the resume upper, to rehydrate the upsert state.
                        let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));

                        let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
                            &storage_state
                                .storage_configuration
                                .parameters
                                .storage_dataflow_max_inflight_bytes_config,
                            &storage_state.instance_context.cluster_memory_limit,
                        );

                        // Flow control is used only when an inflight-bytes limit is
                        // configured, and either `disk_only` is unset or a scratch
                        // directory is available.
                        let (feedback_handle, flow_control, backpressure_metrics) =
                            if let Some(storage_dataflow_max_inflight_bytes) =
                                backpressure_max_inflight_bytes
                            {
                                tracing::info!(
                                    ?backpressure_max_inflight_bytes,
                                    "timely-{} using backpressure in upsert for source {}",
                                    base_source_config.worker_id,
                                    export_id
                                );
                                if !storage_state
                                    .storage_configuration
                                    .parameters
                                    .storage_dataflow_max_inflight_bytes_config
                                    .disk_only
                                    || storage_state.instance_context.scratch_directory.is_some()
                                {
                                    // The feedback edge carries the upsert operator's snapshot
                                    // progress back into `persist_source` (connected below).
                                    let (feedback_handle, feedback_data) =
                                        scope.feedback(Default::default());

                                    // TODO(guswynn): cleanup
                                    let backpressure_metrics = Some(
                                        base_source_config
                                            .metrics
                                            .get_backpressure_metrics(export_id, scope.index()),
                                    );

                                    (
                                        Some(feedback_handle),
                                        Some(persist_source::FlowControl {
                                            progress_stream: feedback_data,
                                            max_inflight_bytes: storage_dataflow_max_inflight_bytes,
                                            summary: (Default::default(), Subtime::least_summary()),
                                            metrics: backpressure_metrics.clone(),
                                        }),
                                        backpressure_metrics,
                                    )
                                } else {
                                    (None, None, None)
                                }
                            } else {
                                (None, None, None)
                            };

                        let storage_metadata = description.source_exports[&export_id]
                            .storage_metadata
                            .clone();

                        let error_handler =
                            storage_state.error_handler("upsert_rehydration", export_id);

                        let (stream, tok) = persist_source::persist_source_core(
                            outer_mz_scope,
                            scope,
                            export_id,
                            persist_clients,
                            storage_metadata,
                            None,
                            Some(as_of),
                            SnapshotMode::Include,
                            Antichain::new(),
                            None,
                            flow_control,
                            // Always evaluates to `None`; the closure is never called.
                            false.then_some(|| unreachable!()),
                            async {},
                            error_handler,
                        );
                        (
                            stream.as_collection(),
                            Some(tok),
                            feedback_handle,
                            backpressure_metrics,
                        )
                    };

                    let export_statistics = storage_state
                        .aggregated_statistics
                        .get_source(&export_id)
                        .expect("statistics initialized")
                        .clone();
                    let export_config = SourceExportCreationConfig {
                        id: export_id,
                        worker_id: base_source_config.worker_id,
                        metrics: base_source_config.metrics.clone(),
                        source_statistics: export_statistics,
                    };
                    // The `ENABLE_UPSERT_V2` dyncfg selects which upsert implementation
                    // is rendered; both receive the same input and rehydration data.
                    let (upsert, health_update, snapshot_progress, upsert_token) =
                        if dyncfgs::ENABLE_UPSERT_V2
                            .get(storage_state.storage_configuration.config_set())
                        {
                            crate::upsert::upsert_v2(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                backpressure_metrics,
                            )
                        } else {
                            crate::upsert::upsert(
                                upsert_input.enter(scope),
                                upsert_envelope.clone(),
                                refine_antichain(&resume_upper),
                                previous,
                                previous_token,
                                export_config,
                                &storage_state.instance_context,
                                &storage_state.storage_configuration,
                                &storage_state.dataflow_parameters,
                                backpressure_metrics,
                            )
                        };

                    // Even though we register the `persist_sink` token at a top-level,
                    // which will stop any data from being committed, we also register
                    // a token for the `upsert` operator which may be in the middle of
                    // rehydration processing the `persist_source` input above.
                    needed_tokens.push(upsert_token);

                    // If configured, delay raw sources until we rehydrate the upsert
                    // source. Otherwise, drop the token, unblocking the sources at the
                    // end of rendering.
                    if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
                        .get(storage_state.storage_configuration.config_set())
                    {
                        crate::upsert::rehydration_finished(
                            scope.clone(),
                            base_source_config,
                            rehydrated_token,
                            refine_antichain(&resume_upper),
                            snapshot_progress.clone(),
                        );
                    } else {
                        drop(rehydrated_token)
                    };

                    // If backpressure from persist is enabled, we connect the upsert operator's
                    // snapshot progress to the persist source feedback handle.
                    if let Some(feedback_handle) = feedback_handle {
                        snapshot_progress.connect_loop(feedback_handle);
                    }

                    (
                        upsert.leave(outer_mz_scope),
                        health_update
                            .map(|(id, update)| HealthStatusMessage {
                                id,
                                namespace: StatusNamespace::Upsert,
                                update,
                            })
                            .leave(outer_mz_scope),
                    )
                },
            );

            // Split the upsert output into data and errors; errors join this export's
            // error collections.
            let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
            error_collections.push(upsert_err.as_collection());

            (upsert_ok.as_collection(), Some(health_update))
        }
        SourceEnvelope::None(none_envelope) => {
            // Append metadata columns to values, then prepend key columns per the
            // key envelope, routing key/value errors to the error collection.
            let results = append_metadata_to_value(decoded_stream);

            let flattened_stream = flatten_results_prepend_keys(none_envelope, results);

            let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);

            error_collections.push(errors.as_collection());
            (stream.as_collection(), None)
        }
        SourceEnvelope::CdcV2 => {
            let (oks, token) = render_decode_cdcv2(&decoded_stream);
            needed_tokens.push(token);
            (oks, None)
        }
    };

    // Return the collections and any needed tokens.
    let health = decode_health.into_iter().chain(envelope_health).collect();
    (envelope_ok, needed_tokens, health)
}
434
435// Returns the maximum limit of inflight bytes for backpressure based on given config
436// and the current cluster size
437fn get_backpressure_max_inflight_bytes(
438    inflight_bytes_config: &StorageMaxInflightBytesConfig,
439    cluster_memory_limit: &Option<usize>,
440) -> Option<usize> {
441    let StorageMaxInflightBytesConfig {
442        max_inflight_bytes_default,
443        max_inflight_bytes_cluster_size_fraction,
444        disk_only: _,
445    } = inflight_bytes_config;
446
447    // Will use backpressure only if the default inflight value is provided
448    if max_inflight_bytes_default.is_some() {
449        let current_cluster_max_bytes_limit =
450            cluster_memory_limit.as_ref().and_then(|cluster_memory| {
451                max_inflight_bytes_cluster_size_fraction.map(|fraction| {
452                    // We just need close the correct % of bytes here, so we just use lossy casts.
453                    usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
454                })
455            });
456        current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
457    } else {
458        None
459    }
460}
461
// TODO: Maybe we should finally move this to some central place and re-use. There seem to be
// enough instances of this by now.
/// Moves a `Result` out of a `(data, time, diff)` update triple, so the triple itself
/// lands on the `Ok` or `Err` side with the same time and diff.
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
    let (outcome, time, diff) = x;
    match outcome {
        Ok(value) => Ok((value, time, diff)),
        Err(error) => Err((error, time, diff)),
    }
}
470
471/// After handling metadata insertion, we split streams into key/value parts for convenience
472#[derive(
473    Debug,
474    Clone,
475    Hash,
476    PartialEq,
477    Eq,
478    Ord,
479    PartialOrd,
480    Serialize,
481    Deserialize
482)]
483struct KV {
484    key: Option<Result<Row, DecodeError>>,
485    val: Option<Result<Row, DecodeError>>,
486}
487
488fn append_metadata_to_value<'scope, T: Timestamp, FromTime: Timestamp>(
489    results: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
490) -> VecCollection<'scope, T, KV, Diff> {
491    results.map(move |res| {
492        let val = res.value.map(|val_result| {
493            val_result.map(|mut val| {
494                if !res.metadata.is_empty() {
495                    RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
496                }
497                val
498            })
499        });
500
501        KV { val, key: res.key }
502    })
503}
504
/// Convert from streams of [`DecodeResult`] to UpsertCommands, inserting the Key according to [`KeyEnvelope`]
///
/// Each output is `(key, value, from_time)`:
/// - If the key is missing or failed to decode, the command is keyed by
///   `UpsertKey::from_key(Err(..))`. Its value is that key error, or `None` if the message
///   had no value.
/// - Otherwise the key row is shaped by the key envelope: flattened keys are used as-is,
///   and a named multi-column key is packed into a single list datum. The value row is then
///   built according to `upsert_envelope.style`:
///   - `Debezium`: the `after` columns followed by metadata; a `NULL` `after` yields `None`.
///   - `Default`: key columns, value columns, then metadata.
///   - `ValueErrInline`: key columns, a `NULL` error column, value columns, then metadata.
///     A value decode error is inlined as key columns, an error record, `NULL` value
///     columns, then metadata.
/// - A value decode error under any other style becomes `UpsertError::Value`.
/// - A message without a value yields `None`.
///
/// # Panics
///
/// - If the Debezium `after_idx` is out of range, or the `after` datum is neither a list
///   nor `NULL`.
/// - If the key envelope is `KeyEnvelope::None` (`unreachable!`).
fn upsert_commands<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    upsert_envelope: UpsertEnvelope,
) -> VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
    // Scratch row reused across messages; each use re-packs it via `packer()` and the result
    // is cloned out. NOTE(review): relies on `Row::packer` clearing previous contents.
    let mut row_buf = Row::default();
    input.map(move |result| {
        let from_time = result.from_time;

        let key = match result.key {
            Some(Ok(key)) => Ok(key),
            None => Err(UpsertError::NullKey(UpsertNullKeyError)),
            Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
        };

        // If we have a well-formed key we can continue, otherwise we're upserting an error
        let key = match key {
            Ok(key) => key,
            Err(err) => match result.value {
                Some(_) => {
                    return (
                        UpsertKey::from_key(Err(&err)),
                        Some(Err(Box::new(err))),
                        from_time,
                    );
                }
                None => return (UpsertKey::from_key(Err(&err)), None, from_time),
            },
        };

        // We can now apply the key envelope
        let key_row = match upsert_envelope.style {
            // flattened or debezium
            UpsertStyle::Debezium { .. }
            | UpsertStyle::Default(KeyEnvelope::Flattened)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Flattened,
                error_column: _,
            } => key,
            // named
            UpsertStyle::Default(KeyEnvelope::Named(_))
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::Named(_),
                error_column: _,
            } => {
                // A single-column key is used as-is; a multi-column key is packed into
                // one list datum.
                if key.iter().nth(1).is_none() {
                    key
                } else {
                    row_buf.packer().push_list(key.iter());
                    row_buf.clone()
                }
            }
            UpsertStyle::Default(KeyEnvelope::None)
            | UpsertStyle::ValueErrInline {
                key_envelope: KeyEnvelope::None,
                error_column: _,
            } => unreachable!(),
        };

        let key = UpsertKey::from_key(Ok(&key_row));

        let metadata = result.metadata;

        let value = match result.value {
            Some(Ok(ref row)) => match upsert_envelope.style {
                // Only the `after` record's columns (plus metadata) form the value.
                UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
                    Datum::List(after) => {
                        let mut packer = row_buf.packer();
                        packer.extend(after.iter());
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    Datum::Null => None,
                    d => panic!("type error: expected record, found {:?}", d),
                },
                UpsertStyle::Default(_) => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
                UpsertStyle::ValueErrInline { .. } => {
                    let mut packer = row_buf.packer();
                    packer.extend_by_row(&key_row);
                    // The 'error' column is null
                    packer.push(Datum::Null);
                    packer.extend_by_row(row);
                    packer.extend_by_row(&metadata);
                    Some(Ok(row_buf.clone()))
                }
            },
            Some(Err(inner)) => {
                match upsert_envelope.style {
                    UpsertStyle::ValueErrInline { .. } => {
                        // Number of columns packed so far (key columns + error column).
                        let mut count = 0;
                        // inline the error in the data output
                        let err_string = inner.to_string();
                        let mut packer = row_buf.packer();
                        for datum in key_row.iter() {
                            packer.push(datum);
                            count += 1;
                        }
                        // The 'error' column is a record with a 'description' column
                        packer.push_list(iter::once(Datum::String(&err_string)));
                        count += 1;
                        let metadata_len = metadata.as_row_ref().iter().count();
                        // push nulls for all value columns
                        // NOTE(review): this subtraction underflows (panicking, or wrapping
                        // when overflow checks are off) if `source_arity` is smaller than
                        // key + error + metadata columns; assumes planning guarantees it is not.
                        packer.extend(
                            iter::repeat(Datum::Null)
                                .take(upsert_envelope.source_arity - count - metadata_len),
                        );
                        packer.extend_by_row(&metadata);
                        Some(Ok(row_buf.clone()))
                    }
                    _ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
                        for_key: key_row,
                        inner,
                    })))),
                }
            }
            None => None,
        };

        (key, value, from_time)
    })
}
632
633/// Convert from streams of [`DecodeResult`] to Rows, inserting the Key according to [`KeyEnvelope`]
634fn flatten_results_prepend_keys<'scope, T: Timestamp>(
635    none_envelope: &NoneEnvelope,
636    results: VecCollection<'scope, T, KV, Diff>,
637) -> VecCollection<'scope, T, Result<Row, DataflowError>, Diff> {
638    let NoneEnvelope {
639        key_envelope,
640        key_arity,
641    } = none_envelope;
642
643    let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);
644
645    match key_envelope {
646        KeyEnvelope::None => {
647            results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
648        }
649        KeyEnvelope::Flattened => results
650            .flat_map(raise_key_value_errors)
651            .map(move |maybe_kv| {
652                maybe_kv.map(|(key, value)| {
653                    let mut key = key.unwrap_or_else(|| null_key_columns.clone());
654                    RowPacker::for_existing_row(&mut key).extend_by_row(&value);
655                    key
656                })
657            }),
658        KeyEnvelope::Named(_) => {
659            results
660                .flat_map(raise_key_value_errors)
661                .map(move |maybe_kv| {
662                    maybe_kv.map(|(key, value)| {
663                        let mut key = key.unwrap_or_else(|| null_key_columns.clone());
664                        // Named semantics rename a key that is a single column, and encode a
665                        // multi-column field as a struct with that name
666                        let row = if key.iter().nth(1).is_none() {
667                            RowPacker::for_existing_row(&mut key).extend_by_row(&value);
668                            key
669                        } else {
670                            let mut new_row = Row::default();
671                            let mut packer = new_row.packer();
672                            packer.push_list(key.iter());
673                            packer.extend_by_row(&value);
674                            new_row
675                        };
676                        row
677                    })
678                })
679        }
680    }
681}
682
683/// Handle possibly missing key or value portions of messages
684fn raise_key_value_errors(
685    KV { key, val }: KV,
686) -> Option<Result<(Option<Row>, Row), DataflowError>> {
687    match (key, val) {
688        (Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
689        (None, Some(Ok(value))) => Some(Ok((None, value))),
690        // always prioritize the value error if either or both have an error
691        (_, Some(Err(e))) => Some(Err(e.into())),
692        (Some(Err(e)), _) => Some(Err(e.into())),
693        (None, None) => None,
694        // TODO(petrosagg): these errors would be better grouped under an EnvelopeError enum
695        _ => Some(Err(DataflowError::from(EnvelopeError::Flat(
696            "Value not present for message".into(),
697        )))),
698    }
699}
700
#[cfg(test)]
mod test {
    use super::*;

    /// Without a default limit, backpressure is disabled even if a fraction is set.
    #[mz_ore::test]
    fn test_no_default() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: None,
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(1000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, None)
    }

    /// Without a known cluster memory limit, the default applies.
    #[mz_ore::test]
    fn test_no_matching_size() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };

        let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);

        assert_eq!(
            backpressure_inflight_bytes_limit,
            config.max_inflight_bytes_default
        )
    }

    /// Without a cluster-size fraction, the memory limit can't be used and the default applies.
    #[mz_ore::test]
    fn test_no_fraction() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: None,
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, Some(10000));
    }

    #[mz_ore::test]
    fn test_calculated_cluster_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: false,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        // the limit should be 50% of 2000 i.e. 1000
        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }

    /// `disk_only` is evaluated by the caller, not by the limit computation.
    #[mz_ore::test]
    fn test_disk_only_does_not_affect_limit() {
        let config = StorageMaxInflightBytesConfig {
            max_inflight_bytes_default: Some(10000),
            max_inflight_bytes_cluster_size_fraction: Some(0.5),
            disk_only: true,
        };
        let memory_limit = Some(2000);

        let backpressure_inflight_bytes_limit =
            get_backpressure_max_inflight_bytes(&config, &memory_limit);

        assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
    }
}