1use std::any::Any;
11use std::borrow::Cow;
12use std::fmt::{self, Debug};
13use std::num::NonZeroU32;
14use std::str::FromStr;
15use std::time::Duration;
16
17use chrono::{DateTime, Utc};
18use ipnet::IpNet;
19use itertools::Itertools;
20use mz_pgwire_common::Severity;
21use mz_repr::adt::numeric::Numeric;
22use mz_repr::adt::timestamp::CheckedTimestamp;
23use mz_repr::strconv;
24use mz_rocksdb_types::config::{CompactionStyle, CompressionType};
25use mz_sql_parser::ast::{Ident, TransactionIsolationLevel};
26use mz_tracing::{CloneableEnvFilter, SerializableDirective};
27use serde::{Deserialize, Serialize};
28use uncased::UncasedStr;
29
30use super::VarInput;
31use super::errors::VarParseError;
32
33pub trait Value: Any + AsAny + Debug + Send + Sync {
39 fn type_name() -> Cow<'static, str>
40 where
41 Self: Sized;
42
43 fn parse(input: VarInput) -> Result<Self, VarParseError>
44 where
45 Self: Sized;
46
47 fn format(&self) -> String;
48
49 fn box_clone(&self) -> Box<dyn Value>;
50
51 fn parse_dyn_value(input: VarInput) -> Result<Box<dyn Value>, VarParseError>
53 where
54 Self: Sized,
55 {
56 Self::parse(input).map(|val| {
57 let dyn_val: Box<dyn Value> = Box::new(val);
58 dyn_val
59 })
60 }
61}
62
63impl PartialEq for Box<dyn Value> {
66 fn eq(&self, other: &Box<dyn Value>) -> bool {
67 self.format().eq(&other.format())
68 }
69}
70impl Eq for Box<dyn Value> {}
71
72impl<'a> PartialEq for &'a dyn Value {
73 fn eq(&self, other: &&dyn Value) -> bool {
74 self.format().eq(&other.format())
75 }
76}
77impl<'a> Eq for &'a dyn Value {}
78
/// Helper trait exposing a value as `&dyn Any` so it can be downcast later.
pub trait AsAny {
    /// Returns `self` viewed as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any;
}

