Skip to main content

mz_sql/session/vars/
value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::any::Any;
11use std::borrow::Cow;
12use std::fmt::{self, Debug};
13use std::num::NonZeroU32;
14use std::str::FromStr;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use ipnet::IpNet;
19use itertools::Itertools;
20use mz_pgwire_common::Severity;
21use mz_repr::adt::numeric::Numeric;
22use mz_repr::adt::timestamp::CheckedTimestamp;
23use mz_repr::strconv;
24use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
25use mz_sql_parser::ast::{Ident, TransactionIsolationLevel};
26use mz_tracing::{CloneableEnvFilter, SerializableDirective};
27use serde::{Deserialize, Serialize};
28use uncased::UncasedStr;
29
30use super::VarInput;
31use super::errors::VarParseError;
32
33/// Defines a value that get stored as part of a System or Session variable.
34///
35/// This trait is partially object safe, see [`VarDefinition`] for more details.
36///
37/// [`VarDefinition`]: crate::session::vars::definitions::VarDefinition
38pub trait Value: Any + AsAny + Debug + Send + Sync {
39    fn type_name() -> Cow<'static, str>
40    where
41        Self: Sized;
42
43    fn parse(input: VarInput) -> Result<Self, VarParseError>
44    where
45        Self: Sized;
46
47    fn format(&self) -> String;
48
49    fn box_clone(&self) -> Box<dyn Value>;
50
51    /// Parse an instance of `Self` from [`VarInput`], returning it as a `Box<dyn Value>`.
52    fn parse_dyn_value(input: VarInput) -> Result<Box<dyn Value>, VarParseError>
53    where
54        Self: Sized,
55    {
56        Self::parse(input).map(|val| {
57            let dyn_val: Box<dyn Value> = Box::new(val);
58            dyn_val
59        })
60    }
61}
62
63// Note(parkmycar): We have a blanket impl for `PartialEq` instead of requiring it as a trait
64// bound because otherwise it's tricky to make `Value` object safe.
65impl PartialEq for Box<dyn Value> {
66    fn eq(&self, other: &Box<dyn Value>) -> bool {
67        self.format().eq(&other.format())
68    }
69}
70impl Eq for Box<dyn Value> {}
71
72impl<'a> PartialEq for &'a dyn Value {
73    fn eq(&self, other: &&dyn Value) -> bool {
74        self.format().eq(&other.format())
75    }
76}
77impl<'a> Eq for &'a dyn Value {}
78
79/// Helper trait to cast a `&dyn T` to a `&dyn Any`.
80///
81/// In Rust all types that are `'static` implement [`std::any::Any`] and thus can be casted to a
82/// `&dyn Any`. But once you create a trait object, the type is erased and thus so is the
83/// implementation for [`Any`]. This trait essentially adds a types' [`Any`] impl to the vtable
84/// created when casted to trait object, if [`AsAny`] is a supertrait.
85///
86/// See [`Value`] for an example of using [`AsAny`].
87pub trait AsAny {
88    fn as_any(&self) -> &dyn Any;
89}
90
91impl<T: Any> AsAny for T {
92    fn as_any(&self) -> &dyn Any {
93        self
94    }
95}
96
97/// Helper method to extract a single value from any kind of [`VarInput`].
98fn extract_single_value(input: VarInput<'_>) -> Result<&str, VarParseError> {
99    match input {
100        VarInput::Flat(value) => Ok(value),
101        VarInput::SqlSet([value]) => Ok(value),
102        VarInput::SqlSet(values) => Err(VarParseError::InvalidParameterValue {
103            invalid_values: values.to_vec(),
104            reason: "expects a single value".into(),
105        }),
106    }
107}
108
109impl<V: Value + Clone> Value for Option<V> {
110    fn type_name() -> Cow<'static, str>
111    where
112        Self: Sized,
113    {
114        format!("optional {}", V::type_name()).into()
115    }
116
117    fn parse(input: VarInput) -> Result<Self, VarParseError>
118    where
119        Self: Sized,
120    {
121        let s = extract_single_value(input)?;
122        match s {
123            "" => Ok(None),
124            _ => <V as Value>::parse(VarInput::Flat(s)).map(Some),
125        }
126    }
127
128    fn box_clone(&self) -> Box<dyn Value> {
129        Box::new(self.clone())
130    }
131
132    fn format(&self) -> String {
133        match self {
134            Some(s) => s.format(),
135            None => "".to_string(),
136        }
137    }
138}
139
140impl Value for String {
141    fn type_name() -> Cow<'static, str>
142    where
143        Self: Sized,
144    {
145        "string".into()
146    }
147
148    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
149    where
150        Self: Sized,
151    {
152        let s = extract_single_value(input)?;
153        Ok(s.to_string())
154    }
155
156    fn box_clone(&self) -> Box<dyn Value> {
157        Box::new(self.clone())
158    }
159
160    fn format(&self) -> String {
161        self.to_string()
162    }
163}
164
165impl Value for Cow<'static, str> {
166    fn type_name() -> Cow<'static, str>
167    where
168        Self: Sized,
169    {
170        "string".into()
171    }
172
173    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
174    where
175        Self: Sized,
176    {
177        let s = extract_single_value(input)?;
178        Ok(s.to_string().into())
179    }
180
181    fn box_clone(&self) -> Box<dyn Value> {
182        Box::new(self.clone())
183    }
184
185    fn format(&self) -> String {
186        self.to_string()
187    }
188}
189
190impl Value for bool {
191    fn type_name() -> Cow<'static, str>
192    where
193        Self: Sized,
194    {
195        "boolean".into()
196    }
197
198    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
199    where
200        Self: Sized,
201    {
202        let s = extract_single_value(input)?;
203        match s {
204            "t" | "true" | "on" => Ok(true),
205            "f" | "false" | "off" => Ok(false),
206            _ => Err(VarParseError::InvalidParameterType),
207        }
208    }
209
210    fn box_clone(&self) -> Box<dyn Value> {
211        Box::new(self.clone())
212    }
213
214    fn format(&self) -> String {
215        match self {
216            true => "on".into(),
217            false => "off".into(),
218        }
219    }
220}
221
222impl Value for Option<CheckedTimestamp<DateTime<Utc>>> {
223    fn type_name() -> Cow<'static, str>
224    where
225        Self: Sized,
226    {
227        "timestamptz".into()
228    }
229
230    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
231    where
232        Self: Sized,
233    {
234        let s = extract_single_value(input)?;
235        strconv::parse_timestamptz(s)
236            .map_err(|_| VarParseError::InvalidParameterType)
237            .map(Some)
238    }
239
240    fn box_clone(&self) -> Box<dyn Value> {
241        Box::new(self.clone())
242    }
243
244    fn format(&self) -> String {
245        self.map(|t| t.to_string()).unwrap_or_default()
246    }
247}
248
249impl Value for Numeric {
250    fn type_name() -> Cow<'static, str>
251    where
252        Self: Sized,
253    {
254        "numeric".into()
255    }
256
257    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
258    where
259        Self: Sized,
260    {
261        let s = extract_single_value(input)?;
262        s.parse::<Numeric>()
263            .map_err(|_| VarParseError::InvalidParameterType)
264    }
265
266    fn box_clone(&self) -> Box<dyn Value> {
267        Box::new(self.clone())
268    }
269
270    fn format(&self) -> String {
271        self.to_standard_notation_string()
272    }
273}
274
275const SEC_TO_MIN: u64 = 60u64;
276const SEC_TO_HOUR: u64 = 60u64 * 60;
277const SEC_TO_DAY: u64 = 60u64 * 60 * 24;
278const MICRO_TO_MILLI: u32 = 1000u32;
279
280impl Value for Duration {
281    fn type_name() -> Cow<'static, str>
282    where
283        Self: Sized,
284    {
285        "duration".into()
286    }
287
288    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
289    where
290        Self: Sized,
291    {
292        let s = extract_single_value(input)?;
293        let s = s.trim();
294        // Find where the leading run of ASCII digits ends. `str::find` returns a
295        // byte index, so it is safe to slice with directly. We restrict to ASCII
296        // digits (rather than `char::is_numeric`) because that is exactly what
297        // `u64::parse` accepts; matching broader Unicode numerics such as '²'
298        // would yield a byte offset that can land mid-character and panic.
299        let split_pos = s.find(|p: char| !p.is_ascii_digit()).unwrap_or(s.len());
300
301        // Error if the numeric values don't parse, i.e. there aren't any.
302        let d = s[..split_pos]
303            .parse::<u64>()
304            .map_err(|_| VarParseError::InvalidParameterType)?;
305
306        // We've already trimmed end
307        let (f, m): (fn(u64) -> Duration, u64) = match s[split_pos..].trim_start() {
308            "us" => (Duration::from_micros, 1),
309            // Default unit is milliseconds
310            "ms" | "" => (Duration::from_millis, 1),
311            "s" => (Duration::from_secs, 1),
312            "min" => (Duration::from_secs, SEC_TO_MIN),
313            "h" => (Duration::from_secs, SEC_TO_HOUR),
314            "d" => (Duration::from_secs, SEC_TO_DAY),
315            o => {
316                return Err(VarParseError::InvalidParameterValue {
317                    invalid_values: vec![o.to_string()],
318                    reason: format!("expected us, ms, s, min, h, or d but got {o:?}"),
319                });
320            }
321        };
322
323        let d = f(d
324            .checked_mul(m)
325            .ok_or_else(|| VarParseError::InvalidParameterValue {
326                invalid_values: vec![s.to_string()],
327                reason: "expected value to fit in u64".into(),
328            })?);
329        Ok(d)
330    }
331
332    fn box_clone(&self) -> Box<dyn Value> {
333        Box::new(self.clone())
334    }
335
336    // The strategy for formatting these strings is to find the least
337    // significant unit of time that can be printed as an integer––we know this
338    // is always possible because the input can only be an integer of a single
339    // unit of time.
340    fn format(&self) -> String {
341        let micros = self.subsec_micros();
342        if micros > 0 {
343            match micros {
344                ms if ms != 0 && ms % MICRO_TO_MILLI == 0 => {
345                    format!(
346                        "{} ms",
347                        self.as_secs() * 1000 + u64::from(ms / MICRO_TO_MILLI)
348                    )
349                }
350                us => format!("{} us", self.as_secs() * 1_000_000 + u64::from(us)),
351            }
352        } else {
353            match self.as_secs() {
354                zero if zero == u64::MAX => "0".to_string(),
355                d if d != 0 && d % SEC_TO_DAY == 0 => format!("{} d", d / SEC_TO_DAY),
356                h if h != 0 && h % SEC_TO_HOUR == 0 => format!("{} h", h / SEC_TO_HOUR),
357                m if m != 0 && m % SEC_TO_MIN == 0 => format!("{} min", m / SEC_TO_MIN),
358                s => format!("{} s", s),
359            }
360        }
361    }
362}
363
364impl Value for serde_json::Value {
365    fn type_name() -> Cow<'static, str>
366    where
367        Self: Sized,
368    {
369        "jsonb".into()
370    }
371
372    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
373    where
374        Self: Sized,
375    {
376        let s = extract_single_value(input)?;
377        serde_json::from_str(s).map_err(|_| VarParseError::InvalidParameterType)
378    }
379
380    fn box_clone(&self) -> Box<dyn Value> {
381        Box::new(self.clone())
382    }
383
384    fn format(&self) -> String {
385        self.to_string()
386    }
387}
388
389/// This style should actually be some more complex struct, but we only support this configuration
390/// of it, so this is fine for the time being.
391#[derive(Debug, Clone, Eq, PartialEq)]
392pub struct DateStyle(pub [&'static str; 2]);
393
394pub static DEFAULT_DATE_STYLE: DateStyle = DateStyle(["ISO", "MDY"]);
395
396impl Value for DateStyle {
397    fn type_name() -> Cow<'static, str>
398    where
399        Self: Sized,
400    {
401        "string list".into()
402    }
403
404    /// This impl is unlike most others because we have under-implemented its backing struct.
405    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
406    where
407        Self: Sized,
408    {
409        let input = match input {
410            VarInput::Flat(v) => mz_sql_parser::parser::split_identifier_string(v)
411                .map_err(|_| VarParseError::InvalidParameterType)?,
412            // Unlike parsing `Vec<Ident>`, we further split each element.
413            // This matches PostgreSQL.
414            VarInput::SqlSet(values) => {
415                let mut out = vec![];
416                for v in values {
417                    let idents = mz_sql_parser::parser::split_identifier_string(v)
418                        .map_err(|_| VarParseError::InvalidParameterType)?;
419                    out.extend(idents)
420                }
421                out
422            }
423        };
424
425        for input in input {
426            if !DEFAULT_DATE_STYLE
427                .0
428                .iter()
429                .any(|valid| UncasedStr::new(valid) == &input)
430            {
431                return Err(VarParseError::FixedValueParameter);
432            }
433        }
434
435        Ok(DEFAULT_DATE_STYLE.clone())
436    }
437
438    fn box_clone(&self) -> Box<dyn Value> {
439        Box::new(self.clone())
440    }
441
442    fn format(&self) -> String {
443        self.0.join(", ")
444    }
445}
446
447impl Value for Vec<Ident> {
448    fn type_name() -> Cow<'static, str>
449    where
450        Self: Sized,
451    {
452        "identifier list".into()
453    }
454
455    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
456    where
457        Self: Sized,
458    {
459        let holder;
460        let values = match input {
461            VarInput::Flat(value) => {
462                holder = mz_sql_parser::parser::split_identifier_string(value)
463                    .map_err(|_| VarParseError::InvalidParameterType)?;
464                &holder
465            }
466            // Unlike parsing `Vec<String>`, we do *not* further split each
467            // element. This matches PostgreSQL.
468            VarInput::SqlSet(values) => values,
469        };
470        let values = values
471            .iter()
472            .map(Ident::new)
473            .collect::<Result<_, _>>()
474            .map_err(|e| VarParseError::InvalidParameterValue {
475                invalid_values: values.to_vec(),
476                reason: e.to_string(),
477            })?;
478        Ok(values)
479    }
480
481    fn box_clone(&self) -> Box<dyn Value> {
482        Box::new(self.clone())
483    }
484
485    fn format(&self) -> String {
486        self.iter().map(|ident| ident.to_string()).join(", ")
487    }
488}
489
490impl Value for Vec<IpNet> {
491    fn type_name() -> Cow<'static, str>
492    where
493        Self: Sized,
494    {
495        "CIDR list".into()
496    }
497
498    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
499    where
500        Self: Sized,
501    {
502        let values = input.to_vec();
503        let values: Vec<IpNet> = values
504            .iter()
505            .flat_map(|i| i.split(','))
506            .map(|d| IpNet::from_str(d.trim()))
507            .collect::<Result<_, _>>()
508            .map_err(|e| VarParseError::InvalidParameterValue {
509                invalid_values: values,
510                reason: e.to_string(),
511            })?;
512        Ok(values)
513    }
514
515    fn box_clone(&self) -> Box<dyn Value> {
516        Box::new(self.clone())
517    }
518
519    fn format(&self) -> String {
520        self.iter().map(|ident| ident.to_string()).join(", ")
521    }
522}
523
524impl Value for Vec<SerializableDirective> {
525    fn type_name() -> Cow<'static, str>
526    where
527        Self: Sized,
528    {
529        "directive list".into()
530    }
531
532    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
533    where
534        Self: Sized,
535    {
536        let values = input.to_vec();
537        let dirs: Result<_, _> = values
538            .iter()
539            .flat_map(|i| i.split(','))
540            .map(|d| SerializableDirective::from_str(d.trim()))
541            .collect();
542        dirs.map_err(|e| VarParseError::InvalidParameterValue {
543            invalid_values: values.to_vec(),
544            reason: e.to_string(),
545        })
546    }
547
548    fn box_clone(&self) -> Box<dyn Value> {
549        Box::new(self.clone())
550    }
551
552    fn format(&self) -> String {
553        self.iter().map(|d| d.to_string()).join(", ")
554    }
555}
556
557// This unorthodox design lets us escape complex errors from value parsing.
558#[derive(Clone, Debug, Eq, PartialEq)]
559pub struct Failpoints;
560
561impl Value for Failpoints {
562    fn type_name() -> Cow<'static, str>
563    where
564        Self: Sized,
565    {
566        "failpoints config".into()
567    }
568
569    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
570    where
571        Self: Sized,
572    {
573        let values = input.to_vec();
574        for mut cfg in values.iter().map(|v| v.trim().split(';')).flatten() {
575            cfg = cfg.trim();
576            if cfg.is_empty() {
577                continue;
578            }
579            let mut splits = cfg.splitn(2, '=');
580            let failpoint = splits
581                .next()
582                .ok_or_else(|| VarParseError::InvalidParameterValue {
583                    invalid_values: input.to_vec(),
584                    reason: "missing failpoint name".into(),
585                })?;
586            let action = splits
587                .next()
588                .ok_or_else(|| VarParseError::InvalidParameterValue {
589                    invalid_values: input.to_vec(),
590                    reason: "missing failpoint action".into(),
591                })?;
592            fail::cfg(failpoint, action).map_err(|e| VarParseError::InvalidParameterValue {
593                invalid_values: input.to_vec(),
594                reason: e.to_string(),
595            })?;
596        }
597
598        Ok(Failpoints)
599    }
600
601    fn box_clone(&self) -> Box<dyn Value> {
602        Box::new(self.clone())
603    }
604
605    fn format(&self) -> String {
606        "<omitted>".to_string()
607    }
608}
609
610/// Severity levels can used to be used to filter which messages get sent
611/// to a client.
612///
613/// The ordering of severity levels used for client-level filtering differs from the
614/// one used for server-side logging in two aspects: INFO messages are always sent,
615/// and the LOG severity is considered as below NOTICE, while it is above ERROR for
616/// server-side logs.
617#[derive(Clone, Copy, Debug, Eq, PartialEq)]
618pub enum ClientSeverity {
619    /// Sends only INFO, ERROR, FATAL and PANIC level messages.
620    Error,
621    /// Sends only WARNING, INFO, ERROR, FATAL and PANIC level messages.
622    Warning,
623    /// Sends only NOTICE, WARNING, INFO, ERROR, FATAL and PANIC level messages.
624    Notice,
625    /// Sends only LOG, NOTICE, WARNING, INFO, ERROR, FATAL and PANIC level messages.
626    Log,
627    /// Sends all messages to the client, since all DEBUG levels are treated as the same right now.
628    Debug1,
629    /// Sends all messages to the client, since all DEBUG levels are treated as the same right now.
630    Debug2,
631    /// Sends all messages to the client, since all DEBUG levels are treated as the same right now.
632    Debug3,
633    /// Sends all messages to the client, since all DEBUG levels are treated as the same right now.
634    Debug4,
635    /// Sends all messages to the client, since all DEBUG levels are treated as the same right now.
636    Debug5,
637    /// Sends only NOTICE, WARNING, INFO, ERROR, FATAL and PANIC level messages.
638    /// Not listed as a valid value, but accepted by Postgres
639    Info,
640}
641
642impl Serialize for ClientSeverity {
643    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
644    where
645        S: serde::Serializer,
646    {
647        serializer.serialize_str(self.as_str())
648    }
649}
650
651impl ClientSeverity {
652    fn as_str(&self) -> &'static str {
653        match self {
654            ClientSeverity::Error => "error",
655            ClientSeverity::Warning => "warning",
656            ClientSeverity::Notice => "notice",
657            ClientSeverity::Info => "info",
658            ClientSeverity::Log => "log",
659            ClientSeverity::Debug1 => "debug1",
660            ClientSeverity::Debug2 => "debug2",
661            ClientSeverity::Debug3 => "debug3",
662            ClientSeverity::Debug4 => "debug4",
663            ClientSeverity::Debug5 => "debug5",
664        }
665    }
666
667    fn valid_values() -> Vec<&'static str> {
668        // INFO left intentionally out, to match Postgres
669        vec![
670            ClientSeverity::Debug5.as_str(),
671            ClientSeverity::Debug4.as_str(),
672            ClientSeverity::Debug3.as_str(),
673            ClientSeverity::Debug2.as_str(),
674            ClientSeverity::Debug1.as_str(),
675            ClientSeverity::Log.as_str(),
676            ClientSeverity::Notice.as_str(),
677            ClientSeverity::Warning.as_str(),
678            ClientSeverity::Error.as_str(),
679        ]
680    }
681
682    /// Checks if a message of a given severity level should be sent to a client.
683    ///
684    /// The ordering of severity levels used for client-level filtering differs from the
685    /// one used for server-side logging in two aspects: INFO messages are always sent,
686    /// and the LOG severity is considered as below NOTICE, while it is above ERROR for
687    /// server-side logs.
688    ///
689    /// Postgres only considers the session setting after the client authentication
690    /// handshake is completed. Since this function is only called after client authentication
691    /// is done, we are not treating this case right now, but be aware if refactoring it.
692    pub fn should_output_to_client(&self, severity: &Severity) -> bool {
693        match (self, severity) {
694            // INFO messages are always sent
695            (_, Severity::Info) => true,
696            (ClientSeverity::Error, Severity::Error | Severity::Fatal | Severity::Panic) => true,
697            (
698                ClientSeverity::Warning,
699                Severity::Error | Severity::Fatal | Severity::Panic | Severity::Warning,
700            ) => true,
701            (
702                ClientSeverity::Notice,
703                Severity::Error
704                | Severity::Fatal
705                | Severity::Panic
706                | Severity::Warning
707                | Severity::Notice,
708            ) => true,
709            (
710                ClientSeverity::Info,
711                Severity::Error
712                | Severity::Fatal
713                | Severity::Panic
714                | Severity::Warning
715                | Severity::Notice,
716            ) => true,
717            (
718                ClientSeverity::Log,
719                Severity::Error
720                | Severity::Fatal
721                | Severity::Panic
722                | Severity::Warning
723                | Severity::Notice
724                | Severity::Log,
725            ) => true,
726            (
727                ClientSeverity::Debug1
728                | ClientSeverity::Debug2
729                | ClientSeverity::Debug3
730                | ClientSeverity::Debug4
731                | ClientSeverity::Debug5,
732                _,
733            ) => true,
734
735            (
736                ClientSeverity::Error,
737                Severity::Warning | Severity::Notice | Severity::Log | Severity::Debug,
738            ) => false,
739            (ClientSeverity::Warning, Severity::Notice | Severity::Log | Severity::Debug) => false,
740            (ClientSeverity::Notice, Severity::Log | Severity::Debug) => false,
741            (ClientSeverity::Info, Severity::Log | Severity::Debug) => false,
742            (ClientSeverity::Log, Severity::Debug) => false,
743        }
744    }
745}
746
747impl Value for ClientSeverity {
748    fn type_name() -> Cow<'static, str>
749    where
750        Self: Sized,
751    {
752        "string".into()
753    }
754
755    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
756    where
757        Self: Sized,
758    {
759        let s = extract_single_value(input)?;
760        let s = UncasedStr::new(s);
761
762        if s == ClientSeverity::Error.as_str() {
763            Ok(ClientSeverity::Error)
764        } else if s == ClientSeverity::Warning.as_str() {
765            Ok(ClientSeverity::Warning)
766        } else if s == ClientSeverity::Notice.as_str() {
767            Ok(ClientSeverity::Notice)
768        } else if s == ClientSeverity::Info.as_str() {
769            Ok(ClientSeverity::Info)
770        } else if s == ClientSeverity::Log.as_str() {
771            Ok(ClientSeverity::Log)
772        } else if s == ClientSeverity::Debug1.as_str() {
773            Ok(ClientSeverity::Debug1)
774        // Postgres treats `debug` as an input as equivalent to `debug2`
775        } else if s == ClientSeverity::Debug2.as_str() || s == "debug" {
776            Ok(ClientSeverity::Debug2)
777        } else if s == ClientSeverity::Debug3.as_str() {
778            Ok(ClientSeverity::Debug3)
779        } else if s == ClientSeverity::Debug4.as_str() {
780            Ok(ClientSeverity::Debug4)
781        } else if s == ClientSeverity::Debug5.as_str() {
782            Ok(ClientSeverity::Debug5)
783        } else {
784            Err(VarParseError::ConstrainedParameter {
785                invalid_values: input.to_vec(),
786                valid_values: Some(ClientSeverity::valid_values()),
787            })
788        }
789    }
790
791    fn box_clone(&self) -> Box<dyn Value> {
792        Box::new(self.clone())
793    }
794
795    fn format(&self) -> String {
796        self.as_str().into()
797    }
798}
799
800/// List of valid time zones.
801///
802/// Names are following the tz database, but only time zones equivalent
803/// to UTC±00:00 are supported.
804#[derive(Clone, Copy, Debug, Eq, PartialEq)]
805pub enum TimeZone {
806    /// UTC
807    UTC,
808    /// GMT
809    GMT,
810    /// Fixed offset from UTC, currently only "+00:00" is supported.
811    /// A string representation is kept here for compatibility with Postgres.
812    FixedOffset(&'static str),
813}
814
815impl TimeZone {
816    fn as_str(&self) -> &'static str {
817        match self {
818            TimeZone::UTC => "UTC",
819            TimeZone::GMT => "GMT",
820            TimeZone::FixedOffset(s) => s,
821        }
822    }
823}
824
825impl Value for TimeZone {
826    fn type_name() -> Cow<'static, str>
827    where
828        Self: Sized,
829    {
830        // TODO(parkmycar): It seems like we should change this?
831        "string".into()
832    }
833
834    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
835    where
836        Self: Sized,
837    {
838        let s = extract_single_value(input)?;
839        let s = UncasedStr::new(s);
840
841        if s == TimeZone::UTC.as_str() {
842            Ok(TimeZone::UTC)
843        } else if s == TimeZone::GMT.as_str() {
844            Ok(TimeZone::GMT)
845        } else if s == "+00:00" {
846            Ok(TimeZone::FixedOffset("+00:00"))
847        } else {
848            Err(VarParseError::ConstrainedParameter {
849                invalid_values: input.to_vec(),
850                valid_values: None,
851            })
852        }
853    }
854
855    fn box_clone(&self) -> Box<dyn Value> {
856        Box::new(self.clone())
857    }
858
859    fn format(&self) -> String {
860        self.as_str().into()
861    }
862}
863
864/// List of valid isolation levels.
865#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
866pub enum IsolationLevel {
867    ReadUncommitted,
868    ReadCommitted,
869    RepeatableRead,
870    Serializable,
871    /* TODO(jkosh44) Move this comment to user facing docs when this isolation level becomes available to users.
872     * The Strong Session Serializable isolation level combines the Serializable isolation level
873     * (https://jepsen.io/consistency/models/serializable) with the Sequential consistency model
874     * (https://jepsen.io/consistency/models/sequential). See
875     * http://dbmsmusings.blogspot.com/2019/06/correctness-anomalies-under.html and
876     * https://cs.uwaterloo.ca/~kmsalem/pubs/DaudjeeICDE04.pdf. Operations within a single session
877     * are linearizable, but operations across sessions are not linearizable.
878     *
879     * Operations in sessions that use Strong Session Serializable are not linearizable with
880     * operations in sessions that use Strict Serializable. For example, consider the following
881     * sequence of events in order:
882     *
883     *   1. Session s0 executes read at timestamp t0 under Strong Session Serializable.
884     *   2. Session s1 executes read at timestamp t1 under Strict Serializable.
885     *
886     * If t0 > t1, then this is not considered a consistency violation. This matches with the
887     * semantics of Serializable, which can execute queries arbitrarily in the future without
888     * violating the consistency of Strict Serializable queries.
889     *
890     * All operations within a session that use Strong Session Serializable are only
891     * linearizable within operations within the same session that also use Strong Session
892     * Serializable. For example, consider the following sequence of events in order:
893     *
894     *   1. Session s0 executes read at timestamp t0 under Strong Session Serializable.
895     *   2. Session s0 executes read at timestamp t1 under I.
896     *
897     * If I is Strong Session Serializable then t0 > t1 is guaranteed. If I is any other isolation
898     * level then t0 < t1 is not considered a consistency violation. This matches the semantics of
899     * Serializable, which can execute queries arbitrarily in the future without violating the
900     * consistency of Strict Serializable queries within the same session.
901     *
902     * The items left TODO before this is considered ready for prod are:
903     *
904     * - Add more tests.
905     * - Linearize writes to system tables under this isolation (most of these are the side effect
906     *   of some DDL).
907     */
908    StrongSessionSerializable,
909    StrictSerializable,
910    /// Bounded staleness — pick `T` such that `T >= now_ms - D` and the
911    /// query does not wait on input frontiers. Errors if no such `T` exists
912    /// in the no-wait window. See
913    /// `doc/developer/design/20260429_bounded_staleness_isolation.md`.
914    BoundedStaleness(std::time::Duration),
915}
916
917impl IsolationLevel {
918    const READ_UNCOMMITTED: &'static str = "read uncommitted";
919    const READ_COMMITTED: &'static str = "read committed";
920    const REPEATABLE_READ: &'static str = "repeatable read";
921    const SERIALIZABLE: &'static str = "serializable";
922    const STRONG_SESSION_SERIALIZABLE: &'static str = "strong session serializable";
923    const STRICT_SERIALIZABLE: &'static str = "strict serializable";
924    const BOUNDED_STALENESS: &'static str = "bounded staleness";
925    const BOUNDED_STALENESS_HINT: &'static str = "bounded staleness <duration>";
926
927    /// Returns true if the isolation level is of bounded staleness.
928    pub fn is_bounded_staleness(&self) -> bool {
929        matches!(self, Self::BoundedStaleness(_))
930    }
931
932    /// Unit-cardinality variant identifier, suitable for Prometheus labels and
933    /// equality checks against parsed input. The duration on `BoundedStaleness`
934    /// is omitted; use [`fmt::Display`] for the user-facing rendering.
935    pub fn as_variant_str(&self) -> &'static str {
936        match self {
937            Self::ReadUncommitted => Self::READ_UNCOMMITTED,
938            Self::ReadCommitted => Self::READ_COMMITTED,
939            Self::RepeatableRead => Self::REPEATABLE_READ,
940            Self::Serializable => Self::SERIALIZABLE,
941            Self::StrongSessionSerializable => Self::STRONG_SESSION_SERIALIZABLE,
942            Self::StrictSerializable => Self::STRICT_SERIALIZABLE,
943            Self::BoundedStaleness(_) => Self::BOUNDED_STALENESS,
944        }
945    }
946
947    fn valid_values() -> Vec<&'static str> {
948        vec![
949            Self::READ_UNCOMMITTED,
950            Self::READ_COMMITTED,
951            Self::REPEATABLE_READ,
952            Self::SERIALIZABLE,
953            // TODO(jkosh44) Add STRONG_SESSION_SERIALIZABLE when it becomes available to users.
954            Self::STRICT_SERIALIZABLE,
955            Self::BOUNDED_STALENESS_HINT,
956        ]
957    }
958}
959
960impl fmt::Display for IsolationLevel {
961    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
962        match self {
963            Self::BoundedStaleness(d) => write!(
964                f,
965                "{} {}",
966                Self::BOUNDED_STALENESS,
967                humantime::format_duration(*d)
968            ),
969            other => f.write_str(other.as_variant_str()),
970        }
971    }
972}
973
974impl Value for IsolationLevel {
975    fn type_name() -> Cow<'static, str>
976    where
977        Self: Sized,
978    {
979        "string".into()
980    }
981
982    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
983    where
984        Self: Sized,
985    {
986        let invalid = || VarParseError::ConstrainedParameter {
987            invalid_values: input.to_vec(),
988            valid_values: Some(IsolationLevel::valid_values()),
989        };
990
991        let s = extract_single_value(input)?;
992        let lower = s.to_ascii_lowercase();
993        let lower = lower.trim();
994
995        match lower {
996            // Weak isolations all upgrade to Serializable.
997            Self::READ_UNCOMMITTED
998            | Self::READ_COMMITTED
999            | Self::REPEATABLE_READ
1000            | Self::SERIALIZABLE => Ok(Self::Serializable),
1001            Self::STRONG_SESSION_SERIALIZABLE => Ok(Self::StrongSessionSerializable),
1002            Self::STRICT_SERIALIZABLE => Ok(Self::StrictSerializable),
1003            other => {
1004                let rest = other
1005                    .strip_prefix(Self::BOUNDED_STALENESS)
1006                    .ok_or_else(invalid)?;
1007                // Require whitespace between the keyword and the duration, so
1008                // e.g. `bounded staleness5s` is rejected.
1009                if !rest.starts_with(char::is_whitespace) {
1010                    return Err(invalid());
1011                }
1012                let rest = rest.trim();
1013                if rest.is_empty() {
1014                    return Err(invalid());
1015                }
1016                let d = humantime::parse_duration(rest).map_err(|_| invalid())?;
1017                // Reject anything below 1ms: downstream we truncate to whole
1018                // milliseconds, so e.g. `999us` would silently behave like
1019                // `0ms` and degenerate to "no staleness allowed", which is
1020                // exactly the case the zero-rejection meant to forbid.
1021                if d < std::time::Duration::from_millis(1) {
1022                    return Err(invalid());
1023                }
1024                Ok(Self::BoundedStaleness(d))
1025            }
1026        }
1027    }
1028
1029    fn box_clone(&self) -> Box<dyn Value> {
1030        Box::new(self.clone())
1031    }
1032
1033    fn format(&self) -> String {
1034        self.to_string()
1035    }
1036}
1037
1038impl From<TransactionIsolationLevel> for IsolationLevel {
1039    fn from(transaction_isolation_level: TransactionIsolationLevel) -> Self {
1040        match transaction_isolation_level {
1041            TransactionIsolationLevel::ReadUncommitted => Self::ReadUncommitted,
1042            TransactionIsolationLevel::ReadCommitted => Self::ReadCommitted,
1043            TransactionIsolationLevel::RepeatableRead => Self::RepeatableRead,
1044            TransactionIsolationLevel::Serializable => Self::Serializable,
1045            TransactionIsolationLevel::StrongSessionSerializable => Self::StrongSessionSerializable,
1046            TransactionIsolationLevel::StrictSerializable => Self::StrictSerializable,
1047        }
1048    }
1049}
1050
1051impl Value for CloneableEnvFilter {
1052    fn type_name() -> Cow<'static, str>
1053    where
1054        Self: Sized,
1055    {
1056        "EnvFilter".into()
1057    }
1058
1059    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1060    where
1061        Self: Sized,
1062    {
1063        let s = extract_single_value(input)?;
1064        CloneableEnvFilter::from_str(s).map_err(|e| VarParseError::InvalidParameterValue {
1065            invalid_values: vec![s.to_string()],
1066            reason: e.to_string(),
1067        })
1068    }
1069
1070    fn box_clone(&self) -> Box<dyn Value> {
1071        Box::new(self.clone())
1072    }
1073
1074    fn format(&self) -> String {
1075        self.to_string()
1076    }
1077}
1078
1079#[derive(Clone, Copy, PartialEq, Eq, Debug)]
1080pub enum ClientEncoding {
1081    Utf8,
1082}
1083
1084impl ClientEncoding {
1085    fn as_str(&self) -> &'static str {
1086        match self {
1087            ClientEncoding::Utf8 => "UTF8",
1088        }
1089    }
1090
1091    fn valid_values() -> Vec<&'static str> {
1092        vec![ClientEncoding::Utf8.as_str()]
1093    }
1094}
1095
1096impl Value for ClientEncoding {
1097    fn type_name() -> Cow<'static, str>
1098    where
1099        Self: Sized,
1100    {
1101        "string".into()
1102    }
1103
1104    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1105    where
1106        Self: Sized,
1107    {
1108        let s = extract_single_value(input)?;
1109        let s = UncasedStr::new(s);
1110        if s == Self::Utf8.as_str() {
1111            Ok(Self::Utf8)
1112        } else {
1113            Err(VarParseError::ConstrainedParameter {
1114                invalid_values: vec![s.to_string()],
1115                valid_values: Some(ClientEncoding::valid_values()),
1116            })
1117        }
1118    }
1119
1120    fn box_clone(&self) -> Box<dyn Value> {
1121        Box::new(self.clone())
1122    }
1123
1124    fn format(&self) -> String {
1125        self.as_str().to_string()
1126    }
1127}
1128
1129#[derive(Clone, Copy, PartialEq, Eq, Debug)]
1130pub enum IntervalStyle {
1131    Postgres,
1132}
1133
1134impl IntervalStyle {
1135    fn as_str(&self) -> &'static str {
1136        match self {
1137            IntervalStyle::Postgres => "postgres",
1138        }
1139    }
1140
1141    fn valid_values() -> Vec<&'static str> {
1142        vec![IntervalStyle::Postgres.as_str()]
1143    }
1144}
1145
1146impl Value for IntervalStyle {
1147    fn type_name() -> Cow<'static, str>
1148    where
1149        Self: Sized,
1150    {
1151        "string".into()
1152    }
1153
1154    fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1155    where
1156        Self: Sized,
1157    {
1158        let s = extract_single_value(input)?;
1159        let s = UncasedStr::new(s);
1160        if s == Self::Postgres.as_str() {
1161            Ok(Self::Postgres)
1162        } else {
1163            Err(VarParseError::ConstrainedParameter {
1164                invalid_values: vec![s.to_string()],
1165                valid_values: Some(IntervalStyle::valid_values()),
1166            })
1167        }
1168    }
1169
1170    fn box_clone(&self) -> Box<dyn Value> {
1171        Box::new(self.clone())
1172    }
1173
1174    fn format(&self) -> String {
1175        self.as_str().to_string()
1176    }
1177}
1178
1179/// Macro to implement [`Value`] for simpler types, i.e. ones that already implement `FromStr` and
1180/// `ToString`.
1181///
1182/// Note: Macros can be hot garbage, if at any point folks think this is too complicated please
1183/// feel free to refactor!
1184macro_rules! impl_value_for_simple {
1185    ($t: ty, $name: literal) => {
1186        impl Value for $t {
1187            fn type_name() -> Cow<'static, str>
1188            where
1189                Self: Sized,
1190            {
1191                $name.into()
1192            }
1193
1194            fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1195            where
1196                Self: Sized,
1197            {
1198                let s = extract_single_value(input)?;
1199                s.parse::<Self>()
1200                    .map_err(|_| VarParseError::InvalidParameterType)
1201            }
1202
1203            fn box_clone(&self) -> Box<dyn Value> {
1204                Box::new(self.clone())
1205            }
1206
1207            fn format(&self) -> String {
1208                self.to_string()
1209            }
1210        }
1211    };
1212}
1213
1214impl_value_for_simple!(i32, "integer");
1215impl_value_for_simple!(u32, "unsigned integer");
1216impl_value_for_simple!(u64, "64-bit unsigned integer");
1217impl_value_for_simple!(usize, "unsigned integer");
1218impl_value_for_simple!(f64, "double-precision floating-point number");
1219
1220impl_value_for_simple!(NonZeroU32, "unsigned integer");
1221
1222impl_value_for_simple!(mz_repr::Timestamp, "mz-timestamp");
1223impl_value_for_simple!(mz_repr::bytes::ByteSize, "bytes");
1224impl_value_for_simple!(CompactionStyle, "rocksdb_compaction_style");
1225impl_value_for_simple!(CompressionType, "rocksdb_compression_type");
1226
1227#[cfg(test)]
1228mod tests {
1229    use mz_ore::assert_err;
1230
1231    use super::*;
1232
1233    #[mz_ore::test]
1234    fn test_value_duration() {
1235        fn inner(t: &'static str, e: Duration, expected_format: Option<&'static str>) {
1236            let d = Duration::parse(VarInput::Flat(t)).expect("invalid duration");
1237            assert_eq!(d, e);
1238            let mut d_format = d.format();
1239            d_format.retain(|c| !c.is_whitespace());
1240            if let Some(expected) = expected_format {
1241                assert_eq!(d_format, expected);
1242            } else {
1243                assert_eq!(
1244                    t.chars().filter(|c| !c.is_whitespace()).collect::<String>(),
1245                    d_format
1246                )
1247            }
1248        }
1249        inner("1", Duration::from_millis(1), Some("1ms"));
1250        inner("0", Duration::from_secs(0), Some("0s"));
1251        inner("1ms", Duration::from_millis(1), None);
1252        inner("1000ms", Duration::from_millis(1000), Some("1s"));
1253        inner("1001ms", Duration::from_millis(1001), None);
1254        inner("1us", Duration::from_micros(1), None);
1255        inner("1000us", Duration::from_micros(1000), Some("1ms"));
1256        inner("1s", Duration::from_secs(1), None);
1257        inner("60s", Duration::from_secs(60), Some("1min"));
1258        inner("3600s", Duration::from_secs(3600), Some("1h"));
1259        inner("3660s", Duration::from_secs(3660), Some("61min"));
1260        inner("1min", Duration::from_secs(1 * SEC_TO_MIN), None);
1261        inner("60min", Duration::from_secs(60 * SEC_TO_MIN), Some("1h"));
1262        inner("1h", Duration::from_secs(1 * SEC_TO_HOUR), None);
1263        inner("24h", Duration::from_secs(24 * SEC_TO_HOUR), Some("1d"));
1264        inner("1d", Duration::from_secs(1 * SEC_TO_DAY), None);
1265        inner("2d", Duration::from_secs(2 * SEC_TO_DAY), None);
1266        inner("  1   s ", Duration::from_secs(1), None);
1267        inner("1s ", Duration::from_secs(1), None);
1268        inner("   1s", Duration::from_secs(1), None);
1269        inner("0d", Duration::from_secs(0), Some("0s"));
1270        inner(
1271            "18446744073709551615",
1272            Duration::from_millis(u64::MAX),
1273            Some("18446744073709551615ms"),
1274        );
1275        inner(
1276            "18446744073709551615 s",
1277            Duration::from_secs(u64::MAX),
1278            Some("0"),
1279        );
1280
1281        fn errs(t: &'static str) {
1282            assert_err!(Duration::parse(VarInput::Flat(t)));
1283        }
1284        errs("1 m");
1285        errs("1 sec");
1286        errs("1 min 1 s");
1287        errs("1m1s");
1288        errs("1.1");
1289        errs("1.1 min");
1290        errs("-1 s");
1291        errs("");
1292        errs("   ");
1293        errs("x");
1294        errs("s");
1295        errs("18446744073709551615 min");
1296        // Unicode numerics such as '²' (U+00B2) are multi-byte and must not be
1297        // treated as parseable digits: their char index differs from their byte
1298        // offset, so slicing on them would land mid-character and panic.
1299        errs("²");
1300        errs("1²ms");
1301        errs("½");
1302        errs("1ms");
1303    }
1304
1305    #[mz_ore::test]
1306    fn test_should_output_to_client() {
1307        #[rustfmt::skip]
1308        let test_cases = [
1309            (ClientSeverity::Debug1, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1310            (ClientSeverity::Debug2, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1311            (ClientSeverity::Debug3, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1312            (ClientSeverity::Debug4, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1313            (ClientSeverity::Debug5, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1314            (ClientSeverity::Log, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1315            (ClientSeverity::Log, vec![Severity::Debug], false),
1316            (ClientSeverity::Info, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1317            (ClientSeverity::Info, vec![Severity::Debug, Severity::Log], false),
1318            (ClientSeverity::Notice, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1319            (ClientSeverity::Notice, vec![Severity::Debug, Severity::Log], false),
1320            (ClientSeverity::Warning, vec![Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1321            (ClientSeverity::Warning, vec![Severity::Debug, Severity::Log, Severity::Notice], false),
1322            (ClientSeverity::Error, vec![Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
1323            (ClientSeverity::Error, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning], false),
1324        ];
1325
1326        for test_case in test_cases {
1327            run_test(test_case)
1328        }
1329
1330        fn run_test(test_case: (ClientSeverity, Vec<Severity>, bool)) {
1331            let client_min_messages_setting = test_case.0;
1332            let expected = test_case.2;
1333            for message_severity in test_case.1 {
1334                assert!(
1335                    client_min_messages_setting.should_output_to_client(&message_severity)
1336                        == expected
1337                )
1338            }
1339        }
1340    }
1341}
1342
1343#[cfg(test)]
1344mod bounded_staleness_tests {
1345    use super::*;
1346    use std::time::Duration;
1347
1348    fn parse_iso(s: &str) -> Result<IsolationLevel, VarParseError> {
1349        IsolationLevel::parse(VarInput::Flat(s))
1350    }
1351
1352    #[mz_ore::test]
1353    fn parses_bounded_staleness() {
1354        assert_eq!(
1355            parse_iso("bounded staleness 5s").unwrap(),
1356            IsolationLevel::BoundedStaleness(Duration::from_secs(5))
1357        );
1358        assert_eq!(
1359            parse_iso("bounded staleness 500ms").unwrap(),
1360            IsolationLevel::BoundedStaleness(Duration::from_millis(500))
1361        );
1362        // Upper-case input normalises.
1363        assert_eq!(
1364            parse_iso("BOUNDED STALENESS 5s").unwrap(),
1365            IsolationLevel::BoundedStaleness(Duration::from_secs(5))
1366        );
1367        // Leading/trailing whitespace tolerated.
1368        assert_eq!(
1369            parse_iso("  bounded staleness 5s  ").unwrap(),
1370            IsolationLevel::BoundedStaleness(Duration::from_secs(5))
1371        );
1372    }
1373
1374    #[mz_ore::test]
1375    fn rejects_zero_duration() {
1376        assert!(parse_iso("bounded staleness 0s").is_err());
1377        assert!(parse_iso("bounded staleness 0ms").is_err());
1378    }
1379
1380    #[mz_ore::test]
1381    fn rejects_sub_millisecond_duration() {
1382        // Truncated to 0ms downstream; treated as the same degenerate case as
1383        // a literal zero.
1384        assert!(parse_iso("bounded staleness 1us").is_err());
1385        assert!(parse_iso("bounded staleness 999us").is_err());
1386        assert!(parse_iso("bounded staleness 999ns").is_err());
1387    }
1388
1389    #[mz_ore::test]
1390    fn parses_multi_component_duration() {
1391        // `1m30s` round-trips through humantime which formats as `1m 30s`,
1392        // so the parser must accept the space-separated form.
1393        let lvl = parse_iso("bounded staleness 1m30s").unwrap();
1394        assert_eq!(
1395            lvl,
1396            IsolationLevel::BoundedStaleness(Duration::from_secs(90))
1397        );
1398        let formatted = lvl.format();
1399        assert_eq!(parse_iso(&formatted).unwrap(), lvl);
1400    }
1401
1402    #[mz_ore::test]
1403    fn rejects_unparseable_duration() {
1404        assert!(parse_iso("bounded staleness banana").is_err());
1405        assert!(parse_iso("bounded staleness").is_err());
1406        // Whitespace between the keyword and the duration is required.
1407        assert!(parse_iso("bounded staleness5s").is_err());
1408    }
1409
1410    #[mz_ore::test]
1411    fn round_trips_format() {
1412        let lvl = IsolationLevel::BoundedStaleness(Duration::from_secs(5));
1413        assert_eq!(lvl.format(), "bounded staleness 5s");
1414        let parsed = parse_iso(&lvl.format()).unwrap();
1415        assert_eq!(parsed, lvl);
1416    }
1417
1418    #[mz_ore::test]
1419    fn accepts_long_durations() {
1420        // No upper cap; a multi-hour bound is a perfectly valid (if loose)
1421        // staleness contract.
1422        assert!(parse_iso("bounded staleness 1h").is_ok());
1423        assert!(parse_iso("bounded staleness 24h").is_ok());
1424    }
1425}