Skip to main content

mz_sql/plan/
with_options.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Provides tooling to handle `WITH` options.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19    ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20    Ident, KafkaBroker, KafkaMatchingBrokerRule, NetworkPolicyRuleDefinition, RefreshOptionValue,
21    ReplicaDefinition,
22};
23use mz_storage_types::connections::IcebergCatalogType;
24use mz_storage_types::connections::string_or_secret::StringOrSecret;
25use serde::{Deserialize, Serialize};
26
27use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
28use crate::catalog::SessionCatalog;
29use crate::names::{ResolvedDataType, ResolvedItemName};
30use crate::plan::{Aug, PlanError, literal};
31
/// Two-way conversion between an option value of type `T` and a planned Rust
/// value.
pub trait TryFromValue<T>: Sized {
    /// Attempts to build `Self` from `v`, returning a [`PlanError`] when `v`
    /// cannot be used as this type.
    fn try_from_value(v: T) -> Result<Self, PlanError>;

    /// Attempts the reverse conversion, turning `self` back into a `T`.
    ///
    /// `catalog` is available to implementations that need to look items up.
    /// Returns `None` when no `T` can be produced.
    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;

    /// Returns a human-readable name for this type.
    fn name() -> String;
}
39
/// Supplies the value a type takes when no explicit value is given.
pub trait ImpliedValue: Sized {
    /// Returns the implied value, or a [`PlanError`] when a value must be
    /// provided explicitly.
    fn implied_value() -> Result<Self, PlanError>;
}
43
44impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
45    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
46        match String::try_from_value(v)? {
47            s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
48            s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
49            _ => sql_bail!("invalid iceberg catalog type"),
50        }
51    }
52
53    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
54        Some(WithOptionValue::Value(Value::String(match self {
55            IcebergCatalogType::Rest => "rest".to_string(),
56            IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
57        })))
58    }
59
60    fn name() -> String {
61        "iceberg catalog type".to_string()
62    }
63}
64
/// There is no implied iceberg catalog type.
impl ImpliedValue for IcebergCatalogType {
    /// Always errors: the catalog type must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an iceberg catalog type")
    }
}
70
/// Wrapper around the [`CatalogItemId`] of a secret.
#[derive(Copy, Clone, Debug)]
pub struct Secret(CatalogItemId);
73
74impl From<Secret> for CatalogItemId {
75    fn from(secret: Secret) -> Self {
76        secret.0
77    }
78}
79
80impl TryFromValue<WithOptionValue<Aug>> for Secret {
81    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
82        match StringOrSecret::try_from_value(v)? {
83            StringOrSecret::Secret(id) => Ok(Secret(id)),
84            StringOrSecret::String(_) => sql_bail!("must provide a secret value"),
85        }
86    }
87
88    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
89        let secret = catalog.get_item(&self.0);
90        let name = ResolvedItemName::Item {
91            id: self.0,
92            qualifiers: secret.name().qualifiers.clone(),
93            full_name: catalog.resolve_full_name(secret.name()),
94            print_id: false,
95            version: RelationVersionSelector::Latest,
96        };
97        Some(WithOptionValue::Secret(name))
98    }
99
100    fn name() -> String {
101        "secret".to_string()
102    }
103}
104
/// There is no implied secret.
impl ImpliedValue for Secret {
    /// Always errors: a secret must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a secret value")
    }
}
110
/// Wrapper around the [`CatalogItemId`] of a referenced catalog item.
#[derive(Copy, Clone, Debug)]
pub struct Object(CatalogItemId);
113
114impl From<Object> for CatalogItemId {
115    fn from(obj: Object) -> Self {
116        obj.0
117    }
118}
119
120impl From<&Object> for CatalogItemId {
121    fn from(obj: &Object) -> Self {
122        obj.0
123    }
124}
125
126impl TryFromValue<WithOptionValue<Aug>> for Object {
127    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
128        Ok(match v {
129            WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
130            _ => sql_bail!("must provide an object"),
131        })
132    }
133
134    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
135        let item = catalog.get_item(&self.0);
136        let name = ResolvedItemName::Item {
137            id: self.0,
138            qualifiers: item.name().qualifiers.clone(),
139            full_name: catalog.resolve_full_name(item.name()),
140            print_id: false,
141            // TODO(alter_table): Evaluate if this is correct.
142            version: RelationVersionSelector::Latest,
143        };
144        Some(WithOptionValue::Item(name))
145    }
146
147    fn name() -> String {
148        "object reference".to_string()
149    }
150}
151
/// There is no implied object reference.
impl ImpliedValue for Object {
    /// Always errors: an object must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an object")
    }
}
157
158impl TryFromValue<WithOptionValue<Aug>> for Ident {
159    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
160        Ok(match v {
161            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
162                if inner.len() == 1 =>
163            {
164                inner.remove(0)
165            }
166            WithOptionValue::Ident(inner) => inner,
167            _ => sql_bail!("must provide an unqualified identifier"),
168        })
169    }
170
171    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
172        Some(WithOptionValue::Ident(self))
173    }
174
175    fn name() -> String {
176        "identifier".to_string()
177    }
178}
179
/// There is no implied identifier.
impl ImpliedValue for Ident {
    /// Always errors: an identifier must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an identifier")
    }
}
185
186impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
187    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
188        Ok(match v {
189            WithOptionValue::Expr(e) => e,
190            _ => sql_bail!("must provide an expr"),
191        })
192    }
193
194    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
195        Some(WithOptionValue::Expr(self))
196    }
197
198    fn name() -> String {
199        "expression".to_string()
200    }
201}
202
/// There is no implied expression.
impl ImpliedValue for Expr<Aug> {
    /// Always errors: an expression must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an expression")
    }
}
208
209impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
210    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
211        Ok(match v {
212            WithOptionValue::UnresolvedItemName(name) => name,
213            WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
214            _ => sql_bail!("must provide an object name"),
215        })
216    }
217
218    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
219        Some(WithOptionValue::UnresolvedItemName(self))
220    }
221
222    fn name() -> String {
223        "object name".to_string()
224    }
225}
226
/// There is no implied object name.
impl ImpliedValue for UnresolvedItemName {
    /// Always errors: an object name must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an object name")
    }
}
232
233impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
234    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
235        Ok(match v {
236            WithOptionValue::DataType(ty) => ty,
237            _ => sql_bail!("must provide a data type"),
238        })
239    }
240
241    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
242        Some(WithOptionValue::DataType(self))
243    }
244
245    fn name() -> String {
246        "data type".to_string()
247    }
248}
249
/// There is no implied data type.
impl ImpliedValue for ResolvedDataType {
    /// Always errors: a data type must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a data type")
    }
}
255
256impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
257    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
258        Ok(match v {
259            WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
260                StringOrSecret::Secret(id)
261            }
262            v => StringOrSecret::String(String::try_from_value(v)?),
263        })
264    }
265
266    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
267        Some(match self {
268            StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
269            StringOrSecret::String(s) => s.try_into_value(catalog)?,
270        })
271    }
272
273    fn name() -> String {
274        "string or secret".to_string()
275    }
276}
277
/// There is no implied string-or-secret value.
impl ImpliedValue for StringOrSecret {
    /// Always errors: the value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string or secret value")
    }
}
283
284impl TryFromValue<Value> for Duration {
285    fn try_from_value(v: Value) -> Result<Self, PlanError> {
286        let interval = Interval::try_from_value(v)?;
287        Ok(interval.duration()?)
288    }
289
290    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
291        let interval = Interval::from_duration(&self)
292            .expect("planning ensured that this is convertible back to Interval");
293        interval.try_into_value(catalog)
294    }
295
296    fn name() -> String {
297        "interval".to_string()
298    }
299}
300
/// There is no implied duration.
impl ImpliedValue for Duration {
    /// Always errors: an interval must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
306
307impl TryFromValue<Value> for ByteSize {
308    fn try_from_value(v: Value) -> Result<Self, PlanError> {
309        match v {
310            Value::Number(value) | Value::String(value) => Ok(value
311                .parse::<ByteSize>()
312                .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
313            _ => sql_bail!("cannot use value as bytes"),
314        }
315    }
316
317    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
318        Some(Value::String(self.to_string()))
319    }
320
321    fn name() -> String {
322        "bytes".to_string()
323    }
324}
325
/// There is no implied byte size.
impl ImpliedValue for ByteSize {
    /// Always errors: a byte size must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value for bytes")
    }
}
331
332impl TryFromValue<Value> for Interval {
333    fn try_from_value(v: Value) -> Result<Self, PlanError> {
334        match v {
335            Value::Interval(value) => literal::plan_interval(&value),
336            Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
337            _ => sql_bail!("cannot use value as interval"),
338        }
339    }
340
341    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
342        let interval_value = literal::unplan_interval(&self);
343        Some(Value::Interval(interval_value))
344    }
345
346    fn name() -> String {
347        "interval".to_string()
348    }
349}
350
/// There is no implied interval.
impl ImpliedValue for Interval {
    /// Always errors: an interval must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
356
/// A string option that may also be given as `NULL`, represented as `None`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct OptionalString(pub Option<String>);
359
360impl TryFromValue<Value> for OptionalString {
361    fn try_from_value(v: Value) -> Result<Self, PlanError> {
362        Ok(match v {
363            Value::Null => Self(None),
364            v => Self(Some(String::try_from_value(v)?)),
365        })
366    }
367
368    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
369        Some(match self.0 {
370            None => Value::Null,
371            Some(s) => s.try_into_value(catalog)?,
372        })
373    }
374
375    fn name() -> String {
376        "optional string".to_string()
377    }
378}
379
/// There is no implied optional string.
impl ImpliedValue for OptionalString {
    /// Always errors: a value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string value")
    }
}
385
/// A duration option that may be absent; `None` stands for `NULL` or a
/// zero-length duration.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Hash,
    Deserialize
)]
pub struct OptionalDuration(pub Option<Duration>);
399
400impl From<Duration> for OptionalDuration {
401    fn from(i: Duration) -> OptionalDuration {
402        // An interval of 0 disables the setting.
403        let inner = if i == Duration::ZERO { None } else { Some(i) };
404        OptionalDuration(inner)
405    }
406}
407
408impl TryFromValue<Value> for OptionalDuration {
409    fn try_from_value(v: Value) -> Result<Self, PlanError> {
410        Ok(match v {
411            Value::Null => OptionalDuration(None),
412            v => Duration::try_from_value(v)?.into(),
413        })
414    }
415
416    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
417        Some(match self.0 {
418            None => Value::Null,
419            Some(duration) => duration.try_into_value(catalog)?,
420        })
421    }
422
423    fn name() -> String {
424        "optional interval".to_string()
425    }
426}
427
/// There is no implied optional duration.
impl ImpliedValue for OptionalDuration {
    /// Always errors: a value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
433
434impl TryFromValue<Value> for String {
435    fn try_from_value(v: Value) -> Result<Self, PlanError> {
436        match v {
437            Value::String(v) => Ok(v),
438            _ => sql_bail!("cannot use value as string"),
439        }
440    }
441
442    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
443        Some(Value::String(self))
444    }
445
446    fn name() -> String {
447        "text".to_string()
448    }
449}
450
/// There is no implied string.
impl ImpliedValue for String {
    /// Always errors: a string must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string value")
    }
}
456
457impl TryFromValue<Value> for bool {
458    fn try_from_value(v: Value) -> Result<Self, PlanError> {
459        match v {
460            Value::Boolean(v) => Ok(v),
461            _ => sql_bail!("cannot use value as boolean"),
462        }
463    }
464
465    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
466        Some(Value::Boolean(self))
467    }
468
469    fn name() -> String {
470        "bool".to_string()
471    }
472}
473
/// A boolean option given without a value is implied to be `true`.
impl ImpliedValue for bool {
    /// Returns `Ok(true)`.
    fn implied_value() -> Result<Self, PlanError> {
        Ok(true)
    }
}
479
480impl TryFromValue<Value> for f64 {
481    fn try_from_value(v: Value) -> Result<Self, PlanError> {
482        match v {
483            Value::Number(v) => v
484                .parse::<f64>()
485                .map_err(|e| sql_err!("invalid numeric value: {e}")),
486            _ => sql_bail!("cannot use value as number"),
487        }
488    }
489
490    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
491        Some(Value::Number(self.to_string()))
492    }
493
494    fn name() -> String {
495        "float8".to_string()
496    }
497}
498
/// There is no implied float.
impl ImpliedValue for f64 {
    /// Always errors: a float must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a float value")
    }
}
504
505impl TryFromValue<Value> for i32 {
506    fn try_from_value(v: Value) -> Result<Self, PlanError> {
507        match v {
508            Value::Number(v) => v
509                .parse::<i32>()
510                .map_err(|e| sql_err!("invalid numeric value: {e}")),
511            _ => sql_bail!("cannot use value as number"),
512        }
513    }
514
515    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
516        Some(Value::Number(self.to_string()))
517    }
518
519    fn name() -> String {
520        "int".to_string()
521    }
522}
523
/// There is no implied `i32`.
impl ImpliedValue for i32 {
    /// Always errors: an integer must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
529
530impl TryFromValue<Value> for i64 {
531    fn try_from_value(v: Value) -> Result<Self, PlanError> {
532        match v {
533            Value::Number(v) => v
534                .parse::<i64>()
535                .map_err(|e| sql_err!("invalid numeric value: {e}")),
536            _ => sql_bail!("cannot use value as number"),
537        }
538    }
539    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
540        Some(Value::Number(self.to_string()))
541    }
542    fn name() -> String {
543        "int8".to_string()
544    }
545}
546
/// There is no implied `i64`.
impl ImpliedValue for i64 {
    /// Always errors: an integer must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
552
553impl TryFromValue<Value> for u16 {
554    fn try_from_value(v: Value) -> Result<Self, PlanError> {
555        match v {
556            Value::Number(v) => v
557                .parse::<u16>()
558                .map_err(|e| sql_err!("invalid numeric value: {e}")),
559            _ => sql_bail!("cannot use value as number"),
560        }
561    }
562    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
563        Some(Value::Number(self.to_string()))
564    }
565    fn name() -> String {
566        "uint2".to_string()
567    }
568}
569
/// There is no implied `u16`.
impl ImpliedValue for u16 {
    /// Always errors: an integer must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
575
576impl TryFromValue<Value> for u32 {
577    fn try_from_value(v: Value) -> Result<Self, PlanError> {
578        match v {
579            Value::Number(v) => v
580                .parse::<u32>()
581                .map_err(|e| sql_err!("invalid numeric value: {e}")),
582            _ => sql_bail!("cannot use value as number"),
583        }
584    }
585    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
586        Some(Value::Number(self.to_string()))
587    }
588    fn name() -> String {
589        "uint4".to_string()
590    }
591}
592
/// There is no implied `u32`.
impl ImpliedValue for u32 {
    /// Always errors: an integer must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
598
599impl TryFromValue<Value> for u64 {
600    fn try_from_value(v: Value) -> Result<Self, PlanError> {
601        match v {
602            Value::Number(v) => v
603                .parse::<u64>()
604                .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
605            _ => sql_bail!("cannot use value as number"),
606        }
607    }
608    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
609        Some(Value::Number(self.to_string()))
610    }
611    fn name() -> String {
612        "uint8".to_string()
613    }
614}
615
/// There is no implied `u64`.
impl ImpliedValue for u64 {
    /// Always errors: an unsigned integer must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an unsigned integer value")
    }
}
621
622impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
623    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
624        match v {
625            WithOptionValue::Sequence(a) => {
626                let mut out = Vec::with_capacity(a.len());
627                for i in a {
628                    out.push(
629                        V::try_from_value(i)
630                            .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
631                    )
632                }
633                Ok(out)
634            }
635            _ => sql_bail!("cannot use value as array"),
636        }
637    }
638
639    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
640        Some(WithOptionValue::Sequence(
641            self.into_iter()
642                .map(|v| v.try_into_value(catalog))
643                .collect::<Option<_>>()?,
644        ))
645    }
646
647    fn name() -> String {
648        format!("array of {}", V::name())
649    }
650}
651
/// There is no implied array.
impl<V: ImpliedValue> ImpliedValue for Vec<V> {
    /// Always errors: an array must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an array value")
    }
}
657
658impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
659    for Option<V>
660{
661    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
662        Ok(Some(V::try_from_value(v)?))
663    }
664
665    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
666        match self {
667            Some(v) => v.try_into_value(catalog),
668            None => None,
669        }
670    }
671
672    fn name() -> String {
673        format!("optional {}", V::name())
674    }
675}
676
677impl<V: ImpliedValue> ImpliedValue for Option<V> {
678    fn implied_value() -> Result<Self, PlanError> {
679        Ok(Some(V::implied_value()?))
680    }
681}
682
683impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
684    fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
685        match v {
686            WithOptionValue::Value(v) => V::try_from_value(v),
687            WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
688                if inner.len() == 1 =>
689            {
690                V::try_from_value(Value::String(inner.remove(0).into_string()))
691            }
692            WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
693            WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
694            WithOptionValue::Sequence(_)
695            | WithOptionValue::Map(_)
696            | WithOptionValue::Item(_)
697            | WithOptionValue::UnresolvedItemName(_)
698            | WithOptionValue::Secret(_)
699            | WithOptionValue::DataType(_)
700            | WithOptionValue::Expr(_)
701            | WithOptionValue::ClusterReplicas(_)
702            | WithOptionValue::ConnectionKafkaBroker(_)
703            | WithOptionValue::ConnectionAwsPrivatelink(_)
704            | WithOptionValue::KafkaMatchingBrokerRule(_)
705            | WithOptionValue::ClusterAlterStrategy(_)
706            | WithOptionValue::Refresh(_)
707            | WithOptionValue::ClusterScheduleOptionValue(_)
708            | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
709                "incompatible value types: cannot convert {} to {}",
710                match v {
711                    // The first few are unreachable because they are handled at the top of the outer match.
712                    WithOptionValue::Value(_) => unreachable!(),
713                    WithOptionValue::RetainHistoryFor(_) => unreachable!(),
714                    WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
715                    WithOptionValue::Sequence(_) => "sequences",
716                    WithOptionValue::Map(_) => "maps",
717                    WithOptionValue::Item(_) => "object references",
718                    WithOptionValue::UnresolvedItemName(_) => "object names",
719                    WithOptionValue::Ident(_) => "identifiers",
720                    WithOptionValue::Secret(_) => "secrets",
721                    WithOptionValue::DataType(_) => "data types",
722                    WithOptionValue::Expr(_) => "exprs",
723                    WithOptionValue::ClusterReplicas(_) => "cluster replicas",
724                    WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
725                    WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink",
726                    WithOptionValue::KafkaMatchingBrokerRule(_) => "matching broker rule",
727                    WithOptionValue::Refresh(_) => "refresh option values",
728                    WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
729                    WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
730                },
731                V::name()
732            ),
733        }
734    }
735
736    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
737        Some(WithOptionValue::Value(self.try_into_value(catalog)?))
738    }
739
740    fn name() -> String {
741        V::name()
742    }
743}
744
745impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
746    fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
747        match v {
748            Some(v) => V::try_from_value(v),
749            None => V::implied_value(),
750        }
751    }
752
753    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
754        Some(Some(self.try_into_value(catalog)?))
755    }
756
757    fn name() -> String {
758        V::name()
759    }
760}
761
762impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
763    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
764        match v {
765            WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
766            _ => sql_bail!("cannot use value as cluster replicas"),
767        }
768    }
769
770    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
771        Some(WithOptionValue::ClusterReplicas(self))
772    }
773
774    fn name() -> String {
775        "cluster replicas".to_string()
776    }
777}
778
/// There is no implied set of cluster replicas.
impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
    /// Always errors: replicas must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a set of cluster replicas")
    }
}
784
785impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
786    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
787        let mut out = vec![];
788        match v {
789            WithOptionValue::ConnectionKafkaBroker(broker) => {
790                out.push(broker);
791            }
792            WithOptionValue::Sequence(values) => {
793                for value in values {
794                    out.extend(Self::try_from_value(value)?);
795                }
796            }
797            _ => sql_bail!("cannot use value as a kafka broker"),
798        }
799        Ok(out)
800    }
801
802    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
803        Some(WithOptionValue::Sequence(
804            self.into_iter()
805                .map(WithOptionValue::ConnectionKafkaBroker)
806                .collect(),
807        ))
808    }
809
810    fn name() -> String {
811        "kafka broker".to_string()
812    }
813}
814
/// There is no implied kafka broker list.
impl ImpliedValue for Vec<KafkaBroker<Aug>> {
    /// Always errors: a broker must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a kafka broker")
    }
}
820
821impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
822    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
823        if let WithOptionValue::Refresh(r) = v {
824            Ok(r)
825        } else {
826            sql_bail!("cannot use value `{}` for a refresh option", v)
827        }
828    }
829
830    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
831        Some(WithOptionValue::Refresh(self))
832    }
833
834    fn name() -> String {
835        "refresh option value".to_string()
836    }
837}
838
/// There is no implied refresh option value.
impl ImpliedValue for RefreshOptionValue<Aug> {
    /// Always errors: a refresh option value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a refresh option value")
    }
}
844
845impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
846    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
847        if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
848            Ok(r)
849        } else {
850            sql_bail!("cannot use value `{}` for a privatelink", v)
851        }
852    }
853
854    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
855        Some(WithOptionValue::ConnectionAwsPrivatelink(self))
856    }
857
858    fn name() -> String {
859        "privatelink option value".to_string()
860    }
861}
862
863impl TryFromValue<WithOptionValue<Aug>> for KafkaMatchingBrokerRule<Aug> {
864    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
865        if let WithOptionValue::KafkaMatchingBrokerRule(r) = v {
866            Ok(r)
867        } else {
868            sql_bail!("cannot use value `{}` for a matching broker rule", v)
869        }
870    }
871
872    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
873        Some(WithOptionValue::KafkaMatchingBrokerRule(self))
874    }
875
876    fn name() -> String {
877        "matching broker rule".to_string()
878    }
879}
880
/// There is no implied default privatelink.
impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
    /// Always errors: a value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value")
    }
}
886
/// There is no implied matching broker rule.
impl ImpliedValue for KafkaMatchingBrokerRule<Aug> {
    /// Always errors: a value must be written explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value")
    }
}
892
/// A list of broker entries that can contain both static `KafkaBroker` entries
/// and `KafkaMatchingBrokerRule` entries (from `MATCHING` clauses in `BROKERS`).
#[derive(Debug)]
pub struct BrokersList {
    /// The static `KafkaBroker` entries.
    pub static_entries: Vec<KafkaBroker<Aug>>,
    /// The `KafkaMatchingBrokerRule` entries.
    pub matching_rules: Vec<KafkaMatchingBrokerRule<Aug>>,
}
900
901impl TryFromValue<WithOptionValue<Aug>> for BrokersList {
902    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
903        match v {
904            WithOptionValue::Sequence(entries) => {
905                let mut static_entries = vec![];
906                let mut matching_rules = vec![];
907                for entry in entries {
908                    match entry {
909                        WithOptionValue::ConnectionKafkaBroker(b) => static_entries.push(b),
910                        WithOptionValue::KafkaMatchingBrokerRule(m) => matching_rules.push(m),
911                        other => sql_bail!("unexpected value in BROKERS: {}", other),
912                    }
913                }
914                Ok(BrokersList {
915                    static_entries,
916                    matching_rules,
917                })
918            }
919            WithOptionValue::ConnectionKafkaBroker(b) => Ok(BrokersList {
920                static_entries: vec![b],
921                matching_rules: vec![],
922            }),
923            WithOptionValue::KafkaMatchingBrokerRule(m) => Ok(BrokersList {
924                static_entries: vec![],
925                matching_rules: vec![m],
926            }),
927            other => sql_bail!("cannot use {} as brokers list", other),
928        }
929    }
930
931    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
932        let mut entries: Vec<WithOptionValue<Aug>> = vec![];
933        for b in self.static_entries {
934            entries.push(WithOptionValue::ConnectionKafkaBroker(b));
935        }
936        for m in self.matching_rules {
937            entries.push(WithOptionValue::KafkaMatchingBrokerRule(m));
938        }
939        Some(WithOptionValue::Sequence(entries))
940    }
941
942    fn name() -> String {
943        "brokers list".to_string()
944    }
945}
946
947impl ImpliedValue for BrokersList {
948    fn implied_value() -> Result<Self, PlanError> {
949        sql_bail!("must provide a value for BROKERS")
950    }
951}
952
953impl ImpliedValue for ClusterScheduleOptionValue {
954    fn implied_value() -> Result<Self, PlanError> {
955        sql_bail!("must provide a cluster schedule option value")
956    }
957}
958
959impl ImpliedValue for ClusterAlterOptionValue<Aug> {
960    fn implied_value() -> Result<Self, PlanError> {
961        sql_bail!("must provide a value")
962    }
963}
964
965impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
966    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
967        if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
968            Ok(r)
969        } else {
970            sql_bail!("cannot use value `{}` for a cluster schedule", v)
971        }
972    }
973
974    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
975        Some(WithOptionValue::ClusterScheduleOptionValue(self))
976    }
977
978    fn name() -> String {
979        "cluster schedule option value".to_string()
980    }
981}
982
983impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
984    fn implied_value() -> Result<Self, PlanError> {
985        sql_bail!("must provide a map of key-value pairs")
986    }
987}
988
989impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
990    for BTreeMap<String, V>
991{
992    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
993        match v {
994            WithOptionValue::Map(a) => a
995                .into_iter()
996                .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
997                .collect(),
998            _ => sql_bail!("cannot use value as map"),
999        }
1000    }
1001
1002    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1003        Some(WithOptionValue::Map(
1004            self.into_iter()
1005                .map(|(k, v)| {
1006                    let v = v.try_into_value(catalog);
1007                    v.map(|v| (k, v))
1008                })
1009                .collect::<Option<_>>()?,
1010        ))
1011    }
1012
1013    fn name() -> String {
1014        format!("map of string to {}", V::name())
1015    }
1016}
1017
1018impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
1019    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1020        if let WithOptionValue::ClusterAlterStrategy(r) = v {
1021            Ok(r)
1022        } else {
1023            sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
1024        }
1025    }
1026
1027    fn name() -> String {
1028        "cluster alter strategyoption value".to_string()
1029    }
1030
1031    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1032        Some(WithOptionValue::ClusterAlterStrategy(self))
1033    }
1034}
1035
1036impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
1037    fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1038        match v {
1039            WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
1040            _ => sql_bail!("cannot use value as cluster replicas"),
1041        }
1042    }
1043
1044    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1045        Some(WithOptionValue::NetworkPolicyRules(self))
1046    }
1047
1048    fn name() -> String {
1049        "network policy rules".to_string()
1050    }
1051}
1052
1053impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
1054    fn implied_value() -> Result<Self, PlanError> {
1055        sql_bail!("must provide a set of network policy rules")
1056    }
1057}