/// Blanket implementation: every type that is `Any` gets `as_any`.
impl<T> AsAny for T
where
    T: Any,
{
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}
96
97fn extract_single_value(input: VarInput<'_>) -> Result<&str, VarParseError> {
99 match input {
100 VarInput::Flat(value) => Ok(value),
101 VarInput::SqlSet([value]) => Ok(value),
102 VarInput::SqlSet(values) => Err(VarParseError::InvalidParameterValue {
103 invalid_values: values.to_vec(),
104 reason: "expects a single value".into(),
105 }),
106 }
107}
108
109impl<V: Value + Clone> Value for Option<V> {
110 fn type_name() -> Cow<'static, str>
111 where
112 Self: Sized,
113 {
114 format!("optional {}", V::type_name()).into()
115 }
116
117 fn parse(input: VarInput) -> Result<Self, VarParseError>
118 where
119 Self: Sized,
120 {
121 let s = extract_single_value(input)?;
122 match s {
123 "" => Ok(None),
124 _ => <V as Value>::parse(VarInput::Flat(s)).map(Some),
125 }
126 }
127
128 fn box_clone(&self) -> Box<dyn Value> {
129 Box::new(self.clone())
130 }
131
132 fn format(&self) -> String {
133 match self {
134 Some(s) => s.format(),
135 None => "".to_string(),
136 }
137 }
138}
139
140impl Value for String {
141 fn type_name() -> Cow<'static, str>
142 where
143 Self: Sized,
144 {
145 "string".into()
146 }
147
148 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
149 where
150 Self: Sized,
151 {
152 let s = extract_single_value(input)?;
153 Ok(s.to_string())
154 }
155
156 fn box_clone(&self) -> Box<dyn Value> {
157 Box::new(self.clone())
158 }
159
160 fn format(&self) -> String {
161 self.to_string()
162 }
163}
164
165impl Value for Cow<'static, str> {
166 fn type_name() -> Cow<'static, str>
167 where
168 Self: Sized,
169 {
170 "string".into()
171 }
172
173 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
174 where
175 Self: Sized,
176 {
177 let s = extract_single_value(input)?;
178 Ok(s.to_string().into())
179 }
180
181 fn box_clone(&self) -> Box<dyn Value> {
182 Box::new(self.clone())
183 }
184
185 fn format(&self) -> String {
186 self.to_string()
187 }
188}
189
190impl Value for bool {
191 fn type_name() -> Cow<'static, str>
192 where
193 Self: Sized,
194 {
195 "boolean".into()
196 }
197
198 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
199 where
200 Self: Sized,
201 {
202 let s = extract_single_value(input)?;
203 match s {
204 "t" | "true" | "on" => Ok(true),
205 "f" | "false" | "off" => Ok(false),
206 _ => Err(VarParseError::InvalidParameterType),
207 }
208 }
209
210 fn box_clone(&self) -> Box<dyn Value> {
211 Box::new(self.clone())
212 }
213
214 fn format(&self) -> String {
215 match self {
216 true => "on".into(),
217 false => "off".into(),
218 }
219 }
220}
221
222impl Value for Option<CheckedTimestamp<DateTime<Utc>>> {
223 fn type_name() -> Cow<'static, str>
224 where
225 Self: Sized,
226 {
227 "timestamptz".into()
228 }
229
230 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
231 where
232 Self: Sized,
233 {
234 let s = extract_single_value(input)?;
235 strconv::parse_timestamptz(s)
236 .map_err(|_| VarParseError::InvalidParameterType)
237 .map(Some)
238 }
239
240 fn box_clone(&self) -> Box<dyn Value> {
241 Box::new(self.clone())
242 }
243
244 fn format(&self) -> String {
245 self.map(|t| t.to_string()).unwrap_or_default()
246 }
247}
248
249impl Value for Numeric {
250 fn type_name() -> Cow<'static, str>
251 where
252 Self: Sized,
253 {
254 "numeric".into()
255 }
256
257 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
258 where
259 Self: Sized,
260 {
261 let s = extract_single_value(input)?;
262 s.parse::<Numeric>()
263 .map_err(|_| VarParseError::InvalidParameterType)
264 }
265
266 fn box_clone(&self) -> Box<dyn Value> {
267 Box::new(self.clone())
268 }
269
270 fn format(&self) -> String {
271 self.to_standard_notation_string()
272 }
273}
274
275const SEC_TO_MIN: u64 = 60u64;
276const SEC_TO_HOUR: u64 = 60u64 * 60;
277const SEC_TO_DAY: u64 = 60u64 * 60 * 24;
278const MICRO_TO_MILLI: u32 = 1000u32;
279
280impl Value for Duration {
281 fn type_name() -> Cow<'static, str>
282 where
283 Self: Sized,
284 {
285 "duration".into()
286 }
287
288 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
289 where
290 Self: Sized,
291 {
292 let s = extract_single_value(input)?;
293 let s = s.trim();
294 let split_pos = s.find(|p: char| !p.is_ascii_digit()).unwrap_or(s.len());
300
301 let d = s[..split_pos]
303 .parse::<u64>()
304 .map_err(|_| VarParseError::InvalidParameterType)?;
305
306 let (f, m): (fn(u64) -> Duration, u64) = match s[split_pos..].trim_start() {
308 "us" => (Duration::from_micros, 1),
309 "ms" | "" => (Duration::from_millis, 1),
311 "s" => (Duration::from_secs, 1),
312 "min" => (Duration::from_secs, SEC_TO_MIN),
313 "h" => (Duration::from_secs, SEC_TO_HOUR),
314 "d" => (Duration::from_secs, SEC_TO_DAY),
315 o => {
316 return Err(VarParseError::InvalidParameterValue {
317 invalid_values: vec![o.to_string()],
318 reason: format!("expected us, ms, s, min, h, or d but got {o:?}"),
319 });
320 }
321 };
322
323 let d = f(d
324 .checked_mul(m)
325 .ok_or_else(|| VarParseError::InvalidParameterValue {
326 invalid_values: vec![s.to_string()],
327 reason: "expected value to fit in u64".into(),
328 })?);
329 Ok(d)
330 }
331
332 fn box_clone(&self) -> Box<dyn Value> {
333 Box::new(self.clone())
334 }
335
336 fn format(&self) -> String {
341 let micros = self.subsec_micros();
342 if micros > 0 {
343 match micros {
344 ms if ms != 0 && ms % MICRO_TO_MILLI == 0 => {
345 format!(
346 "{} ms",
347 self.as_secs() * 1000 + u64::from(ms / MICRO_TO_MILLI)
348 )
349 }
350 us => format!("{} us", self.as_secs() * 1_000_000 + u64::from(us)),
351 }
352 } else {
353 match self.as_secs() {
354 zero if zero == u64::MAX => "0".to_string(),
355 d if d != 0 && d % SEC_TO_DAY == 0 => format!("{} d", d / SEC_TO_DAY),
356 h if h != 0 && h % SEC_TO_HOUR == 0 => format!("{} h", h / SEC_TO_HOUR),
357 m if m != 0 && m % SEC_TO_MIN == 0 => format!("{} min", m / SEC_TO_MIN),
358 s => format!("{} s", s),
359 }
360 }
361 }
362}
363
364impl Value for serde_json::Value {
365 fn type_name() -> Cow<'static, str>
366 where
367 Self: Sized,
368 {
369 "jsonb".into()
370 }
371
372 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
373 where
374 Self: Sized,
375 {
376 let s = extract_single_value(input)?;
377 serde_json::from_str(s).map_err(|_| VarParseError::InvalidParameterType)
378 }
379
380 fn box_clone(&self) -> Box<dyn Value> {
381 Box::new(self.clone())
382 }
383
384 fn format(&self) -> String {
385 self.to_string()
386 }
387}
388
389#[derive(Debug, Clone, Eq, PartialEq)]
392pub struct DateStyle(pub [&'static str; 2]);
393
394pub static DEFAULT_DATE_STYLE: DateStyle = DateStyle(["ISO", "MDY"]);
395
396impl Value for DateStyle {
397 fn type_name() -> Cow<'static, str>
398 where
399 Self: Sized,
400 {
401 "string list".into()
402 }
403
404 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
406 where
407 Self: Sized,
408 {
409 let input = match input {
410 VarInput::Flat(v) => mz_sql_parser::parser::split_identifier_string(v)
411 .map_err(|_| VarParseError::InvalidParameterType)?,
412 VarInput::SqlSet(values) => {
415 let mut out = vec![];
416 for v in values {
417 let idents = mz_sql_parser::parser::split_identifier_string(v)
418 .map_err(|_| VarParseError::InvalidParameterType)?;
419 out.extend(idents)
420 }
421 out
422 }
423 };
424
425 for input in input {
426 if !DEFAULT_DATE_STYLE
427 .0
428 .iter()
429 .any(|valid| UncasedStr::new(valid) == &input)
430 {
431 return Err(VarParseError::FixedValueParameter);
432 }
433 }
434
435 Ok(DEFAULT_DATE_STYLE.clone())
436 }
437
438 fn box_clone(&self) -> Box<dyn Value> {
439 Box::new(self.clone())
440 }
441
442 fn format(&self) -> String {
443 self.0.join(", ")
444 }
445}
446
447impl Value for Vec<Ident> {
448 fn type_name() -> Cow<'static, str>
449 where
450 Self: Sized,
451 {
452 "identifier list".into()
453 }
454
455 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
456 where
457 Self: Sized,
458 {
459 let holder;
460 let values = match input {
461 VarInput::Flat(value) => {
462 holder = mz_sql_parser::parser::split_identifier_string(value)
463 .map_err(|_| VarParseError::InvalidParameterType)?;
464 &holder
465 }
466 VarInput::SqlSet(values) => values,
469 };
470 let values = values
471 .iter()
472 .map(Ident::new)
473 .collect::<Result<_, _>>()
474 .map_err(|e| VarParseError::InvalidParameterValue {
475 invalid_values: values.to_vec(),
476 reason: e.to_string(),
477 })?;
478 Ok(values)
479 }
480
481 fn box_clone(&self) -> Box<dyn Value> {
482 Box::new(self.clone())
483 }
484
485 fn format(&self) -> String {
486 self.iter().map(|ident| ident.to_string()).join(", ")
487 }
488}
489
490impl Value for Vec<IpNet> {
491 fn type_name() -> Cow<'static, str>
492 where
493 Self: Sized,
494 {
495 "CIDR list".into()
496 }
497
498 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
499 where
500 Self: Sized,
501 {
502 let values = input.to_vec();
503 let values: Vec<IpNet> = values
504 .iter()
505 .flat_map(|i| i.split(','))
506 .map(|d| IpNet::from_str(d.trim()))
507 .collect::<Result<_, _>>()
508 .map_err(|e| VarParseError::InvalidParameterValue {
509 invalid_values: values,
510 reason: e.to_string(),
511 })?;
512 Ok(values)
513 }
514
515 fn box_clone(&self) -> Box<dyn Value> {
516 Box::new(self.clone())
517 }
518
519 fn format(&self) -> String {
520 self.iter().map(|ident| ident.to_string()).join(", ")
521 }
522}
523
524impl Value for Vec<SerializableDirective> {
525 fn type_name() -> Cow<'static, str>
526 where
527 Self: Sized,
528 {
529 "directive list".into()
530 }
531
532 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
533 where
534 Self: Sized,
535 {
536 let values = input.to_vec();
537 let dirs: Result<_, _> = values
538 .iter()
539 .flat_map(|i| i.split(','))
540 .map(|d| SerializableDirective::from_str(d.trim()))
541 .collect();
542 dirs.map_err(|e| VarParseError::InvalidParameterValue {
543 invalid_values: values.to_vec(),
544 reason: e.to_string(),
545 })
546 }
547
548 fn box_clone(&self) -> Box<dyn Value> {
549 Box::new(self.clone())
550 }
551
552 fn format(&self) -> String {
553 self.iter().map(|d| d.to_string()).join(", ")
554 }
555}
556
557#[derive(Clone, Debug, Eq, PartialEq)]
559pub struct Failpoints;
560
561impl Value for Failpoints {
562 fn type_name() -> Cow<'static, str>
563 where
564 Self: Sized,
565 {
566 "failpoints config".into()
567 }
568
569 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
570 where
571 Self: Sized,
572 {
573 let values = input.to_vec();
574 for mut cfg in values.iter().map(|v| v.trim().split(';')).flatten() {
575 cfg = cfg.trim();
576 if cfg.is_empty() {
577 continue;
578 }
579 let mut splits = cfg.splitn(2, '=');
580 let failpoint = splits
581 .next()
582 .ok_or_else(|| VarParseError::InvalidParameterValue {
583 invalid_values: input.to_vec(),
584 reason: "missing failpoint name".into(),
585 })?;
586 let action = splits
587 .next()
588 .ok_or_else(|| VarParseError::InvalidParameterValue {
589 invalid_values: input.to_vec(),
590 reason: "missing failpoint action".into(),
591 })?;
592 fail::cfg(failpoint, action).map_err(|e| VarParseError::InvalidParameterValue {
593 invalid_values: input.to_vec(),
594 reason: e.to_string(),
595 })?;
596 }
597
598 Ok(Failpoints)
599 }
600
601 fn box_clone(&self) -> Box<dyn Value> {
602 Box::new(self.clone())
603 }
604
605 fn format(&self) -> String {
606 "<omitted>".to_string()
607 }
608}
609
610#[derive(Clone, Copy, Debug, Eq, PartialEq)]
618pub enum ClientSeverity {
619 Error,
621 Warning,
623 Notice,
625 Log,
627 Debug1,
629 Debug2,
631 Debug3,
633 Debug4,
635 Debug5,
637 Info,
640}
641
642impl Serialize for ClientSeverity {
643 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
644 where
645 S: serde::Serializer,
646 {
647 serializer.serialize_str(self.as_str())
648 }
649}
650
651impl ClientSeverity {
652 fn as_str(&self) -> &'static str {
653 match self {
654 ClientSeverity::Error => "error",
655 ClientSeverity::Warning => "warning",
656 ClientSeverity::Notice => "notice",
657 ClientSeverity::Info => "info",
658 ClientSeverity::Log => "log",
659 ClientSeverity::Debug1 => "debug1",
660 ClientSeverity::Debug2 => "debug2",
661 ClientSeverity::Debug3 => "debug3",
662 ClientSeverity::Debug4 => "debug4",
663 ClientSeverity::Debug5 => "debug5",
664 }
665 }
666
667 fn valid_values() -> Vec<&'static str> {
668 vec![
670 ClientSeverity::Debug5.as_str(),
671 ClientSeverity::Debug4.as_str(),
672 ClientSeverity::Debug3.as_str(),
673 ClientSeverity::Debug2.as_str(),
674 ClientSeverity::Debug1.as_str(),
675 ClientSeverity::Log.as_str(),
676 ClientSeverity::Notice.as_str(),
677 ClientSeverity::Warning.as_str(),
678 ClientSeverity::Error.as_str(),
679 ]
680 }
681
682 pub fn should_output_to_client(&self, severity: &Severity) -> bool {
693 match (self, severity) {
694 (_, Severity::Info) => true,
696 (ClientSeverity::Error, Severity::Error | Severity::Fatal | Severity::Panic) => true,
697 (
698 ClientSeverity::Warning,
699 Severity::Error | Severity::Fatal | Severity::Panic | Severity::Warning,
700 ) => true,
701 (
702 ClientSeverity::Notice,
703 Severity::Error
704 | Severity::Fatal
705 | Severity::Panic
706 | Severity::Warning
707 | Severity::Notice,
708 ) => true,
709 (
710 ClientSeverity::Info,
711 Severity::Error
712 | Severity::Fatal
713 | Severity::Panic
714 | Severity::Warning
715 | Severity::Notice,
716 ) => true,
717 (
718 ClientSeverity::Log,
719 Severity::Error
720 | Severity::Fatal
721 | Severity::Panic
722 | Severity::Warning
723 | Severity::Notice
724 | Severity::Log,
725 ) => true,
726 (
727 ClientSeverity::Debug1
728 | ClientSeverity::Debug2
729 | ClientSeverity::Debug3
730 | ClientSeverity::Debug4
731 | ClientSeverity::Debug5,
732 _,
733 ) => true,
734
735 (
736 ClientSeverity::Error,
737 Severity::Warning | Severity::Notice | Severity::Log | Severity::Debug,
738 ) => false,
739 (ClientSeverity::Warning, Severity::Notice | Severity::Log | Severity::Debug) => false,
740 (ClientSeverity::Notice, Severity::Log | Severity::Debug) => false,
741 (ClientSeverity::Info, Severity::Log | Severity::Debug) => false,
742 (ClientSeverity::Log, Severity::Debug) => false,
743 }
744 }
745}
746
747impl Value for ClientSeverity {
748 fn type_name() -> Cow<'static, str>
749 where
750 Self: Sized,
751 {
752 "string".into()
753 }
754
755 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
756 where
757 Self: Sized,
758 {
759 let s = extract_single_value(input)?;
760 let s = UncasedStr::new(s);
761
762 if s == ClientSeverity::Error.as_str() {
763 Ok(ClientSeverity::Error)
764 } else if s == ClientSeverity::Warning.as_str() {
765 Ok(ClientSeverity::Warning)
766 } else if s == ClientSeverity::Notice.as_str() {
767 Ok(ClientSeverity::Notice)
768 } else if s == ClientSeverity::Info.as_str() {
769 Ok(ClientSeverity::Info)
770 } else if s == ClientSeverity::Log.as_str() {
771 Ok(ClientSeverity::Log)
772 } else if s == ClientSeverity::Debug1.as_str() {
773 Ok(ClientSeverity::Debug1)
774 } else if s == ClientSeverity::Debug2.as_str() || s == "debug" {
776 Ok(ClientSeverity::Debug2)
777 } else if s == ClientSeverity::Debug3.as_str() {
778 Ok(ClientSeverity::Debug3)
779 } else if s == ClientSeverity::Debug4.as_str() {
780 Ok(ClientSeverity::Debug4)
781 } else if s == ClientSeverity::Debug5.as_str() {
782 Ok(ClientSeverity::Debug5)
783 } else {
784 Err(VarParseError::ConstrainedParameter {
785 invalid_values: input.to_vec(),
786 valid_values: Some(ClientSeverity::valid_values()),
787 })
788 }
789 }
790
791 fn box_clone(&self) -> Box<dyn Value> {
792 Box::new(self.clone())
793 }
794
795 fn format(&self) -> String {
796 self.as_str().into()
797 }
798}
799
/// The time zones this module knows how to represent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimeZone {
    /// The zone named `UTC`.
    UTC,
    /// The zone named `GMT`.
    GMT,
    /// A fixed offset, kept in its textual form.
    FixedOffset(&'static str),
}

