Skip to main content

mz_sql/plan/
with_options.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Provides tooling to handle `WITH` options.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19    ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20    Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition,
21};
22use mz_storage_types::connections::IcebergCatalogType;
23use mz_storage_types::connections::string_or_secret::StringOrSecret;
24use serde::{Deserialize, Serialize};
25
26use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
27use crate::catalog::SessionCatalog;
28use crate::names::{ResolvedDataType, ResolvedItemName};
29use crate::plan::{Aug, PlanError, literal};
30
31pub trait TryFromValue<T>: Sized {
32    fn try_from_value(v: T) -> Result<Self, PlanError>;
33
34    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;
35
36    fn name() -> String;
37}
38
39pub trait ImpliedValue: Sized {
40    fn implied_value() -> Result<Self, PlanError>;
41}
42
43impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
44    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
45        match String::try_from_value(v)? {
46            s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
47            s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
48            _ => sql_bail!("invalid iceberg catalog type"),
49        }
50    }
51
52    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
53        Some(WithOptionValue::Value(Value::String(match self {
54            IcebergCatalogType::Rest => "rest".to_string(),
55            IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
56        })))
57    }
58
59    fn name() -> String {
60        "iceberg catalog type".to_string()
61    }
62}
63
64impl ImpliedValue for IcebergCatalogType {
65    fn implied_value() -> Result<Self, PlanError> {
66        sql_bail!("must provide an iceberg catalog type")
67    }
68}
69
70#[derive(Copy, Clone, Debug)]
71pub struct Secret(CatalogItemId);
72
73impl From<Secret> for CatalogItemId {
74    fn from(secret: Secret) -> Self {
75        secret.0
76    }
77}
78
79impl TryFromValue<WithOptionValue<Aug>> for Secret {
80    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
81        match StringOrSecret::try_from_value(v)? {
82            StringOrSecret::Secret(id) => Ok(Secret(id)),
83            _ => sql_bail!("must provide a secret value"),
84        }
85    }
86
87    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
88        let secret = catalog.get_item(&self.0);
89        let name = ResolvedItemName::Item {
90            id: self.0,
91            qualifiers: secret.name().qualifiers.clone(),
92            full_name: catalog.resolve_full_name(secret.name()),
93            print_id: false,
94            version: RelationVersionSelector::Latest,
95        };
96        Some(WithOptionValue::Secret(name))
97    }
98
99    fn name() -> String {
100        "secret".to_string()
101    }
102}
103
104impl ImpliedValue for Secret {
105    fn implied_value() -> Result<Self, PlanError> {
106        sql_bail!("must provide a secret value")
107    }
108}
109
110#[derive(Copy, Clone, Debug)]
111pub struct Object(CatalogItemId);
112
113impl From<Object> for CatalogItemId {
114    fn from(obj: Object) -> Self {
115        obj.0
116    }
117}
118
119impl From<&Object> for CatalogItemId {
120    fn from(obj: &Object) -> Self {
121        obj.0
122    }
123}
124
125impl TryFromValue<WithOptionValue<Aug>> for Object {
126    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
127        Ok(match v {
128            WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
129            _ => sql_bail!("must provide an object"),
130        })
131    }
132
133    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
134        let item = catalog.get_item(&self.0);
135        let name = ResolvedItemName::Item {
136            id: self.0,
137            qualifiers: item.name().qualifiers.clone(),
138            full_name: catalog.resolve_full_name(item.name()),
139            print_id: false,
140            // TODO(alter_table): Evaluate if this is correct.
141            version: RelationVersionSelector::Latest,
142        };
143        Some(WithOptionValue::Item(name))
144    }
145
146    fn name() -> String {
147        "object reference".to_string()
148    }
149}
150
151impl ImpliedValue for Object {
152    fn implied_value() -> Result<Self, PlanError> {
153        sql_bail!("must provide an object")
154    }
155}
156
157impl TryFromValue<WithOptionValue<Aug>> for Ident {
158    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
159        Ok(match v {
160            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
161                if inner.len() == 1 =>
162            {
163                inner.remove(0)
164            }
165            WithOptionValue::Ident(inner) => inner,
166            _ => sql_bail!("must provide an unqualified identifier"),
167        })
168    }
169
170    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
171        Some(WithOptionValue::Ident(self))
172    }
173
174    fn name() -> String {
175        "identifier".to_string()
176    }
177}
178
179impl ImpliedValue for Ident {
180    fn implied_value() -> Result<Self, PlanError> {
181        sql_bail!("must provide an identifier")
182    }
183}
184
185impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
186    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
187        Ok(match v {
188            WithOptionValue::Expr(e) => e,
189            _ => sql_bail!("must provide an expr"),
190        })
191    }
192
193    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
194        Some(WithOptionValue::Expr(self))
195    }
196
197    fn name() -> String {
198        "expression".to_string()
199    }
200}
201
202impl ImpliedValue for Expr<Aug> {
203    fn implied_value() -> Result<Self, PlanError> {
204        sql_bail!("must provide an expression")
205    }
206}
207
208impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
209    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
210        Ok(match v {
211            WithOptionValue::UnresolvedItemName(name) => name,
212            WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
213            _ => sql_bail!("must provide an object name"),
214        })
215    }
216
217    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
218        Some(WithOptionValue::UnresolvedItemName(self))
219    }
220
221    fn name() -> String {
222        "object name".to_string()
223    }
224}
225
226impl ImpliedValue for UnresolvedItemName {
227    fn implied_value() -> Result<Self, PlanError> {
228        sql_bail!("must provide an object name")
229    }
230}
231
232impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
233    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
234        Ok(match v {
235            WithOptionValue::DataType(ty) => ty,
236            _ => sql_bail!("must provide a data type"),
237        })
238    }
239
240    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
241        Some(WithOptionValue::DataType(self))
242    }
243
244    fn name() -> String {
245        "data type".to_string()
246    }
247}
248
249impl ImpliedValue for ResolvedDataType {
250    fn implied_value() -> Result<Self, PlanError> {
251        sql_bail!("must provide a data type")
252    }
253}
254
255impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
256    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
257        Ok(match v {
258            WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
259                StringOrSecret::Secret(id)
260            }
261            v => StringOrSecret::String(String::try_from_value(v)?),
262        })
263    }
264
265    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
266        Some(match self {
267            StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
268            StringOrSecret::String(s) => s.try_into_value(catalog)?,
269        })
270    }
271
272    fn name() -> String {
273        "string or secret".to_string()
274    }
275}
276
277impl ImpliedValue for StringOrSecret {
278    fn implied_value() -> Result<Self, PlanError> {
279        sql_bail!("must provide a string or secret value")
280    }
281}
282
283impl TryFromValue<Value> for Duration {
284    fn try_from_value(v: Value) -> Result<Self, PlanError> {
285        let interval = Interval::try_from_value(v)?;
286        Ok(interval.duration()?)
287    }
288
289    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
290        let interval = Interval::from_duration(&self)
291            .expect("planning ensured that this is convertible back to Interval");
292        interval.try_into_value(catalog)
293    }
294
295    fn name() -> String {
296        "interval".to_string()
297    }
298}
299
300impl ImpliedValue for Duration {
301    fn implied_value() -> Result<Self, PlanError> {
302        sql_bail!("must provide an interval value")
303    }
304}
305
306impl TryFromValue<Value> for ByteSize {
307    fn try_from_value(v: Value) -> Result<Self, PlanError> {
308        match v {
309            Value::Number(value) | Value::String(value) => Ok(value
310                .parse::<ByteSize>()
311                .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
312            _ => sql_bail!("cannot use value as bytes"),
313        }
314    }
315
316    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
317        Some(Value::String(self.to_string()))
318    }
319
320    fn name() -> String {
321        "bytes".to_string()
322    }
323}
324
325impl ImpliedValue for ByteSize {
326    fn implied_value() -> Result<Self, PlanError> {
327        sql_bail!("must provide a value for bytes")
328    }
329}
330
331impl TryFromValue<Value> for Interval {
332    fn try_from_value(v: Value) -> Result<Self, PlanError> {
333        match v {
334            Value::Interval(value) => literal::plan_interval(&value),
335            Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
336            _ => sql_bail!("cannot use value as interval"),
337        }
338    }
339
340    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
341        let interval_value = literal::unplan_interval(&self);
342        Some(Value::Interval(interval_value))
343    }
344
345    fn name() -> String {
346        "interval".to_string()
347    }
348}
349
350impl ImpliedValue for Interval {
351    fn implied_value() -> Result<Self, PlanError> {
352        sql_bail!("must provide an interval value")
353    }
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
357pub struct OptionalString(pub Option<String>);
358
359impl TryFromValue<Value> for OptionalString {
360    fn try_from_value(v: Value) -> Result<Self, PlanError> {
361        Ok(match v {
362            Value::Null => Self(None),
363            v => Self(Some(String::try_from_value(v)?)),
364        })
365    }
366
367    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
368        Some(match self.0 {
369            None => Value::Null,
370            Some(s) => s.try_into_value(catalog)?,
371        })
372    }
373
374    fn name() -> String {
375        "optional string".to_string()
376    }
377}
378
379impl ImpliedValue for OptionalString {
380    fn implied_value() -> Result<Self, PlanError> {
381        sql_bail!("must provide a string value")
382    }
383}
384
385#[derive(
386    Debug,
387    Clone,
388    Copy,
389    PartialEq,
390    Eq,
391    PartialOrd,
392    Ord,
393    Serialize,
394    Hash,
395    Deserialize
396)]
397pub struct OptionalDuration(pub Option<Duration>);
398
399impl From<Duration> for OptionalDuration {
400    fn from(i: Duration) -> OptionalDuration {
401        // An interval of 0 disables the setting.
402        let inner = if i == Duration::ZERO { None } else { Some(i) };
403        OptionalDuration(inner)
404    }
405}
406
407impl TryFromValue<Value> for OptionalDuration {
408    fn try_from_value(v: Value) -> Result<Self, PlanError> {
409        Ok(match v {
410            Value::Null => OptionalDuration(None),
411            v => Duration::try_from_value(v)?.into(),
412        })
413    }
414
415    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
416        Some(match self.0 {
417            None => Value::Null,
418            Some(duration) => duration.try_into_value(catalog)?,
419        })
420    }
421
422    fn name() -> String {
423        "optional interval".to_string()
424    }
425}
426
427impl ImpliedValue for OptionalDuration {
428    fn implied_value() -> Result<Self, PlanError> {
429        sql_bail!("must provide an interval value")
430    }
431}
432
433impl TryFromValue<Value> for String {
434    fn try_from_value(v: Value) -> Result<Self, PlanError> {
435        match v {
436            Value::String(v) => Ok(v),
437            _ => sql_bail!("cannot use value as string"),
438        }
439    }
440
441    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
442        Some(Value::String(self))
443    }
444
445    fn name() -> String {
446        "text".to_string()
447    }
448}
449
450impl ImpliedValue for String {
451    fn implied_value() -> Result<Self, PlanError> {
452        sql_bail!("must provide a string value")
453    }
454}
455
456impl TryFromValue<Value> for bool {
457    fn try_from_value(v: Value) -> Result<Self, PlanError> {
458        match v {
459            Value::Boolean(v) => Ok(v),
460            _ => sql_bail!("cannot use value as boolean"),
461        }
462    }
463
464    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
465        Some(Value::Boolean(self))
466    }
467
468    fn name() -> String {
469        "bool".to_string()
470    }
471}
472
473impl ImpliedValue for bool {
474    fn implied_value() -> Result<Self, PlanError> {
475        Ok(true)
476    }
477}
478
479impl TryFromValue<Value> for f64 {
480    fn try_from_value(v: Value) -> Result<Self, PlanError> {
481        match v {
482            Value::Number(v) => v
483                .parse::<f64>()
484                .map_err(|e| sql_err!("invalid numeric value: {e}")),
485            _ => sql_bail!("cannot use value as number"),
486        }
487    }
488
489    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
490        Some(Value::Number(self.to_string()))
491    }
492
493    fn name() -> String {
494        "float8".to_string()
495    }
496}
497
498impl ImpliedValue for f64 {
499    fn implied_value() -> Result<Self, PlanError> {
500        sql_bail!("must provide a float value")
501    }
502}
503
504impl TryFromValue<Value> for i32 {
505    fn try_from_value(v: Value) -> Result<Self, PlanError> {
506        match v {
507            Value::Number(v) => v
508                .parse::<i32>()
509                .map_err(|e| sql_err!("invalid numeric value: {e}")),
510            _ => sql_bail!("cannot use value as number"),
511        }
512    }
513
514    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
515        Some(Value::Number(self.to_string()))
516    }
517
518    fn name() -> String {
519        "int".to_string()
520    }
521}
522
523impl ImpliedValue for i32 {
524    fn implied_value() -> Result<Self, PlanError> {
525        sql_bail!("must provide an integer value")
526    }
527}
528
529impl TryFromValue<Value> for i64 {
530    fn try_from_value(v: Value) -> Result<Self, PlanError> {
531        match v {
532            Value::Number(v) => v
533                .parse::<i64>()
534                .map_err(|e| sql_err!("invalid numeric value: {e}")),
535            _ => sql_bail!("cannot use value as number"),
536        }
537    }
538    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
539        Some(Value::Number(self.to_string()))
540    }
541    fn name() -> String {
542        "int8".to_string()
543    }
544}
545
546impl ImpliedValue for i64 {
547    fn implied_value() -> Result<Self, PlanError> {
548        sql_bail!("must provide an integer value")
549    }
550}
551
552impl TryFromValue<Value> for u16 {
553    fn try_from_value(v: Value) -> Result<Self, PlanError> {
554        match v {
555            Value::Number(v) => v
556                .parse::<u16>()
557                .map_err(|e| sql_err!("invalid numeric value: {e}")),
558            _ => sql_bail!("cannot use value as number"),
559        }
560    }
561    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
562        Some(Value::Number(self.to_string()))
563    }
564    fn name() -> String {
565        "uint2".to_string()
566    }
567}
568
569impl ImpliedValue for u16 {
570    fn implied_value() -> Result<Self, PlanError> {
571        sql_bail!("must provide an integer value")
572    }
573}
574
575impl TryFromValue<Value> for u32 {
576    fn try_from_value(v: Value) -> Result<Self, PlanError> {
577        match v {
578            Value::Number(v) => v
579                .parse::<u32>()
580                .map_err(|e| sql_err!("invalid numeric value: {e}")),
581            _ => sql_bail!("cannot use value as number"),
582        }
583    }
584    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
585        Some(Value::Number(self.to_string()))
586    }
587    fn name() -> String {
588        "uint4".to_string()
589    }
590}
591
592impl ImpliedValue for u32 {
593    fn implied_value() -> Result<Self, PlanError> {
594        sql_bail!("must provide an integer value")
595    }
596}
597
598impl TryFromValue<Value> for u64 {
599    fn try_from_value(v: Value) -> Result<Self, PlanError> {
600        match v {
601            Value::Number(v) => v
602                .parse::<u64>()
603                .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
604            _ => sql_bail!("cannot use value as number"),
605        }
606    }
607    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
608        Some(Value::Number(self.to_string()))
609    }
610    fn name() -> String {
611        "uint8".to_string()
612    }
613}
614
615impl ImpliedValue for u64 {
616    fn implied_value() -> Result<Self, PlanError> {
617        sql_bail!("must provide an unsigned integer value")
618    }
619}
620
621impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
622    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
623        match v {
624            WithOptionValue::Sequence(a) => {
625                let mut out = Vec::with_capacity(a.len());
626                for i in a {
627                    out.push(
628                        V::try_from_value(i)
629                            .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
630                    )
631                }
632                Ok(out)
633            }
634            _ => sql_bail!("cannot use value as array"),
635        }
636    }
637
638    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
639        Some(WithOptionValue::Sequence(
640            self.into_iter()
641                .map(|v| v.try_into_value(catalog))
642                .collect::<Option<_>>()?,
643        ))
644    }
645
646    fn name() -> String {
647        format!("array of {}", V::name())
648    }
649}
650
651impl<V: ImpliedValue> ImpliedValue for Vec<V> {
652    fn implied_value() -> Result<Self, PlanError> {
653        sql_bail!("must provide an array value")
654    }
655}
656
657impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
658    for Option<V>
659{
660    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
661        Ok(Some(V::try_from_value(v)?))
662    }
663
664    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
665        match self {
666            Some(v) => v.try_into_value(catalog),
667            None => None,
668        }
669    }
670
671    fn name() -> String {
672        format!("optional {}", V::name())
673    }
674}
675
676impl<V: ImpliedValue> ImpliedValue for Option<V> {
677    fn implied_value() -> Result<Self, PlanError> {
678        Ok(Some(V::implied_value()?))
679    }
680}
681
682impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
683    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
684        match v {
685            WithOptionValue::Value(v) => V::try_from_value(v),
686            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
687                if inner.len() == 1 =>
688            {
689                V::try_from_value(Value::String(inner.remove(0).into_string()))
690            }
691            WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
692            WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
693            WithOptionValue::Sequence(_)
694            | WithOptionValue::Map(_)
695            | WithOptionValue::Item(_)
696            | WithOptionValue::UnresolvedItemName(_)
697            | WithOptionValue::Secret(_)
698            | WithOptionValue::DataType(_)
699            | WithOptionValue::Expr(_)
700            | WithOptionValue::ClusterReplicas(_)
701            | WithOptionValue::ConnectionKafkaBroker(_)
702            | WithOptionValue::ConnectionAwsPrivatelink(_)
703            | WithOptionValue::ClusterAlterStrategy(_)
704            | WithOptionValue::Refresh(_)
705            | WithOptionValue::ClusterScheduleOptionValue(_)
706            | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
707                "incompatible value types: cannot convert {} to {}",
708                match v {
709                    // The first few are unreachable because they are handled at the top of the outer match.
710                    WithOptionValue::Value(_) => unreachable!(),
711                    WithOptionValue::RetainHistoryFor(_) => unreachable!(),
712                    WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
713                    WithOptionValue::Sequence(_) => "sequences",
714                    WithOptionValue::Map(_) => "maps",
715                    WithOptionValue::Item(_) => "object references",
716                    WithOptionValue::UnresolvedItemName(_) => "object names",
717                    WithOptionValue::Ident(_) => "identifiers",
718                    WithOptionValue::Secret(_) => "secrets",
719                    WithOptionValue::DataType(_) => "data types",
720                    WithOptionValue::Expr(_) => "exprs",
721                    WithOptionValue::ClusterReplicas(_) => "cluster replicas",
722                    WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
723                    WithOptionValue::ConnectionAwsPrivatelink(_) => "connection kafka brokers",
724                    WithOptionValue::Refresh(_) => "refresh option values",
725                    WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
726                    WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
727                },
728                V::name()
729            ),
730        }
731    }
732
733    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
734        Some(WithOptionValue::Value(self.try_into_value(catalog)?))
735    }
736
737    fn name() -> String {
738        V::name()
739    }
740}
741
742impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
743    fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
744        match v {
745            Some(v) => V::try_from_value(v),
746            None => V::implied_value(),
747        }
748    }
749
750    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
751        Some(Some(self.try_into_value(catalog)?))
752    }
753
754    fn name() -> String {
755        V::name()
756    }
757}
758
759impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
760    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
761        match v {
762            WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
763            _ => sql_bail!("cannot use value as cluster replicas"),
764        }
765    }
766
767    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
768        Some(WithOptionValue::ClusterReplicas(self))
769    }
770
771    fn name() -> String {
772        "cluster replicas".to_string()
773    }
774}
775
776impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
777    fn implied_value() -> Result<Self, PlanError> {
778        sql_bail!("must provide a set of cluster replicas")
779    }
780}
781
782impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
783    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
784        let mut out = vec![];
785        match v {
786            WithOptionValue::ConnectionKafkaBroker(broker) => {
787                out.push(broker);
788            }
789            WithOptionValue::Sequence(values) => {
790                for value in values {
791                    out.extend(Self::try_from_value(value)?);
792                }
793            }
794            _ => sql_bail!("cannot use value as a kafka broker"),
795        }
796        Ok(out)
797    }
798
799    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
800        Some(WithOptionValue::Sequence(
801            self.into_iter()
802                .map(WithOptionValue::ConnectionKafkaBroker)
803                .collect(),
804        ))
805    }
806
807    fn name() -> String {
808        "kafka broker".to_string()
809    }
810}
811
812impl ImpliedValue for Vec<KafkaBroker<Aug>> {
813    fn implied_value() -> Result<Self, PlanError> {
814        sql_bail!("must provide a kafka broker")
815    }
816}
817
818impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
819    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
820        if let WithOptionValue::Refresh(r) = v {
821            Ok(r)
822        } else {
823            sql_bail!("cannot use value `{}` for a refresh option", v)
824        }
825    }
826
827    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
828        Some(WithOptionValue::Refresh(self))
829    }
830
831    fn name() -> String {
832        "refresh option value".to_string()
833    }
834}
835
836impl ImpliedValue for RefreshOptionValue<Aug> {
837    fn implied_value() -> Result<Self, PlanError> {
838        sql_bail!("must provide a refresh option value")
839    }
840}
841
842impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
843    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
844        if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
845            Ok(r)
846        } else {
847            sql_bail!("cannot use value `{}` for a privatelink", v)
848        }
849    }
850
851    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
852        Some(WithOptionValue::ConnectionAwsPrivatelink(self))
853    }
854
855    fn name() -> String {
856        "privatelink option value".to_string()
857    }
858}
859
860impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
861    fn implied_value() -> Result<Self, PlanError> {
862        sql_bail!("must provide a value")
863    }
864}
865
866impl ImpliedValue for ClusterScheduleOptionValue {
867    fn implied_value() -> Result<Self, PlanError> {
868        sql_bail!("must provide a cluster schedule option value")
869    }
870}
871
872impl ImpliedValue for ClusterAlterOptionValue<Aug> {
873    fn implied_value() -> Result<Self, PlanError> {
874        sql_bail!("must provide a value")
875    }
876}
877
878impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
879    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
880        if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
881            Ok(r)
882        } else {
883            sql_bail!("cannot use value `{}` for a cluster schedule", v)
884        }
885    }
886
887    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
888        Some(WithOptionValue::ClusterScheduleOptionValue(self))
889    }
890
891    fn name() -> String {
892        "cluster schedule option value".to_string()
893    }
894}
895
896impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
897    fn implied_value() -> Result<Self, PlanError> {
898        sql_bail!("must provide a map of key-value pairs")
899    }
900}
901
902impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
903    for BTreeMap<String, V>
904{
905    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
906        match v {
907            WithOptionValue::Map(a) => a
908                .into_iter()
909                .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
910                .collect(),
911            _ => sql_bail!("cannot use value as map"),
912        }
913    }
914
915    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
916        Some(WithOptionValue::Map(
917            self.into_iter()
918                .map(|(k, v)| {
919                    let v = v.try_into_value(catalog);
920                    v.map(|v| (k, v))
921                })
922                .collect::<Option<_>>()?,
923        ))
924    }
925
926    fn name() -> String {
927        format!("map of string to {}", V::name())
928    }
929}
930
931impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
932    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
933        if let WithOptionValue::ClusterAlterStrategy(r) = v {
934            Ok(r)
935        } else {
936            sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
937        }
938    }
939
940    fn name() -> String {
941        "cluster alter strategyoption value".to_string()
942    }
943
944    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
945        Some(WithOptionValue::ClusterAlterStrategy(self))
946    }
947}
948
949impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
950    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
951        match v {
952            WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
953            _ => sql_bail!("cannot use value as cluster replicas"),
954        }
955    }
956
957    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
958        Some(WithOptionValue::NetworkPolicyRules(self))
959    }
960
961    fn name() -> String {
962        "network policy rules".to_string()
963    }
964}
965
966impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
967    fn implied_value() -> Result<Self, PlanError> {
968        sql_bail!("must provide a set of network policy rules")
969    }
970}