mz_sql/plan/
with_options.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Provides tooling to handle `WITH` options.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19    ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20    Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition,
21};
22use mz_storage_types::connections::IcebergCatalogType;
23use mz_storage_types::connections::string_or_secret::StringOrSecret;
24use serde::{Deserialize, Serialize};
25
26use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
27use crate::catalog::SessionCatalog;
28use crate::names::{ResolvedDataType, ResolvedItemName};
29use crate::plan::{Aug, PlanError, literal};
30
31pub trait TryFromValue<T>: Sized {
32    fn try_from_value(v: T) -> Result<Self, PlanError>;
33
34    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;
35
36    fn name() -> String;
37}
38
39pub trait ImpliedValue: Sized {
40    fn implied_value() -> Result<Self, PlanError>;
41}
42
43impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
44    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
45        match String::try_from_value(v)? {
46            s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
47            s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
48            _ => sql_bail!("invalid iceberg catalog type"),
49        }
50    }
51
52    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
53        Some(WithOptionValue::Value(Value::String(match self {
54            IcebergCatalogType::Rest => "rest".to_string(),
55            IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
56        })))
57    }
58
59    fn name() -> String {
60        "iceberg catalog type".to_string()
61    }
62}
63
64impl ImpliedValue for IcebergCatalogType {
65    fn implied_value() -> Result<Self, PlanError> {
66        sql_bail!("must provide an iceberg catalog type")
67    }
68}
69
70#[derive(Copy, Clone, Debug)]
71pub struct Secret(CatalogItemId);
72
73impl From<Secret> for CatalogItemId {
74    fn from(secret: Secret) -> Self {
75        secret.0
76    }
77}
78
79impl TryFromValue<WithOptionValue<Aug>> for Secret {
80    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
81        match StringOrSecret::try_from_value(v)? {
82            StringOrSecret::Secret(id) => Ok(Secret(id)),
83            _ => sql_bail!("must provide a secret value"),
84        }
85    }
86
87    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
88        let secret = catalog.get_item(&self.0);
89        let name = ResolvedItemName::Item {
90            id: self.0,
91            qualifiers: secret.name().qualifiers.clone(),
92            full_name: catalog.resolve_full_name(secret.name()),
93            print_id: false,
94            version: RelationVersionSelector::Latest,
95        };
96        Some(WithOptionValue::Secret(name))
97    }
98
99    fn name() -> String {
100        "secret".to_string()
101    }
102}
103
104impl ImpliedValue for Secret {
105    fn implied_value() -> Result<Self, PlanError> {
106        sql_bail!("must provide a secret value")
107    }
108}
109
110#[derive(Copy, Clone, Debug)]
111pub struct Object(CatalogItemId);
112
113impl From<Object> for CatalogItemId {
114    fn from(obj: Object) -> Self {
115        obj.0
116    }
117}
118
119impl From<&Object> for CatalogItemId {
120    fn from(obj: &Object) -> Self {
121        obj.0
122    }
123}
124
125impl TryFromValue<WithOptionValue<Aug>> for Object {
126    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
127        Ok(match v {
128            WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
129            _ => sql_bail!("must provide an object"),
130        })
131    }
132
133    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
134        let item = catalog.get_item(&self.0);
135        let name = ResolvedItemName::Item {
136            id: self.0,
137            qualifiers: item.name().qualifiers.clone(),
138            full_name: catalog.resolve_full_name(item.name()),
139            print_id: false,
140            // TODO(alter_table): Evaluate if this is correct.
141            version: RelationVersionSelector::Latest,
142        };
143        Some(WithOptionValue::Item(name))
144    }
145
146    fn name() -> String {
147        "object reference".to_string()
148    }
149}
150
151impl ImpliedValue for Object {
152    fn implied_value() -> Result<Self, PlanError> {
153        sql_bail!("must provide an object")
154    }
155}
156
157impl TryFromValue<WithOptionValue<Aug>> for Ident {
158    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
159        Ok(match v {
160            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
161                if inner.len() == 1 =>
162            {
163                inner.remove(0)
164            }
165            WithOptionValue::Ident(inner) => inner,
166            _ => sql_bail!("must provide an unqualified identifier"),
167        })
168    }
169
170    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
171        Some(WithOptionValue::Ident(self))
172    }
173
174    fn name() -> String {
175        "identifier".to_string()
176    }
177}
178
179impl ImpliedValue for Ident {
180    fn implied_value() -> Result<Self, PlanError> {
181        sql_bail!("must provide an identifier")
182    }
183}
184
185impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
186    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
187        Ok(match v {
188            WithOptionValue::Expr(e) => e,
189            _ => sql_bail!("must provide an expr"),
190        })
191    }
192
193    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
194        Some(WithOptionValue::Expr(self))
195    }
196
197    fn name() -> String {
198        "expression".to_string()
199    }
200}
201
202impl ImpliedValue for Expr<Aug> {
203    fn implied_value() -> Result<Self, PlanError> {
204        sql_bail!("must provide an expression")
205    }
206}
207
208impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
209    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
210        Ok(match v {
211            WithOptionValue::UnresolvedItemName(name) => name,
212            WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
213            _ => sql_bail!("must provide an object name"),
214        })
215    }
216
217    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
218        Some(WithOptionValue::UnresolvedItemName(self))
219    }
220
221    fn name() -> String {
222        "object name".to_string()
223    }
224}
225
226impl ImpliedValue for UnresolvedItemName {
227    fn implied_value() -> Result<Self, PlanError> {
228        sql_bail!("must provide an object name")
229    }
230}
231
232impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
233    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
234        Ok(match v {
235            WithOptionValue::DataType(ty) => ty,
236            _ => sql_bail!("must provide a data type"),
237        })
238    }
239
240    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
241        Some(WithOptionValue::DataType(self))
242    }
243
244    fn name() -> String {
245        "data type".to_string()
246    }
247}
248
249impl ImpliedValue for ResolvedDataType {
250    fn implied_value() -> Result<Self, PlanError> {
251        sql_bail!("must provide a data type")
252    }
253}
254
255impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
256    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
257        Ok(match v {
258            WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
259                StringOrSecret::Secret(id)
260            }
261            v => StringOrSecret::String(String::try_from_value(v)?),
262        })
263    }
264
265    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
266        Some(match self {
267            StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
268            StringOrSecret::String(s) => s.try_into_value(catalog)?,
269        })
270    }
271
272    fn name() -> String {
273        "string or secret".to_string()
274    }
275}
276
277impl ImpliedValue for StringOrSecret {
278    fn implied_value() -> Result<Self, PlanError> {
279        sql_bail!("must provide a string or secret value")
280    }
281}
282
283impl TryFromValue<Value> for Duration {
284    fn try_from_value(v: Value) -> Result<Self, PlanError> {
285        let interval = Interval::try_from_value(v)?;
286        Ok(interval.duration()?)
287    }
288
289    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
290        let interval = Interval::from_duration(&self)
291            .expect("planning ensured that this is convertible back to Interval");
292        interval.try_into_value(catalog)
293    }
294
295    fn name() -> String {
296        "interval".to_string()
297    }
298}
299
300impl ImpliedValue for Duration {
301    fn implied_value() -> Result<Self, PlanError> {
302        sql_bail!("must provide an interval value")
303    }
304}
305
306impl TryFromValue<Value> for ByteSize {
307    fn try_from_value(v: Value) -> Result<Self, PlanError> {
308        match v {
309            Value::Number(value) | Value::String(value) => Ok(value
310                .parse::<ByteSize>()
311                .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
312            _ => sql_bail!("cannot use value as bytes"),
313        }
314    }
315
316    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
317        Some(Value::String(self.to_string()))
318    }
319
320    fn name() -> String {
321        "bytes".to_string()
322    }
323}
324
325impl ImpliedValue for ByteSize {
326    fn implied_value() -> Result<Self, PlanError> {
327        sql_bail!("must provide a value for bytes")
328    }
329}
330
331impl TryFromValue<Value> for Interval {
332    fn try_from_value(v: Value) -> Result<Self, PlanError> {
333        match v {
334            Value::Interval(value) => literal::plan_interval(&value),
335            Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
336            _ => sql_bail!("cannot use value as interval"),
337        }
338    }
339
340    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
341        let interval_value = literal::unplan_interval(&self);
342        Some(Value::Interval(interval_value))
343    }
344
345    fn name() -> String {
346        "interval".to_string()
347    }
348}
349
350impl ImpliedValue for Interval {
351    fn implied_value() -> Result<Self, PlanError> {
352        sql_bail!("must provide an interval value")
353    }
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
357pub struct OptionalString(pub Option<String>);
358
359impl TryFromValue<Value> for OptionalString {
360    fn try_from_value(v: Value) -> Result<Self, PlanError> {
361        Ok(match v {
362            Value::Null => Self(None),
363            v => Self(Some(String::try_from_value(v)?)),
364        })
365    }
366
367    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
368        Some(match self.0 {
369            None => Value::Null,
370            Some(s) => s.try_into_value(catalog)?,
371        })
372    }
373
374    fn name() -> String {
375        "optional string".to_string()
376    }
377}
378
379impl ImpliedValue for OptionalString {
380    fn implied_value() -> Result<Self, PlanError> {
381        sql_bail!("must provide a string value")
382    }
383}
384
385#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Hash, Deserialize)]
386pub struct OptionalDuration(pub Option<Duration>);
387
388impl From<Duration> for OptionalDuration {
389    fn from(i: Duration) -> OptionalDuration {
390        // An interval of 0 disables the setting.
391        let inner = if i == Duration::ZERO { None } else { Some(i) };
392        OptionalDuration(inner)
393    }
394}
395
396impl TryFromValue<Value> for OptionalDuration {
397    fn try_from_value(v: Value) -> Result<Self, PlanError> {
398        Ok(match v {
399            Value::Null => OptionalDuration(None),
400            v => Duration::try_from_value(v)?.into(),
401        })
402    }
403
404    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
405        Some(match self.0 {
406            None => Value::Null,
407            Some(duration) => duration.try_into_value(catalog)?,
408        })
409    }
410
411    fn name() -> String {
412        "optional interval".to_string()
413    }
414}
415
416impl ImpliedValue for OptionalDuration {
417    fn implied_value() -> Result<Self, PlanError> {
418        sql_bail!("must provide an interval value")
419    }
420}
421
422impl TryFromValue<Value> for String {
423    fn try_from_value(v: Value) -> Result<Self, PlanError> {
424        match v {
425            Value::String(v) => Ok(v),
426            _ => sql_bail!("cannot use value as string"),
427        }
428    }
429
430    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
431        Some(Value::String(self))
432    }
433
434    fn name() -> String {
435        "text".to_string()
436    }
437}
438
439impl ImpliedValue for String {
440    fn implied_value() -> Result<Self, PlanError> {
441        sql_bail!("must provide a string value")
442    }
443}
444
445impl TryFromValue<Value> for bool {
446    fn try_from_value(v: Value) -> Result<Self, PlanError> {
447        match v {
448            Value::Boolean(v) => Ok(v),
449            _ => sql_bail!("cannot use value as boolean"),
450        }
451    }
452
453    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
454        Some(Value::Boolean(self))
455    }
456
457    fn name() -> String {
458        "bool".to_string()
459    }
460}
461
462impl ImpliedValue for bool {
463    fn implied_value() -> Result<Self, PlanError> {
464        Ok(true)
465    }
466}
467
468impl TryFromValue<Value> for f64 {
469    fn try_from_value(v: Value) -> Result<Self, PlanError> {
470        match v {
471            Value::Number(v) => v
472                .parse::<f64>()
473                .map_err(|e| sql_err!("invalid numeric value: {e}")),
474            _ => sql_bail!("cannot use value as number"),
475        }
476    }
477
478    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
479        Some(Value::Number(self.to_string()))
480    }
481
482    fn name() -> String {
483        "float8".to_string()
484    }
485}
486
487impl ImpliedValue for f64 {
488    fn implied_value() -> Result<Self, PlanError> {
489        sql_bail!("must provide a float value")
490    }
491}
492
493impl TryFromValue<Value> for i32 {
494    fn try_from_value(v: Value) -> Result<Self, PlanError> {
495        match v {
496            Value::Number(v) => v
497                .parse::<i32>()
498                .map_err(|e| sql_err!("invalid numeric value: {e}")),
499            _ => sql_bail!("cannot use value as number"),
500        }
501    }
502
503    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
504        Some(Value::Number(self.to_string()))
505    }
506
507    fn name() -> String {
508        "int".to_string()
509    }
510}
511
512impl ImpliedValue for i32 {
513    fn implied_value() -> Result<Self, PlanError> {
514        sql_bail!("must provide an integer value")
515    }
516}
517
518impl TryFromValue<Value> for i64 {
519    fn try_from_value(v: Value) -> Result<Self, PlanError> {
520        match v {
521            Value::Number(v) => v
522                .parse::<i64>()
523                .map_err(|e| sql_err!("invalid numeric value: {e}")),
524            _ => sql_bail!("cannot use value as number"),
525        }
526    }
527    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
528        Some(Value::Number(self.to_string()))
529    }
530    fn name() -> String {
531        "int8".to_string()
532    }
533}
534
535impl ImpliedValue for i64 {
536    fn implied_value() -> Result<Self, PlanError> {
537        sql_bail!("must provide an integer value")
538    }
539}
540
541impl TryFromValue<Value> for u16 {
542    fn try_from_value(v: Value) -> Result<Self, PlanError> {
543        match v {
544            Value::Number(v) => v
545                .parse::<u16>()
546                .map_err(|e| sql_err!("invalid numeric value: {e}")),
547            _ => sql_bail!("cannot use value as number"),
548        }
549    }
550    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
551        Some(Value::Number(self.to_string()))
552    }
553    fn name() -> String {
554        "uint2".to_string()
555    }
556}
557
558impl ImpliedValue for u16 {
559    fn implied_value() -> Result<Self, PlanError> {
560        sql_bail!("must provide an integer value")
561    }
562}
563
564impl TryFromValue<Value> for u32 {
565    fn try_from_value(v: Value) -> Result<Self, PlanError> {
566        match v {
567            Value::Number(v) => v
568                .parse::<u32>()
569                .map_err(|e| sql_err!("invalid numeric value: {e}")),
570            _ => sql_bail!("cannot use value as number"),
571        }
572    }
573    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
574        Some(Value::Number(self.to_string()))
575    }
576    fn name() -> String {
577        "uint4".to_string()
578    }
579}
580
581impl ImpliedValue for u32 {
582    fn implied_value() -> Result<Self, PlanError> {
583        sql_bail!("must provide an integer value")
584    }
585}
586
587impl TryFromValue<Value> for u64 {
588    fn try_from_value(v: Value) -> Result<Self, PlanError> {
589        match v {
590            Value::Number(v) => v
591                .parse::<u64>()
592                .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
593            _ => sql_bail!("cannot use value as number"),
594        }
595    }
596    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
597        Some(Value::Number(self.to_string()))
598    }
599    fn name() -> String {
600        "uint8".to_string()
601    }
602}
603
604impl ImpliedValue for u64 {
605    fn implied_value() -> Result<Self, PlanError> {
606        sql_bail!("must provide an unsigned integer value")
607    }
608}
609
610impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
611    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
612        match v {
613            WithOptionValue::Sequence(a) => {
614                let mut out = Vec::with_capacity(a.len());
615                for i in a {
616                    out.push(
617                        V::try_from_value(i)
618                            .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
619                    )
620                }
621                Ok(out)
622            }
623            _ => sql_bail!("cannot use value as array"),
624        }
625    }
626
627    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
628        Some(WithOptionValue::Sequence(
629            self.into_iter()
630                .map(|v| v.try_into_value(catalog))
631                .collect::<Option<_>>()?,
632        ))
633    }
634
635    fn name() -> String {
636        format!("array of {}", V::name())
637    }
638}
639
640impl<V: ImpliedValue> ImpliedValue for Vec<V> {
641    fn implied_value() -> Result<Self, PlanError> {
642        sql_bail!("must provide an array value")
643    }
644}
645
646impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
647    for Option<V>
648{
649    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
650        Ok(Some(V::try_from_value(v)?))
651    }
652
653    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
654        match self {
655            Some(v) => v.try_into_value(catalog),
656            None => None,
657        }
658    }
659
660    fn name() -> String {
661        format!("optional {}", V::name())
662    }
663}
664
665impl<V: ImpliedValue> ImpliedValue for Option<V> {
666    fn implied_value() -> Result<Self, PlanError> {
667        Ok(Some(V::implied_value()?))
668    }
669}
670
671impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
672    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
673        match v {
674            WithOptionValue::Value(v) => V::try_from_value(v),
675            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
676                if inner.len() == 1 =>
677            {
678                V::try_from_value(Value::String(inner.remove(0).into_string()))
679            }
680            WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
681            WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
682            WithOptionValue::Sequence(_)
683            | WithOptionValue::Map(_)
684            | WithOptionValue::Item(_)
685            | WithOptionValue::UnresolvedItemName(_)
686            | WithOptionValue::Secret(_)
687            | WithOptionValue::DataType(_)
688            | WithOptionValue::Expr(_)
689            | WithOptionValue::ClusterReplicas(_)
690            | WithOptionValue::ConnectionKafkaBroker(_)
691            | WithOptionValue::ConnectionAwsPrivatelink(_)
692            | WithOptionValue::ClusterAlterStrategy(_)
693            | WithOptionValue::Refresh(_)
694            | WithOptionValue::ClusterScheduleOptionValue(_)
695            | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
696                "incompatible value types: cannot convert {} to {}",
697                match v {
698                    // The first few are unreachable because they are handled at the top of the outer match.
699                    WithOptionValue::Value(_) => unreachable!(),
700                    WithOptionValue::RetainHistoryFor(_) => unreachable!(),
701                    WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
702                    WithOptionValue::Sequence(_) => "sequences",
703                    WithOptionValue::Map(_) => "maps",
704                    WithOptionValue::Item(_) => "object references",
705                    WithOptionValue::UnresolvedItemName(_) => "object names",
706                    WithOptionValue::Ident(_) => "identifiers",
707                    WithOptionValue::Secret(_) => "secrets",
708                    WithOptionValue::DataType(_) => "data types",
709                    WithOptionValue::Expr(_) => "exprs",
710                    WithOptionValue::ClusterReplicas(_) => "cluster replicas",
711                    WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
712                    WithOptionValue::ConnectionAwsPrivatelink(_) => "connection kafka brokers",
713                    WithOptionValue::Refresh(_) => "refresh option values",
714                    WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
715                    WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
716                },
717                V::name()
718            ),
719        }
720    }
721
722    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
723        Some(WithOptionValue::Value(self.try_into_value(catalog)?))
724    }
725
726    fn name() -> String {
727        V::name()
728    }
729}
730
731impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
732    fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
733        match v {
734            Some(v) => V::try_from_value(v),
735            None => V::implied_value(),
736        }
737    }
738
739    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
740        Some(Some(self.try_into_value(catalog)?))
741    }
742
743    fn name() -> String {
744        V::name()
745    }
746}
747
748impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
749    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
750        match v {
751            WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
752            _ => sql_bail!("cannot use value as cluster replicas"),
753        }
754    }
755
756    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
757        Some(WithOptionValue::ClusterReplicas(self))
758    }
759
760    fn name() -> String {
761        "cluster replicas".to_string()
762    }
763}
764
765impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
766    fn implied_value() -> Result<Self, PlanError> {
767        sql_bail!("must provide a set of cluster replicas")
768    }
769}
770
771impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
772    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
773        let mut out = vec![];
774        match v {
775            WithOptionValue::ConnectionKafkaBroker(broker) => {
776                out.push(broker);
777            }
778            WithOptionValue::Sequence(values) => {
779                for value in values {
780                    out.extend(Self::try_from_value(value)?);
781                }
782            }
783            _ => sql_bail!("cannot use value as a kafka broker"),
784        }
785        Ok(out)
786    }
787
788    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
789        Some(WithOptionValue::Sequence(
790            self.into_iter()
791                .map(WithOptionValue::ConnectionKafkaBroker)
792                .collect(),
793        ))
794    }
795
796    fn name() -> String {
797        "kafka broker".to_string()
798    }
799}
800
801impl ImpliedValue for Vec<KafkaBroker<Aug>> {
802    fn implied_value() -> Result<Self, PlanError> {
803        sql_bail!("must provide a kafka broker")
804    }
805}
806
807impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
808    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
809        if let WithOptionValue::Refresh(r) = v {
810            Ok(r)
811        } else {
812            sql_bail!("cannot use value `{}` for a refresh option", v)
813        }
814    }
815
816    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
817        Some(WithOptionValue::Refresh(self))
818    }
819
820    fn name() -> String {
821        "refresh option value".to_string()
822    }
823}
824
825impl ImpliedValue for RefreshOptionValue<Aug> {
826    fn implied_value() -> Result<Self, PlanError> {
827        sql_bail!("must provide a refresh option value")
828    }
829}
830
831impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
832    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
833        if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
834            Ok(r)
835        } else {
836            sql_bail!("cannot use value `{}` for a privatelink", v)
837        }
838    }
839
840    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
841        Some(WithOptionValue::ConnectionAwsPrivatelink(self))
842    }
843
844    fn name() -> String {
845        "privatelink option value".to_string()
846    }
847}
848
849impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
850    fn implied_value() -> Result<Self, PlanError> {
851        sql_bail!("must provide a value")
852    }
853}
854
855impl ImpliedValue for ClusterScheduleOptionValue {
856    fn implied_value() -> Result<Self, PlanError> {
857        sql_bail!("must provide a cluster schedule option value")
858    }
859}
860
861impl ImpliedValue for ClusterAlterOptionValue<Aug> {
862    fn implied_value() -> Result<Self, PlanError> {
863        sql_bail!("must provide a value")
864    }
865}
866
867impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
868    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
869        if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
870            Ok(r)
871        } else {
872            sql_bail!("cannot use value `{}` for a cluster schedule", v)
873        }
874    }
875
876    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
877        Some(WithOptionValue::ClusterScheduleOptionValue(self))
878    }
879
880    fn name() -> String {
881        "cluster schedule option value".to_string()
882    }
883}
884
885impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
886    fn implied_value() -> Result<Self, PlanError> {
887        sql_bail!("must provide a map of key-value pairs")
888    }
889}
890
891impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
892    for BTreeMap<String, V>
893{
894    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
895        match v {
896            WithOptionValue::Map(a) => a
897                .into_iter()
898                .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
899                .collect(),
900            _ => sql_bail!("cannot use value as map"),
901        }
902    }
903
904    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
905        Some(WithOptionValue::Map(
906            self.into_iter()
907                .map(|(k, v)| {
908                    let v = v.try_into_value(catalog);
909                    v.map(|v| (k, v))
910                })
911                .collect::<Option<_>>()?,
912        ))
913    }
914
915    fn name() -> String {
916        format!("map of string to {}", V::name())
917    }
918}
919
920impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
921    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
922        if let WithOptionValue::ClusterAlterStrategy(r) = v {
923            Ok(r)
924        } else {
925            sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
926        }
927    }
928
929    fn name() -> String {
930        "cluster alter strategyoption value".to_string()
931    }
932
933    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
934        Some(WithOptionValue::ClusterAlterStrategy(self))
935    }
936}
937
938impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
939    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
940        match v {
941            WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
942            _ => sql_bail!("cannot use value as cluster replicas"),
943        }
944    }
945
946    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
947        Some(WithOptionValue::NetworkPolicyRules(self))
948    }
949
950    fn name() -> String {
951        "network policy rules".to_string()
952    }
953}
954
955impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
956    fn implied_value() -> Result<Self, PlanError> {
957        sql_bail!("must provide a set of network policy rules")
958    }
959}