impl TimeZone {
    /// Returns the textual name of the zone; for a fixed offset this is the
    /// stored offset string.
    fn as_str(&self) -> &'static str {
        match *self {
            TimeZone::FixedOffset(offset) => offset,
            TimeZone::GMT => "GMT",
            TimeZone::UTC => "UTC",
        }
    }
}
824
825impl Value for TimeZone {
826 fn type_name() -> Cow<'static, str>
827 where
828 Self: Sized,
829 {
830 "string".into()
832 }
833
834 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
835 where
836 Self: Sized,
837 {
838 let s = extract_single_value(input)?;
839 let s = UncasedStr::new(s);
840
841 if s == TimeZone::UTC.as_str() {
842 Ok(TimeZone::UTC)
843 } else if s == TimeZone::GMT.as_str() {
844 Ok(TimeZone::GMT)
845 } else if s == "+00:00" {
846 Ok(TimeZone::FixedOffset("+00:00"))
847 } else {
848 Err(VarParseError::ConstrainedParameter {
849 invalid_values: input.to_vec(),
850 valid_values: None,
851 })
852 }
853 }
854
855 fn box_clone(&self) -> Box<dyn Value> {
856 Box::new(self.clone())
857 }
858
859 fn format(&self) -> String {
860 self.as_str().into()
861 }
862}
863
864#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
866pub enum IsolationLevel {
867 ReadUncommitted,
868 ReadCommitted,
869 RepeatableRead,
870 Serializable,
871 StrongSessionSerializable,
909 StrictSerializable,
910 BoundedStaleness(std::time::Duration),
915}
916
917impl IsolationLevel {
918 const READ_UNCOMMITTED: &'static str = "read uncommitted";
919 const READ_COMMITTED: &'static str = "read committed";
920 const REPEATABLE_READ: &'static str = "repeatable read";
921 const SERIALIZABLE: &'static str = "serializable";
922 const STRONG_SESSION_SERIALIZABLE: &'static str = "strong session serializable";
923 const STRICT_SERIALIZABLE: &'static str = "strict serializable";
924 const BOUNDED_STALENESS: &'static str = "bounded staleness";
925 const BOUNDED_STALENESS_HINT: &'static str = "bounded staleness <duration>";
926
927 pub fn is_bounded_staleness(&self) -> bool {
929 matches!(self, Self::BoundedStaleness(_))
930 }
931
932 pub fn as_variant_str(&self) -> &'static str {
936 match self {
937 Self::ReadUncommitted => Self::READ_UNCOMMITTED,
938 Self::ReadCommitted => Self::READ_COMMITTED,
939 Self::RepeatableRead => Self::REPEATABLE_READ,
940 Self::Serializable => Self::SERIALIZABLE,
941 Self::StrongSessionSerializable => Self::STRONG_SESSION_SERIALIZABLE,
942 Self::StrictSerializable => Self::STRICT_SERIALIZABLE,
943 Self::BoundedStaleness(_) => Self::BOUNDED_STALENESS,
944 }
945 }
946
947 fn valid_values() -> Vec<&'static str> {
948 vec![
949 Self::READ_UNCOMMITTED,
950 Self::READ_COMMITTED,
951 Self::REPEATABLE_READ,
952 Self::SERIALIZABLE,
953 Self::STRICT_SERIALIZABLE,
955 Self::BOUNDED_STALENESS_HINT,
956 ]
957 }
958}
959
960impl fmt::Display for IsolationLevel {
961 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
962 match self {
963 Self::BoundedStaleness(d) => write!(
964 f,
965 "{} {}",
966 Self::BOUNDED_STALENESS,
967 humantime::format_duration(*d)
968 ),
969 other => f.write_str(other.as_variant_str()),
970 }
971 }
972}
973
974impl Value for IsolationLevel {
975 fn type_name() -> Cow<'static, str>
976 where
977 Self: Sized,
978 {
979 "string".into()
980 }
981
982 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
983 where
984 Self: Sized,
985 {
986 let invalid = || VarParseError::ConstrainedParameter {
987 invalid_values: input.to_vec(),
988 valid_values: Some(IsolationLevel::valid_values()),
989 };
990
991 let s = extract_single_value(input)?;
992 let lower = s.to_ascii_lowercase();
993 let lower = lower.trim();
994
995 match lower {
996 Self::READ_UNCOMMITTED
998 | Self::READ_COMMITTED
999 | Self::REPEATABLE_READ
1000 | Self::SERIALIZABLE => Ok(Self::Serializable),
1001 Self::STRONG_SESSION_SERIALIZABLE => Ok(Self::StrongSessionSerializable),
1002 Self::STRICT_SERIALIZABLE => Ok(Self::StrictSerializable),
1003 other => {
1004 let rest = other
1005 .strip_prefix(Self::BOUNDED_STALENESS)
1006 .ok_or_else(invalid)?;
1007 if !rest.starts_with(char::is_whitespace) {
1010 return Err(invalid());
1011 }
1012 let rest = rest.trim();
1013 if rest.is_empty() {
1014 return Err(invalid());
1015 }
1016 let d = humantime::parse_duration(rest).map_err(|_| invalid())?;
1017 if d < std::time::Duration::from_millis(1) {
1022 return Err(invalid());
1023 }
1024 Ok(Self::BoundedStaleness(d))
1025 }
1026 }
1027 }
1028
1029 fn box_clone(&self) -> Box<dyn Value> {
1030 Box::new(self.clone())
1031 }
1032
1033 fn format(&self) -> String {
1034 self.to_string()
1035 }
1036}
1037
1038impl From<TransactionIsolationLevel> for IsolationLevel {
1039 fn from(transaction_isolation_level: TransactionIsolationLevel) -> Self {
1040 match transaction_isolation_level {
1041 TransactionIsolationLevel::ReadUncommitted => Self::ReadUncommitted,
1042 TransactionIsolationLevel::ReadCommitted => Self::ReadCommitted,
1043 TransactionIsolationLevel::RepeatableRead => Self::RepeatableRead,
1044 TransactionIsolationLevel::Serializable => Self::Serializable,
1045 TransactionIsolationLevel::StrongSessionSerializable => Self::StrongSessionSerializable,
1046 TransactionIsolationLevel::StrictSerializable => Self::StrictSerializable,
1047 }
1048 }
1049}
1050
1051impl Value for CloneableEnvFilter {
1052 fn type_name() -> Cow<'static, str>
1053 where
1054 Self: Sized,
1055 {
1056 "EnvFilter".into()
1057 }
1058
1059 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1060 where
1061 Self: Sized,
1062 {
1063 let s = extract_single_value(input)?;
1064 CloneableEnvFilter::from_str(s).map_err(|e| VarParseError::InvalidParameterValue {
1065 invalid_values: vec![s.to_string()],
1066 reason: e.to_string(),
1067 })
1068 }
1069
1070 fn box_clone(&self) -> Box<dyn Value> {
1071 Box::new(self.clone())
1072 }
1073
1074 fn format(&self) -> String {
1075 self.to_string()
1076 }
1077}
1078
/// Supported client encodings; only UTF-8 exists.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ClientEncoding {
    Utf8,
}

