Skip to main content

mz_sql/plan/
with_options.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Provides tooling to handle `WITH` options.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19    ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20    Ident, KafkaBroker, KafkaMatchingBrokerRule, NetworkPolicyRuleDefinition, RefreshOptionValue,
21    ReplicaDefinition,
22};
23use mz_storage_types::connections::IcebergCatalogType;
24use mz_storage_types::connections::string_or_secret::StringOrSecret;
25use serde::{Deserialize, Serialize};
26
27use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
28use crate::catalog::SessionCatalog;
29use crate::names::{ResolvedDataType, ResolvedItemName};
30use crate::plan::{Aug, PlanError, literal};
31
32pub trait TryFromValue<T>: Sized {
33    fn try_from_value(v: T) -> Result<Self, PlanError>;
34
35    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;
36
37    fn name() -> String;
38}
39
40pub trait ImpliedValue: Sized {
41    fn implied_value() -> Result<Self, PlanError>;
42}
43
44impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
45    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
46        match String::try_from_value(v)? {
47            s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
48            s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
49            _ => sql_bail!("invalid iceberg catalog type"),
50        }
51    }
52
53    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
54        Some(WithOptionValue::Value(Value::String(match self {
55            IcebergCatalogType::Rest => "rest".to_string(),
56            IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
57        })))
58    }
59
60    fn name() -> String {
61        "iceberg catalog type".to_string()
62    }
63}
64
65impl ImpliedValue for IcebergCatalogType {
66    fn implied_value() -> Result<Self, PlanError> {
67        sql_bail!("must provide an iceberg catalog type")
68    }
69}
70
71#[derive(Copy, Clone, Debug)]
72pub struct Secret(CatalogItemId);
73
74impl From<Secret> for CatalogItemId {
75    fn from(secret: Secret) -> Self {
76        secret.0
77    }
78}
79
80impl TryFromValue<WithOptionValue<Aug>> for Secret {
81    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
82        match StringOrSecret::try_from_value(v)? {
83            StringOrSecret::Secret(id) => Ok(Secret(id)),
84            StringOrSecret::String(_) => sql_bail!("must provide a secret value"),
85        }
86    }
87
88    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
89        let secret = catalog.get_item(&self.0);
90        let name = ResolvedItemName::Item {
91            id: self.0,
92            qualifiers: secret.name().qualifiers.clone(),
93            full_name: catalog.resolve_full_name(secret.name()),
94            print_id: false,
95            version: RelationVersionSelector::Latest,
96        };
97        Some(WithOptionValue::Secret(name))
98    }
99
100    fn name() -> String {
101        "secret".to_string()
102    }
103}
104
105impl ImpliedValue for Secret {
106    fn implied_value() -> Result<Self, PlanError> {
107        sql_bail!("must provide a secret value")
108    }
109}
110
111#[derive(Copy, Clone, Debug)]
112pub struct Object(CatalogItemId);
113
114impl From<Object> for CatalogItemId {
115    fn from(obj: Object) -> Self {
116        obj.0
117    }
118}
119
120impl From<&Object> for CatalogItemId {
121    fn from(obj: &Object) -> Self {
122        obj.0
123    }
124}
125
126impl TryFromValue<WithOptionValue<Aug>> for Object {
127    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
128        Ok(match v {
129            WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
130            _ => sql_bail!("must provide an object"),
131        })
132    }
133
134    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
135        let item = catalog.get_item(&self.0);
136        let name = ResolvedItemName::Item {
137            id: self.0,
138            qualifiers: item.name().qualifiers.clone(),
139            full_name: catalog.resolve_full_name(item.name()),
140            print_id: false,
141            // TODO(alter_table): Evaluate if this is correct.
142            version: RelationVersionSelector::Latest,
143        };
144        Some(WithOptionValue::Item(name))
145    }
146
147    fn name() -> String {
148        "object reference".to_string()
149    }
150}
151
152impl ImpliedValue for Object {
153    fn implied_value() -> Result<Self, PlanError> {
154        sql_bail!("must provide an object")
155    }
156}
157
158impl TryFromValue<WithOptionValue<Aug>> for Ident {
159    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
160        Ok(match v {
161            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
162                if inner.len() == 1 =>
163            {
164                inner.remove(0)
165            }
166            WithOptionValue::Ident(inner) => inner,
167            _ => sql_bail!("must provide an unqualified identifier"),
168        })
169    }
170
171    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
172        Some(WithOptionValue::Ident(self))
173    }
174
175    fn name() -> String {
176        "identifier".to_string()
177    }
178}
179
180impl ImpliedValue for Ident {
181    fn implied_value() -> Result<Self, PlanError> {
182        sql_bail!("must provide an identifier")
183    }
184}
185
186impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
187    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
188        Ok(match v {
189            WithOptionValue::Expr(e) => e,
190            _ => sql_bail!("must provide an expr"),
191        })
192    }
193
194    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
195        Some(WithOptionValue::Expr(self))
196    }
197
198    fn name() -> String {
199        "expression".to_string()
200    }
201}
202
203impl ImpliedValue for Expr<Aug> {
204    fn implied_value() -> Result<Self, PlanError> {
205        sql_bail!("must provide an expression")
206    }
207}
208
209impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
210    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
211        Ok(match v {
212            WithOptionValue::UnresolvedItemName(name) => name,
213            WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
214            _ => sql_bail!("must provide an object name"),
215        })
216    }
217
218    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
219        Some(WithOptionValue::UnresolvedItemName(self))
220    }
221
222    fn name() -> String {
223        "object name".to_string()
224    }
225}
226
227impl ImpliedValue for UnresolvedItemName {
228    fn implied_value() -> Result<Self, PlanError> {
229        sql_bail!("must provide an object name")
230    }
231}
232
233impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
234    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
235        Ok(match v {
236            WithOptionValue::DataType(ty) => ty,
237            _ => sql_bail!("must provide a data type"),
238        })
239    }
240
241    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
242        Some(WithOptionValue::DataType(self))
243    }
244
245    fn name() -> String {
246        "data type".to_string()
247    }
248}
249
250impl ImpliedValue for ResolvedDataType {
251    fn implied_value() -> Result<Self, PlanError> {
252        sql_bail!("must provide a data type")
253    }
254}
255
256impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
257    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
258        Ok(match v {
259            WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
260                StringOrSecret::Secret(id)
261            }
262            v => StringOrSecret::String(String::try_from_value(v)?),
263        })
264    }
265
266    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
267        Some(match self {
268            StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
269            StringOrSecret::String(s) => s.try_into_value(catalog)?,
270        })
271    }
272
273    fn name() -> String {
274        "string or secret".to_string()
275    }
276}
277
278impl ImpliedValue for StringOrSecret {
279    fn implied_value() -> Result<Self, PlanError> {
280        sql_bail!("must provide a string or secret value")
281    }
282}
283
284impl TryFromValue<Value> for Duration {
285    fn try_from_value(v: Value) -> Result<Self, PlanError> {
286        let interval = Interval::try_from_value(v)?;
287        let duration = interval.duration()?;
288        // `try_into_value` (used during unplanning) requires that the `Duration`
289        // is convertible back to an `Interval`, which has a smaller range than
290        // `Duration`. Enforce that here so we return a graceful error instead of
291        // panicking later. See database-issues SQL-361.
292        Interval::from_duration(&duration).map_err(|_| sql_err!("interval out of range"))?;
293        Ok(duration)
294    }
295
296    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
297        let interval = Interval::from_duration(&self)
298            .expect("planning ensured that this is convertible back to Interval");
299        interval.try_into_value(catalog)
300    }
301
302    fn name() -> String {
303        "interval".to_string()
304    }
305}
306
307impl ImpliedValue for Duration {
308    fn implied_value() -> Result<Self, PlanError> {
309        sql_bail!("must provide an interval value")
310    }
311}
312
313impl TryFromValue<Value> for ByteSize {
314    fn try_from_value(v: Value) -> Result<Self, PlanError> {
315        match v {
316            Value::Number(value) | Value::String(value) => Ok(value
317                .parse::<ByteSize>()
318                .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
319            _ => sql_bail!("cannot use value as bytes"),
320        }
321    }
322
323    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
324        Some(Value::String(self.to_string()))
325    }
326
327    fn name() -> String {
328        "bytes".to_string()
329    }
330}
331
332impl ImpliedValue for ByteSize {
333    fn implied_value() -> Result<Self, PlanError> {
334        sql_bail!("must provide a value for bytes")
335    }
336}
337
338impl TryFromValue<Value> for Interval {
339    fn try_from_value(v: Value) -> Result<Self, PlanError> {
340        match v {
341            Value::Interval(value) => literal::plan_interval(&value),
342            Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
343            _ => sql_bail!("cannot use value as interval"),
344        }
345    }
346
347    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
348        let interval_value = literal::unplan_interval(&self);
349        Some(Value::Interval(interval_value))
350    }
351
352    fn name() -> String {
353        "interval".to_string()
354    }
355}
356
357impl ImpliedValue for Interval {
358    fn implied_value() -> Result<Self, PlanError> {
359        sql_bail!("must provide an interval value")
360    }
361}
362
363#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
364pub struct OptionalString(pub Option<String>);
365
366impl TryFromValue<Value> for OptionalString {
367    fn try_from_value(v: Value) -> Result<Self, PlanError> {
368        Ok(match v {
369            Value::Null => Self(None),
370            v => Self(Some(String::try_from_value(v)?)),
371        })
372    }
373
374    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
375        Some(match self.0 {
376            None => Value::Null,
377            Some(s) => s.try_into_value(catalog)?,
378        })
379    }
380
381    fn name() -> String {
382        "optional string".to_string()
383    }
384}
385
386impl ImpliedValue for OptionalString {
387    fn implied_value() -> Result<Self, PlanError> {
388        sql_bail!("must provide a string value")
389    }
390}
391
392#[derive(
393    Debug,
394    Clone,
395    Copy,
396    PartialEq,
397    Eq,
398    PartialOrd,
399    Ord,
400    Serialize,
401    Hash,
402    Deserialize
403)]
404pub struct OptionalDuration(pub Option<Duration>);
405
406impl From<Duration> for OptionalDuration {
407    fn from(i: Duration) -> OptionalDuration {
408        // An interval of 0 disables the setting.
409        let inner = if i == Duration::ZERO { None } else { Some(i) };
410        OptionalDuration(inner)
411    }
412}
413
414impl TryFromValue<Value> for OptionalDuration {
415    fn try_from_value(v: Value) -> Result<Self, PlanError> {
416        Ok(match v {
417            Value::Null => OptionalDuration(None),
418            v => Duration::try_from_value(v)?.into(),
419        })
420    }
421
422    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
423        Some(match self.0 {
424            None => Value::Null,
425            Some(duration) => duration.try_into_value(catalog)?,
426        })
427    }
428
429    fn name() -> String {
430        "optional interval".to_string()
431    }
432}
433
434impl ImpliedValue for OptionalDuration {
435    fn implied_value() -> Result<Self, PlanError> {
436        sql_bail!("must provide an interval value")
437    }
438}
439
440impl TryFromValue<Value> for String {
441    fn try_from_value(v: Value) -> Result<Self, PlanError> {
442        match v {
443            Value::String(v) => Ok(v),
444            _ => sql_bail!("cannot use value as string"),
445        }
446    }
447
448    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
449        Some(Value::String(self))
450    }
451
452    fn name() -> String {
453        "text".to_string()
454    }
455}
456
457impl ImpliedValue for String {
458    fn implied_value() -> Result<Self, PlanError> {
459        sql_bail!("must provide a string value")
460    }
461}
462
463impl TryFromValue<Value> for bool {
464    fn try_from_value(v: Value) -> Result<Self, PlanError> {
465        match v {
466            Value::Boolean(v) => Ok(v),
467            _ => sql_bail!("cannot use value as boolean"),
468        }
469    }
470
471    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
472        Some(Value::Boolean(self))
473    }
474
475    fn name() -> String {
476        "bool".to_string()
477    }
478}
479
480impl ImpliedValue for bool {
481    fn implied_value() -> Result<Self, PlanError> {
482        Ok(true)
483    }
484}
485
486impl TryFromValue<Value> for f64 {
487    fn try_from_value(v: Value) -> Result<Self, PlanError> {
488        match v {
489            Value::Number(v) => v
490                .parse::<f64>()
491                .map_err(|e| sql_err!("invalid numeric value: {e}")),
492            _ => sql_bail!("cannot use value as number"),
493        }
494    }
495
496    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
497        Some(Value::Number(self.to_string()))
498    }
499
500    fn name() -> String {
501        "float8".to_string()
502    }
503}
504
505impl ImpliedValue for f64 {
506    fn implied_value() -> Result<Self, PlanError> {
507        sql_bail!("must provide a float value")
508    }
509}
510
511impl TryFromValue<Value> for i32 {
512    fn try_from_value(v: Value) -> Result<Self, PlanError> {
513        match v {
514            Value::Number(v) => v
515                .parse::<i32>()
516                .map_err(|e| sql_err!("invalid numeric value: {e}")),
517            _ => sql_bail!("cannot use value as number"),
518        }
519    }
520
521    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
522        Some(Value::Number(self.to_string()))
523    }
524
525    fn name() -> String {
526        "int".to_string()
527    }
528}
529
530impl ImpliedValue for i32 {
531    fn implied_value() -> Result<Self, PlanError> {
532        sql_bail!("must provide an integer value")
533    }
534}
535
536impl TryFromValue<Value> for i64 {
537    fn try_from_value(v: Value) -> Result<Self, PlanError> {
538        match v {
539            Value::Number(v) => v
540                .parse::<i64>()
541                .map_err(|e| sql_err!("invalid numeric value: {e}")),
542            _ => sql_bail!("cannot use value as number"),
543        }
544    }
545    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
546        Some(Value::Number(self.to_string()))
547    }
548    fn name() -> String {
549        "int8".to_string()
550    }
551}
552
553impl ImpliedValue for i64 {
554    fn implied_value() -> Result<Self, PlanError> {
555        sql_bail!("must provide an integer value")
556    }
557}
558
559impl TryFromValue<Value> for u16 {
560    fn try_from_value(v: Value) -> Result<Self, PlanError> {
561        match v {
562            Value::Number(v) => v
563                .parse::<u16>()
564                .map_err(|e| sql_err!("invalid numeric value: {e}")),
565            _ => sql_bail!("cannot use value as number"),
566        }
567    }
568    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
569        Some(Value::Number(self.to_string()))
570    }
571    fn name() -> String {
572        "uint2".to_string()
573    }
574}
575
576impl ImpliedValue for u16 {
577    fn implied_value() -> Result<Self, PlanError> {
578        sql_bail!("must provide an integer value")
579    }
580}
581
582impl TryFromValue<Value> for u32 {
583    fn try_from_value(v: Value) -> Result<Self, PlanError> {
584        match v {
585            Value::Number(v) => v
586                .parse::<u32>()
587                .map_err(|e| sql_err!("invalid numeric value: {e}")),
588            _ => sql_bail!("cannot use value as number"),
589        }
590    }
591    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
592        Some(Value::Number(self.to_string()))
593    }
594    fn name() -> String {
595        "uint4".to_string()
596    }
597}
598
599impl ImpliedValue for u32 {
600    fn implied_value() -> Result<Self, PlanError> {
601        sql_bail!("must provide an integer value")
602    }
603}
604
605impl TryFromValue<Value> for u64 {
606    fn try_from_value(v: Value) -> Result<Self, PlanError> {
607        match v {
608            Value::Number(v) => v
609                .parse::<u64>()
610                .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
611            _ => sql_bail!("cannot use value as number"),
612        }
613    }
614    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
615        Some(Value::Number(self.to_string()))
616    }
617    fn name() -> String {
618        "uint8".to_string()
619    }
620}
621
622impl ImpliedValue for u64 {
623    fn implied_value() -> Result<Self, PlanError> {
624        sql_bail!("must provide an unsigned integer value")
625    }
626}
627
628impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
629    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
630        match v {
631            WithOptionValue::Sequence(a) => {
632                let mut out = Vec::with_capacity(a.len());
633                for i in a {
634                    out.push(
635                        V::try_from_value(i)
636                            .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
637                    )
638                }
639                Ok(out)
640            }
641            _ => sql_bail!("cannot use value as array"),
642        }
643    }
644
645    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
646        Some(WithOptionValue::Sequence(
647            self.into_iter()
648                .map(|v| v.try_into_value(catalog))
649                .collect::<Option<_>>()?,
650        ))
651    }
652
653    fn name() -> String {
654        format!("array of {}", V::name())
655    }
656}
657
658impl<V: ImpliedValue> ImpliedValue for Vec<V> {
659    fn implied_value() -> Result<Self, PlanError> {
660        sql_bail!("must provide an array value")
661    }
662}
663
664impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
665    for Option<V>
666{
667    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
668        Ok(Some(V::try_from_value(v)?))
669    }
670
671    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
672        match self {
673            Some(v) => v.try_into_value(catalog),
674            None => None,
675        }
676    }
677
678    fn name() -> String {
679        format!("optional {}", V::name())
680    }
681}
682
683impl<V: ImpliedValue> ImpliedValue for Option<V> {
684    fn implied_value() -> Result<Self, PlanError> {
685        Ok(Some(V::implied_value()?))
686    }
687}
688
689impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
690    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
691        match v {
692            WithOptionValue::Value(v) => V::try_from_value(v),
693            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
694                if inner.len() == 1 =>
695            {
696                V::try_from_value(Value::String(inner.remove(0).into_string()))
697            }
698            WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
699            WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
700            WithOptionValue::Sequence(_)
701            | WithOptionValue::Map(_)
702            | WithOptionValue::Item(_)
703            | WithOptionValue::UnresolvedItemName(_)
704            | WithOptionValue::Secret(_)
705            | WithOptionValue::DataType(_)
706            | WithOptionValue::Expr(_)
707            | WithOptionValue::ClusterReplicas(_)
708            | WithOptionValue::ConnectionKafkaBroker(_)
709            | WithOptionValue::ConnectionAwsPrivatelink(_)
710            | WithOptionValue::KafkaMatchingBrokerRule(_)
711            | WithOptionValue::ClusterAlterStrategy(_)
712            | WithOptionValue::Refresh(_)
713            | WithOptionValue::ClusterScheduleOptionValue(_)
714            | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
715                "incompatible value types: cannot convert {} to {}",
716                match v {
717                    // The first few are unreachable because they are handled at the top of the outer match.
718                    WithOptionValue::Value(_) => unreachable!(),
719                    WithOptionValue::RetainHistoryFor(_) => unreachable!(),
720                    WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
721                    WithOptionValue::Sequence(_) => "sequences",
722                    WithOptionValue::Map(_) => "maps",
723                    WithOptionValue::Item(_) => "object references",
724                    WithOptionValue::UnresolvedItemName(_) => "object names",
725                    WithOptionValue::Ident(_) => "identifiers",
726                    WithOptionValue::Secret(_) => "secrets",
727                    WithOptionValue::DataType(_) => "data types",
728                    WithOptionValue::Expr(_) => "exprs",
729                    WithOptionValue::ClusterReplicas(_) => "cluster replicas",
730                    WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
731                    WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink",
732                    WithOptionValue::KafkaMatchingBrokerRule(_) => "matching broker rule",
733                    WithOptionValue::Refresh(_) => "refresh option values",
734                    WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
735                    WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
736                },
737                V::name()
738            ),
739        }
740    }
741
742    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
743        Some(WithOptionValue::Value(self.try_into_value(catalog)?))
744    }
745
746    fn name() -> String {
747        V::name()
748    }
749}
750
751impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
752    fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
753        match v {
754            Some(v) => V::try_from_value(v),
755            None => V::implied_value(),
756        }
757    }
758
759    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
760        Some(Some(self.try_into_value(catalog)?))
761    }
762
763    fn name() -> String {
764        V::name()
765    }
766}
767
768impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
769    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
770        match v {
771            WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
772            _ => sql_bail!("cannot use value as cluster replicas"),
773        }
774    }
775
776    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
777        Some(WithOptionValue::ClusterReplicas(self))
778    }
779
780    fn name() -> String {
781        "cluster replicas".to_string()
782    }
783}
784
785impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
786    fn implied_value() -> Result<Self, PlanError> {
787        sql_bail!("must provide a set of cluster replicas")
788    }
789}
790
791impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
792    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
793        let mut out = vec![];
794        match v {
795            WithOptionValue::ConnectionKafkaBroker(broker) => {
796                out.push(broker);
797            }
798            WithOptionValue::Sequence(values) => {
799                for value in values {
800                    out.extend(Self::try_from_value(value)?);
801                }
802            }
803            _ => sql_bail!("cannot use value as a kafka broker"),
804        }
805        Ok(out)
806    }
807
808    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
809        Some(WithOptionValue::Sequence(
810            self.into_iter()
811                .map(WithOptionValue::ConnectionKafkaBroker)
812                .collect(),
813        ))
814    }
815
816    fn name() -> String {
817        "kafka broker".to_string()
818    }
819}
820
821impl ImpliedValue for Vec<KafkaBroker<Aug>> {
822    fn implied_value() -> Result<Self, PlanError> {
823        sql_bail!("must provide a kafka broker")
824    }
825}
826
827impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
828    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
829        if let WithOptionValue::Refresh(r) = v {
830            Ok(r)
831        } else {
832            sql_bail!("cannot use value `{}` for a refresh option", v)
833        }
834    }
835
836    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
837        Some(WithOptionValue::Refresh(self))
838    }
839
840    fn name() -> String {
841        "refresh option value".to_string()
842    }
843}
844
845impl ImpliedValue for RefreshOptionValue<Aug> {
846    fn implied_value() -> Result<Self, PlanError> {
847        sql_bail!("must provide a refresh option value")
848    }
849}
850
851impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
852    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
853        if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
854            Ok(r)
855        } else {
856            sql_bail!("cannot use value `{}` for a privatelink", v)
857        }
858    }
859
860    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
861        Some(WithOptionValue::ConnectionAwsPrivatelink(self))
862    }
863
864    fn name() -> String {
865        "privatelink option value".to_string()
866    }
867}
868
869impl TryFromValue<WithOptionValue<Aug>> for KafkaMatchingBrokerRule<Aug> {
870    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
871        if let WithOptionValue::KafkaMatchingBrokerRule(r) = v {
872            Ok(r)
873        } else {
874            sql_bail!("cannot use value `{}` for a matching broker rule", v)
875        }
876    }
877
878    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
879        Some(WithOptionValue::KafkaMatchingBrokerRule(self))
880    }
881
882    fn name() -> String {
883        "matching broker rule".to_string()
884    }
885}
886
887impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
888    fn implied_value() -> Result<Self, PlanError> {
889        sql_bail!("must provide a value")
890    }
891}
892
893impl ImpliedValue for KafkaMatchingBrokerRule<Aug> {
894    fn implied_value() -> Result<Self, PlanError> {
895        sql_bail!("must provide a value")
896    }
897}
898
899/// A list of broker entries that can contain both static `KafkaBroker` entries
900/// and `KafkaMatchingBrokerRule` entries (from `MATCHING` clauses in `BROKERS`).
901#[derive(Debug)]
902pub struct BrokersList {
903    pub static_entries: Vec<KafkaBroker<Aug>>,
904    pub matching_rules: Vec<KafkaMatchingBrokerRule<Aug>>,
905}
906
907impl TryFromValue<WithOptionValue<Aug>> for BrokersList {
908    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
909        match v {
910            WithOptionValue::Sequence(entries) => {
911                let mut static_entries = vec![];
912                let mut matching_rules = vec![];
913                for entry in entries {
914                    match entry {
915                        WithOptionValue::ConnectionKafkaBroker(b) => static_entries.push(b),
916                        WithOptionValue::KafkaMatchingBrokerRule(m) => matching_rules.push(m),
917                        other => sql_bail!("unexpected value in BROKERS: {}", other),
918                    }
919                }
920                Ok(BrokersList {
921                    static_entries,
922                    matching_rules,
923                })
924            }
925            WithOptionValue::ConnectionKafkaBroker(b) => Ok(BrokersList {
926                static_entries: vec![b],
927                matching_rules: vec![],
928            }),
929            WithOptionValue::KafkaMatchingBrokerRule(m) => Ok(BrokersList {
930                static_entries: vec![],
931                matching_rules: vec![m],
932            }),
933            other => sql_bail!("cannot use {} as brokers list", other),
934        }
935    }
936
937    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
938        let mut entries: Vec<WithOptionValue<Aug>> = vec![];
939        for b in self.static_entries {
940            entries.push(WithOptionValue::ConnectionKafkaBroker(b));
941        }
942        for m in self.matching_rules {
943            entries.push(WithOptionValue::KafkaMatchingBrokerRule(m));
944        }
945        Some(WithOptionValue::Sequence(entries))
946    }
947
948    fn name() -> String {
949        "brokers list".to_string()
950    }
951}
952
953impl ImpliedValue for BrokersList {
954    fn implied_value() -> Result<Self, PlanError> {
955        sql_bail!("must provide a value for BROKERS")
956    }
957}
958
959impl ImpliedValue for ClusterScheduleOptionValue {
960    fn implied_value() -> Result<Self, PlanError> {
961        sql_bail!("must provide a cluster schedule option value")
962    }
963}
964
965impl ImpliedValue for ClusterAlterOptionValue<Aug> {
966    fn implied_value() -> Result<Self, PlanError> {
967        sql_bail!("must provide a value")
968    }
969}
970
971impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
972    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
973        if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
974            Ok(r)
975        } else {
976            sql_bail!("cannot use value `{}` for a cluster schedule", v)
977        }
978    }
979
980    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
981        Some(WithOptionValue::ClusterScheduleOptionValue(self))
982    }
983
984    fn name() -> String {
985        "cluster schedule option value".to_string()
986    }
987}
988
989impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
990    fn implied_value() -> Result<Self, PlanError> {
991        sql_bail!("must provide a map of key-value pairs")
992    }
993}
994
995impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
996    for BTreeMap<String, V>
997{
998    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
999        match v {
1000            WithOptionValue::Map(a) => a
1001                .into_iter()
1002                .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
1003                .collect(),
1004            _ => sql_bail!("cannot use value as map"),
1005        }
1006    }
1007
1008    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1009        Some(WithOptionValue::Map(
1010            self.into_iter()
1011                .map(|(k, v)| {
1012                    let v = v.try_into_value(catalog);
1013                    v.map(|v| (k, v))
1014                })
1015                .collect::<Option<_>>()?,
1016        ))
1017    }
1018
1019    fn name() -> String {
1020        format!("map of string to {}", V::name())
1021    }
1022}
1023
1024impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
1025    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1026        if let WithOptionValue::ClusterAlterStrategy(r) = v {
1027            Ok(r)
1028        } else {
1029            sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
1030        }
1031    }
1032
1033    fn name() -> String {
1034        "cluster alter strategyoption value".to_string()
1035    }
1036
1037    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1038        Some(WithOptionValue::ClusterAlterStrategy(self))
1039    }
1040}
1041
1042impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
1043    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1044        match v {
1045            WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
1046            _ => sql_bail!("cannot use value as cluster replicas"),
1047        }
1048    }
1049
1050    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1051        Some(WithOptionValue::NetworkPolicyRules(self))
1052    }
1053
1054    fn name() -> String {
1055        "network policy rules".to_string()
1056    }
1057}
1058
1059impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
1060    fn implied_value() -> Result<Self, PlanError> {
1061        sql_bail!("must provide a set of network policy rules")
1062    }
1063}