impl ClientEncoding {
    /// Returns the name of the encoding.
    fn as_str(&self) -> &'static str {
        let ClientEncoding::Utf8 = self;
        "UTF8"
    }

    /// Returns the names of all supported encodings.
    fn valid_values() -> Vec<&'static str> {
        [ClientEncoding::Utf8]
            .iter()
            .map(ClientEncoding::as_str)
            .collect()
    }
}
1095
1096impl Value for ClientEncoding {
1097 fn type_name() -> Cow<'static, str>
1098 where
1099 Self: Sized,
1100 {
1101 "string".into()
1102 }
1103
1104 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1105 where
1106 Self: Sized,
1107 {
1108 let s = extract_single_value(input)?;
1109 let s = UncasedStr::new(s);
1110 if s == Self::Utf8.as_str() {
1111 Ok(Self::Utf8)
1112 } else {
1113 Err(VarParseError::ConstrainedParameter {
1114 invalid_values: vec![s.to_string()],
1115 valid_values: Some(ClientEncoding::valid_values()),
1116 })
1117 }
1118 }
1119
1120 fn box_clone(&self) -> Box<dyn Value> {
1121 Box::new(self.clone())
1122 }
1123
1124 fn format(&self) -> String {
1125 self.as_str().to_string()
1126 }
1127}
1128
/// Supported interval output styles; only the Postgres style exists.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum IntervalStyle {
    Postgres,
}

impl IntervalStyle {
    /// Returns the name of the style.
    fn as_str(&self) -> &'static str {
        let IntervalStyle::Postgres = self;
        "postgres"
    }

    /// Returns the names of all supported styles.
    fn valid_values() -> Vec<&'static str> {
        [IntervalStyle::Postgres]
            .iter()
            .map(IntervalStyle::as_str)
            .collect()
    }
}
1145
1146impl Value for IntervalStyle {
1147 fn type_name() -> Cow<'static, str>
1148 where
1149 Self: Sized,
1150 {
1151 "string".into()
1152 }
1153
1154 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1155 where
1156 Self: Sized,
1157 {
1158 let s = extract_single_value(input)?;
1159 let s = UncasedStr::new(s);
1160 if s == Self::Postgres.as_str() {
1161 Ok(Self::Postgres)
1162 } else {
1163 Err(VarParseError::ConstrainedParameter {
1164 invalid_values: vec![s.to_string()],
1165 valid_values: Some(IntervalStyle::valid_values()),
1166 })
1167 }
1168 }
1169
1170 fn box_clone(&self) -> Box<dyn Value> {
1171 Box::new(self.clone())
1172 }
1173
1174 fn format(&self) -> String {
1175 self.as_str().to_string()
1176 }
1177}
1178
1179macro_rules! impl_value_for_simple {
1185 ($t: ty, $name: literal) => {
1186 impl Value for $t {
1187 fn type_name() -> Cow<'static, str>
1188 where
1189 Self: Sized,
1190 {
1191 $name.into()
1192 }
1193
1194 fn parse(input: VarInput<'_>) -> Result<Self, VarParseError>
1195 where
1196 Self: Sized,
1197 {
1198 let s = extract_single_value(input)?;
1199 s.parse::<Self>()
1200 .map_err(|_| VarParseError::InvalidParameterType)
1201 }
1202
1203 fn box_clone(&self) -> Box<dyn Value> {
1204 Box::new(self.clone())
1205 }
1206
1207 fn format(&self) -> String {
1208 self.to_string()
1209 }
1210 }
1211 };
1212}
1213
1214impl_value_for_simple!(i32, "integer");
1215impl_value_for_simple!(u32, "unsigned integer");
1216impl_value_for_simple!(u64, "64-bit unsigned integer");
1217impl_value_for_simple!(usize, "unsigned integer");
1218impl_value_for_simple!(f64, "double-precision floating-point number");
1219
1220impl_value_for_simple!(NonZeroU32, "unsigned integer");
1221
1222impl_value_for_simple!(mz_repr::Timestamp, "mz-timestamp");
1223impl_value_for_simple!(mz_repr::bytes::ByteSize, "bytes");
1224impl_value_for_simple!(CompactionStyle, "rocksdb_compaction_style");
1225impl_value_for_simple!(CompressionType, "rocksdb_compression_type");
1226
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;

    use super::*;

    /// Checks `Duration` parsing and formatting, including inputs that must be
    /// rejected.
    #[mz_ore::test]
    fn test_value_duration() {
        /// Parses `t`, expects `e`, and compares the whitespace-stripped formatted
        /// value against `expected_format` (or against `t` itself when `None`).
        fn inner(t: &'static str, e: Duration, expected_format: Option<&'static str>) {
            let d = Duration::parse(VarInput::Flat(t)).expect("invalid duration");
            assert_eq!(d, e);
            let mut d_format = d.format();
            d_format.retain(|c| !c.is_whitespace());
            if let Some(expected) = expected_format {
                assert_eq!(d_format, expected);
            } else {
                assert_eq!(
                    t.chars().filter(|c| !c.is_whitespace()).collect::<String>(),
                    d_format
                )
            }
        }
        inner("1", Duration::from_millis(1), Some("1ms"));
        inner("0", Duration::from_secs(0), Some("0s"));
        inner("1ms", Duration::from_millis(1), None);
        inner("1000ms", Duration::from_millis(1000), Some("1s"));
        inner("1001ms", Duration::from_millis(1001), None);
        inner("1us", Duration::from_micros(1), None);
        inner("1000us", Duration::from_micros(1000), Some("1ms"));
        inner("1s", Duration::from_secs(1), None);
        inner("60s", Duration::from_secs(60), Some("1min"));
        inner("3600s", Duration::from_secs(3600), Some("1h"));
        inner("3660s", Duration::from_secs(3660), Some("61min"));
        inner("1min", Duration::from_secs(1 * SEC_TO_MIN), None);
        inner("60min", Duration::from_secs(60 * SEC_TO_MIN), Some("1h"));
        inner("1h", Duration::from_secs(1 * SEC_TO_HOUR), None);
        inner("24h", Duration::from_secs(24 * SEC_TO_HOUR), Some("1d"));
        inner("1d", Duration::from_secs(1 * SEC_TO_DAY), None);
        inner("2d", Duration::from_secs(2 * SEC_TO_DAY), None);
        inner("  1   s ", Duration::from_secs(1), None);
        inner("1s ", Duration::from_secs(1), None);
        inner("   1s", Duration::from_secs(1), None);
        inner("0d", Duration::from_secs(0), Some("0s"));
        inner(
            "18446744073709551615",
            Duration::from_millis(u64::MAX),
            Some("18446744073709551615ms"),
        );
        inner(
            "18446744073709551615 s",
            Duration::from_secs(u64::MAX),
            Some("0"),
        );

        fn errs(t: &'static str) {
            assert_err!(Duration::parse(VarInput::Flat(t)));
        }
        errs("1 m");
        errs("1 sec");
        errs("1 min 1 s");
        errs("1m1s");
        errs("1.1");
        errs("1.1 min");
        errs("-1 s");
        errs("");
        errs("   ");
        errs("x");
        errs("s");
        errs("18446744073709551615 min");
        // Non-ASCII numerals must be rejected: only ASCII digits form the
        // magnitude.
        errs("²");
        errs("1²ms");
        errs("½");
        // Fullwidth digit one (U+FF11). Plain ASCII "1ms" is valid (see above) and
        // must not be asserted as an error.
        errs("\u{ff11}ms");
    }

    /// Checks the severity filter for every threshold against representative
    /// message severities.
    #[mz_ore::test]
    fn test_should_output_to_client() {
        #[rustfmt::skip]
        let test_cases = [
            (ClientSeverity::Debug1, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Debug2, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Debug3, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Debug4, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Debug5, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Log, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Log, vec![Severity::Debug], false),
            (ClientSeverity::Info, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Info, vec![Severity::Debug, Severity::Log], false),
            (ClientSeverity::Notice, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Notice, vec![Severity::Debug, Severity::Log], false),
            (ClientSeverity::Warning, vec![Severity::Warning, Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Warning, vec![Severity::Debug, Severity::Log, Severity::Notice], false),
            (ClientSeverity::Error, vec![Severity::Error, Severity::Fatal, Severity::Panic, Severity::Info], true),
            (ClientSeverity::Error, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning], false),
        ];

        for test_case in test_cases {
            run_test(test_case)
        }

        fn run_test(test_case: (ClientSeverity, Vec<Severity>, bool)) {
            let client_min_messages_setting = test_case.0;
            let expected = test_case.2;
            for message_severity in test_case.1 {
                assert!(
                    client_min_messages_setting.should_output_to_client(&message_severity)
                        == expected
                )
            }
        }
    }
}
1342
#[cfg(test)]
mod bounded_staleness_tests {
    use std::time::Duration;

    use super::*;

    /// Parses `s` as an isolation level from a flat input.
    fn parse_iso(s: &str) -> Result<IsolationLevel, VarParseError> {
        <IsolationLevel as Value>::parse(VarInput::Flat(s))
    }

    /// Asserts that every input fails to parse.
    fn assert_rejected(inputs: &[&str]) {
        for input in inputs {
            assert!(parse_iso(input).is_err());
        }
    }

    #[mz_ore::test]
    fn parses_bounded_staleness() {
        let cases = [
            ("bounded staleness 5s", Duration::from_secs(5)),
            ("bounded staleness 500ms", Duration::from_millis(500)),
            // Keyword case is ignored.
            ("BOUNDED STALENESS 5s", Duration::from_secs(5)),
            // Surrounding and interior whitespace is tolerated.
            ("   bounded staleness   5s   ", Duration::from_secs(5)),
        ];
        for (input, bound) in cases {
            assert_eq!(
                parse_iso(input).unwrap(),
                IsolationLevel::BoundedStaleness(bound)
            );
        }
    }

    #[mz_ore::test]
    fn rejects_zero_duration() {
        assert_rejected(&["bounded staleness 0s", "bounded staleness 0ms"]);
    }

    #[mz_ore::test]
    fn rejects_sub_millisecond_duration() {
        assert_rejected(&[
            "bounded staleness 1us",
            "bounded staleness 999us",
            "bounded staleness 999ns",
        ]);
    }

    #[mz_ore::test]
    fn parses_multi_component_duration() {
        let expected = IsolationLevel::BoundedStaleness(Duration::from_secs(90));
        let lvl = parse_iso("bounded staleness 1m30s").unwrap();
        assert_eq!(lvl, expected);
        // The formatted form must parse back to the same level.
        let reparsed = parse_iso(&lvl.format()).unwrap();
        assert_eq!(reparsed, lvl);
    }

    #[mz_ore::test]
    fn rejects_unparseable_duration() {
        assert_rejected(&[
            "bounded staleness banana",
            "bounded staleness",
            "bounded staleness5s",
        ]);
    }

    #[mz_ore::test]
    fn round_trips_format() {
        let lvl = IsolationLevel::BoundedStaleness(Duration::from_secs(5));
        let formatted = lvl.format();
        assert_eq!(formatted, "bounded staleness 5s");
        assert_eq!(parse_iso(&formatted).unwrap(), lvl);
    }

    #[mz_ore::test]
    fn accepts_long_durations() {
        for input in ["bounded staleness 1h", "bounded staleness 24h"] {
            assert!(parse_iso(input).is_ok());
        }
    }
}