1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19 ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20 Ident, KafkaBroker, KafkaMatchingBrokerRule, NetworkPolicyRuleDefinition, RefreshOptionValue,
21 ReplicaDefinition,
22};
23use mz_storage_types::connections::IcebergCatalogType;
24use mz_storage_types::connections::string_or_secret::StringOrSecret;
25use serde::{Deserialize, Serialize};
26
27use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
28use crate::catalog::SessionCatalog;
29use crate::names::{ResolvedDataType, ResolvedItemName};
30use crate::plan::{Aug, PlanError, literal};
31
32pub trait TryFromValue<T>: Sized {
33 fn try_from_value(v: T) -> Result<Self, PlanError>;
34
35 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;
36
37 fn name() -> String;
38}
39
40pub trait ImpliedValue: Sized {
41 fn implied_value() -> Result<Self, PlanError>;
42}
43
44impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
45 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
46 match String::try_from_value(v)? {
47 s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
48 s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
49 _ => sql_bail!("invalid iceberg catalog type"),
50 }
51 }
52
53 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
54 Some(WithOptionValue::Value(Value::String(match self {
55 IcebergCatalogType::Rest => "rest".to_string(),
56 IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
57 })))
58 }
59
60 fn name() -> String {
61 "iceberg catalog type".to_string()
62 }
63}
64
65impl ImpliedValue for IcebergCatalogType {
66 fn implied_value() -> Result<Self, PlanError> {
67 sql_bail!("must provide an iceberg catalog type")
68 }
69}
70
71#[derive(Copy, Clone, Debug)]
72pub struct Secret(CatalogItemId);
73
74impl From<Secret> for CatalogItemId {
75 fn from(secret: Secret) -> Self {
76 secret.0
77 }
78}
79
80impl TryFromValue<WithOptionValue<Aug>> for Secret {
81 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
82 match StringOrSecret::try_from_value(v)? {
83 StringOrSecret::Secret(id) => Ok(Secret(id)),
84 StringOrSecret::String(_) => sql_bail!("must provide a secret value"),
85 }
86 }
87
88 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
89 let secret = catalog.get_item(&self.0);
90 let name = ResolvedItemName::Item {
91 id: self.0,
92 qualifiers: secret.name().qualifiers.clone(),
93 full_name: catalog.resolve_full_name(secret.name()),
94 print_id: false,
95 version: RelationVersionSelector::Latest,
96 };
97 Some(WithOptionValue::Secret(name))
98 }
99
100 fn name() -> String {
101 "secret".to_string()
102 }
103}
104
105impl ImpliedValue for Secret {
106 fn implied_value() -> Result<Self, PlanError> {
107 sql_bail!("must provide a secret value")
108 }
109}
110
111#[derive(Copy, Clone, Debug)]
112pub struct Object(CatalogItemId);
113
114impl From<Object> for CatalogItemId {
115 fn from(obj: Object) -> Self {
116 obj.0
117 }
118}
119
120impl From<&Object> for CatalogItemId {
121 fn from(obj: &Object) -> Self {
122 obj.0
123 }
124}
125
126impl TryFromValue<WithOptionValue<Aug>> for Object {
127 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
128 Ok(match v {
129 WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
130 _ => sql_bail!("must provide an object"),
131 })
132 }
133
134 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
135 let item = catalog.get_item(&self.0);
136 let name = ResolvedItemName::Item {
137 id: self.0,
138 qualifiers: item.name().qualifiers.clone(),
139 full_name: catalog.resolve_full_name(item.name()),
140 print_id: false,
141 version: RelationVersionSelector::Latest,
143 };
144 Some(WithOptionValue::Item(name))
145 }
146
147 fn name() -> String {
148 "object reference".to_string()
149 }
150}
151
152impl ImpliedValue for Object {
153 fn implied_value() -> Result<Self, PlanError> {
154 sql_bail!("must provide an object")
155 }
156}
157
158impl TryFromValue<WithOptionValue<Aug>> for Ident {
159 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
160 Ok(match v {
161 WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
162 if inner.len() == 1 =>
163 {
164 inner.remove(0)
165 }
166 WithOptionValue::Ident(inner) => inner,
167 _ => sql_bail!("must provide an unqualified identifier"),
168 })
169 }
170
171 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
172 Some(WithOptionValue::Ident(self))
173 }
174
175 fn name() -> String {
176 "identifier".to_string()
177 }
178}
179
180impl ImpliedValue for Ident {
181 fn implied_value() -> Result<Self, PlanError> {
182 sql_bail!("must provide an identifier")
183 }
184}
185
186impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
187 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
188 Ok(match v {
189 WithOptionValue::Expr(e) => e,
190 _ => sql_bail!("must provide an expr"),
191 })
192 }
193
194 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
195 Some(WithOptionValue::Expr(self))
196 }
197
198 fn name() -> String {
199 "expression".to_string()
200 }
201}
202
203impl ImpliedValue for Expr<Aug> {
204 fn implied_value() -> Result<Self, PlanError> {
205 sql_bail!("must provide an expression")
206 }
207}
208
209impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
210 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
211 Ok(match v {
212 WithOptionValue::UnresolvedItemName(name) => name,
213 WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
214 _ => sql_bail!("must provide an object name"),
215 })
216 }
217
218 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
219 Some(WithOptionValue::UnresolvedItemName(self))
220 }
221
222 fn name() -> String {
223 "object name".to_string()
224 }
225}
226
227impl ImpliedValue for UnresolvedItemName {
228 fn implied_value() -> Result<Self, PlanError> {
229 sql_bail!("must provide an object name")
230 }
231}
232
233impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
234 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
235 Ok(match v {
236 WithOptionValue::DataType(ty) => ty,
237 _ => sql_bail!("must provide a data type"),
238 })
239 }
240
241 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
242 Some(WithOptionValue::DataType(self))
243 }
244
245 fn name() -> String {
246 "data type".to_string()
247 }
248}
249
250impl ImpliedValue for ResolvedDataType {
251 fn implied_value() -> Result<Self, PlanError> {
252 sql_bail!("must provide a data type")
253 }
254}
255
256impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
257 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
258 Ok(match v {
259 WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
260 StringOrSecret::Secret(id)
261 }
262 v => StringOrSecret::String(String::try_from_value(v)?),
263 })
264 }
265
266 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
267 Some(match self {
268 StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
269 StringOrSecret::String(s) => s.try_into_value(catalog)?,
270 })
271 }
272
273 fn name() -> String {
274 "string or secret".to_string()
275 }
276}
277
278impl ImpliedValue for StringOrSecret {
279 fn implied_value() -> Result<Self, PlanError> {
280 sql_bail!("must provide a string or secret value")
281 }
282}
283
284impl TryFromValue<Value> for Duration {
285 fn try_from_value(v: Value) -> Result<Self, PlanError> {
286 let interval = Interval::try_from_value(v)?;
287 Ok(interval.duration()?)
288 }
289
290 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
291 let interval = Interval::from_duration(&self)
292 .expect("planning ensured that this is convertible back to Interval");
293 interval.try_into_value(catalog)
294 }
295
296 fn name() -> String {
297 "interval".to_string()
298 }
299}
300
301impl ImpliedValue for Duration {
302 fn implied_value() -> Result<Self, PlanError> {
303 sql_bail!("must provide an interval value")
304 }
305}
306
307impl TryFromValue<Value> for ByteSize {
308 fn try_from_value(v: Value) -> Result<Self, PlanError> {
309 match v {
310 Value::Number(value) | Value::String(value) => Ok(value
311 .parse::<ByteSize>()
312 .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
313 _ => sql_bail!("cannot use value as bytes"),
314 }
315 }
316
317 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
318 Some(Value::String(self.to_string()))
319 }
320
321 fn name() -> String {
322 "bytes".to_string()
323 }
324}
325
326impl ImpliedValue for ByteSize {
327 fn implied_value() -> Result<Self, PlanError> {
328 sql_bail!("must provide a value for bytes")
329 }
330}
331
332impl TryFromValue<Value> for Interval {
333 fn try_from_value(v: Value) -> Result<Self, PlanError> {
334 match v {
335 Value::Interval(value) => literal::plan_interval(&value),
336 Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
337 _ => sql_bail!("cannot use value as interval"),
338 }
339 }
340
341 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
342 let interval_value = literal::unplan_interval(&self);
343 Some(Value::Interval(interval_value))
344 }
345
346 fn name() -> String {
347 "interval".to_string()
348 }
349}
350
351impl ImpliedValue for Interval {
352 fn implied_value() -> Result<Self, PlanError> {
353 sql_bail!("must provide an interval value")
354 }
355}
356
/// A string option value that may also be given as SQL `NULL`, in which case
/// the wrapped option is `None`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct OptionalString(pub Option<String>);
359
360impl TryFromValue<Value> for OptionalString {
361 fn try_from_value(v: Value) -> Result<Self, PlanError> {
362 Ok(match v {
363 Value::Null => Self(None),
364 v => Self(Some(String::try_from_value(v)?)),
365 })
366 }
367
368 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
369 Some(match self.0 {
370 None => Value::Null,
371 Some(s) => s.try_into_value(catalog)?,
372 })
373 }
374
375 fn name() -> String {
376 "optional string".to_string()
377 }
378}
379
380impl ImpliedValue for OptionalString {
381 fn implied_value() -> Result<Self, PlanError> {
382 sql_bail!("must provide a string value")
383 }
384}
385
386#[derive(
387 Debug,
388 Clone,
389 Copy,
390 PartialEq,
391 Eq,
392 PartialOrd,
393 Ord,
394 Serialize,
395 Hash,
396 Deserialize
397)]
398pub struct OptionalDuration(pub Option<Duration>);
399
400impl From<Duration> for OptionalDuration {
401 fn from(i: Duration) -> OptionalDuration {
402 let inner = if i == Duration::ZERO { None } else { Some(i) };
404 OptionalDuration(inner)
405 }
406}
407
408impl TryFromValue<Value> for OptionalDuration {
409 fn try_from_value(v: Value) -> Result<Self, PlanError> {
410 Ok(match v {
411 Value::Null => OptionalDuration(None),
412 v => Duration::try_from_value(v)?.into(),
413 })
414 }
415
416 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
417 Some(match self.0 {
418 None => Value::Null,
419 Some(duration) => duration.try_into_value(catalog)?,
420 })
421 }
422
423 fn name() -> String {
424 "optional interval".to_string()
425 }
426}
427
428impl ImpliedValue for OptionalDuration {
429 fn implied_value() -> Result<Self, PlanError> {
430 sql_bail!("must provide an interval value")
431 }
432}
433
434impl TryFromValue<Value> for String {
435 fn try_from_value(v: Value) -> Result<Self, PlanError> {
436 match v {
437 Value::String(v) => Ok(v),
438 _ => sql_bail!("cannot use value as string"),
439 }
440 }
441
442 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
443 Some(Value::String(self))
444 }
445
446 fn name() -> String {
447 "text".to_string()
448 }
449}
450
451impl ImpliedValue for String {
452 fn implied_value() -> Result<Self, PlanError> {
453 sql_bail!("must provide a string value")
454 }
455}
456
457impl TryFromValue<Value> for bool {
458 fn try_from_value(v: Value) -> Result<Self, PlanError> {
459 match v {
460 Value::Boolean(v) => Ok(v),
461 _ => sql_bail!("cannot use value as boolean"),
462 }
463 }
464
465 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
466 Some(Value::Boolean(self))
467 }
468
469 fn name() -> String {
470 "bool".to_string()
471 }
472}
473
474impl ImpliedValue for bool {
475 fn implied_value() -> Result<Self, PlanError> {
476 Ok(true)
477 }
478}
479
480impl TryFromValue<Value> for f64 {
481 fn try_from_value(v: Value) -> Result<Self, PlanError> {
482 match v {
483 Value::Number(v) => v
484 .parse::<f64>()
485 .map_err(|e| sql_err!("invalid numeric value: {e}")),
486 _ => sql_bail!("cannot use value as number"),
487 }
488 }
489
490 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
491 Some(Value::Number(self.to_string()))
492 }
493
494 fn name() -> String {
495 "float8".to_string()
496 }
497}
498
499impl ImpliedValue for f64 {
500 fn implied_value() -> Result<Self, PlanError> {
501 sql_bail!("must provide a float value")
502 }
503}
504
505impl TryFromValue<Value> for i32 {
506 fn try_from_value(v: Value) -> Result<Self, PlanError> {
507 match v {
508 Value::Number(v) => v
509 .parse::<i32>()
510 .map_err(|e| sql_err!("invalid numeric value: {e}")),
511 _ => sql_bail!("cannot use value as number"),
512 }
513 }
514
515 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
516 Some(Value::Number(self.to_string()))
517 }
518
519 fn name() -> String {
520 "int".to_string()
521 }
522}
523
524impl ImpliedValue for i32 {
525 fn implied_value() -> Result<Self, PlanError> {
526 sql_bail!("must provide an integer value")
527 }
528}
529
530impl TryFromValue<Value> for i64 {
531 fn try_from_value(v: Value) -> Result<Self, PlanError> {
532 match v {
533 Value::Number(v) => v
534 .parse::<i64>()
535 .map_err(|e| sql_err!("invalid numeric value: {e}")),
536 _ => sql_bail!("cannot use value as number"),
537 }
538 }
539 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
540 Some(Value::Number(self.to_string()))
541 }
542 fn name() -> String {
543 "int8".to_string()
544 }
545}
546
547impl ImpliedValue for i64 {
548 fn implied_value() -> Result<Self, PlanError> {
549 sql_bail!("must provide an integer value")
550 }
551}
552
553impl TryFromValue<Value> for u16 {
554 fn try_from_value(v: Value) -> Result<Self, PlanError> {
555 match v {
556 Value::Number(v) => v
557 .parse::<u16>()
558 .map_err(|e| sql_err!("invalid numeric value: {e}")),
559 _ => sql_bail!("cannot use value as number"),
560 }
561 }
562 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
563 Some(Value::Number(self.to_string()))
564 }
565 fn name() -> String {
566 "uint2".to_string()
567 }
568}
569
570impl ImpliedValue for u16 {
571 fn implied_value() -> Result<Self, PlanError> {
572 sql_bail!("must provide an integer value")
573 }
574}
575
576impl TryFromValue<Value> for u32 {
577 fn try_from_value(v: Value) -> Result<Self, PlanError> {
578 match v {
579 Value::Number(v) => v
580 .parse::<u32>()
581 .map_err(|e| sql_err!("invalid numeric value: {e}")),
582 _ => sql_bail!("cannot use value as number"),
583 }
584 }
585 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
586 Some(Value::Number(self.to_string()))
587 }
588 fn name() -> String {
589 "uint4".to_string()
590 }
591}
592
593impl ImpliedValue for u32 {
594 fn implied_value() -> Result<Self, PlanError> {
595 sql_bail!("must provide an integer value")
596 }
597}
598
599impl TryFromValue<Value> for u64 {
600 fn try_from_value(v: Value) -> Result<Self, PlanError> {
601 match v {
602 Value::Number(v) => v
603 .parse::<u64>()
604 .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
605 _ => sql_bail!("cannot use value as number"),
606 }
607 }
608 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
609 Some(Value::Number(self.to_string()))
610 }
611 fn name() -> String {
612 "uint8".to_string()
613 }
614}
615
616impl ImpliedValue for u64 {
617 fn implied_value() -> Result<Self, PlanError> {
618 sql_bail!("must provide an unsigned integer value")
619 }
620}
621
622impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
623 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
624 match v {
625 WithOptionValue::Sequence(a) => {
626 let mut out = Vec::with_capacity(a.len());
627 for i in a {
628 out.push(
629 V::try_from_value(i)
630 .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
631 )
632 }
633 Ok(out)
634 }
635 _ => sql_bail!("cannot use value as array"),
636 }
637 }
638
639 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
640 Some(WithOptionValue::Sequence(
641 self.into_iter()
642 .map(|v| v.try_into_value(catalog))
643 .collect::<Option<_>>()?,
644 ))
645 }
646
647 fn name() -> String {
648 format!("array of {}", V::name())
649 }
650}
651
652impl<V: ImpliedValue> ImpliedValue for Vec<V> {
653 fn implied_value() -> Result<Self, PlanError> {
654 sql_bail!("must provide an array value")
655 }
656}
657
658impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
659 for Option<V>
660{
661 fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
662 Ok(Some(V::try_from_value(v)?))
663 }
664
665 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
666 match self {
667 Some(v) => v.try_into_value(catalog),
668 None => None,
669 }
670 }
671
672 fn name() -> String {
673 format!("optional {}", V::name())
674 }
675}
676
677impl<V: ImpliedValue> ImpliedValue for Option<V> {
678 fn implied_value() -> Result<Self, PlanError> {
679 Ok(Some(V::implied_value()?))
680 }
681}
682
683impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
684 fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
685 match v {
686 WithOptionValue::Value(v) => V::try_from_value(v),
687 WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
688 if inner.len() == 1 =>
689 {
690 V::try_from_value(Value::String(inner.remove(0).into_string()))
691 }
692 WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
693 WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
694 WithOptionValue::Sequence(_)
695 | WithOptionValue::Map(_)
696 | WithOptionValue::Item(_)
697 | WithOptionValue::UnresolvedItemName(_)
698 | WithOptionValue::Secret(_)
699 | WithOptionValue::DataType(_)
700 | WithOptionValue::Expr(_)
701 | WithOptionValue::ClusterReplicas(_)
702 | WithOptionValue::ConnectionKafkaBroker(_)
703 | WithOptionValue::ConnectionAwsPrivatelink(_)
704 | WithOptionValue::KafkaMatchingBrokerRule(_)
705 | WithOptionValue::ClusterAlterStrategy(_)
706 | WithOptionValue::Refresh(_)
707 | WithOptionValue::ClusterScheduleOptionValue(_)
708 | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
709 "incompatible value types: cannot convert {} to {}",
710 match v {
711 WithOptionValue::Value(_) => unreachable!(),
713 WithOptionValue::RetainHistoryFor(_) => unreachable!(),
714 WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
715 WithOptionValue::Sequence(_) => "sequences",
716 WithOptionValue::Map(_) => "maps",
717 WithOptionValue::Item(_) => "object references",
718 WithOptionValue::UnresolvedItemName(_) => "object names",
719 WithOptionValue::Ident(_) => "identifiers",
720 WithOptionValue::Secret(_) => "secrets",
721 WithOptionValue::DataType(_) => "data types",
722 WithOptionValue::Expr(_) => "exprs",
723 WithOptionValue::ClusterReplicas(_) => "cluster replicas",
724 WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
725 WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink",
726 WithOptionValue::KafkaMatchingBrokerRule(_) => "matching broker rule",
727 WithOptionValue::Refresh(_) => "refresh option values",
728 WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
729 WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
730 },
731 V::name()
732 ),
733 }
734 }
735
736 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
737 Some(WithOptionValue::Value(self.try_into_value(catalog)?))
738 }
739
740 fn name() -> String {
741 V::name()
742 }
743}
744
745impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
746 fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
747 match v {
748 Some(v) => V::try_from_value(v),
749 None => V::implied_value(),
750 }
751 }
752
753 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
754 Some(Some(self.try_into_value(catalog)?))
755 }
756
757 fn name() -> String {
758 V::name()
759 }
760}
761
762impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
763 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
764 match v {
765 WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
766 _ => sql_bail!("cannot use value as cluster replicas"),
767 }
768 }
769
770 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
771 Some(WithOptionValue::ClusterReplicas(self))
772 }
773
774 fn name() -> String {
775 "cluster replicas".to_string()
776 }
777}
778
779impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
780 fn implied_value() -> Result<Self, PlanError> {
781 sql_bail!("must provide a set of cluster replicas")
782 }
783}
784
785impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
786 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
787 let mut out = vec![];
788 match v {
789 WithOptionValue::ConnectionKafkaBroker(broker) => {
790 out.push(broker);
791 }
792 WithOptionValue::Sequence(values) => {
793 for value in values {
794 out.extend(Self::try_from_value(value)?);
795 }
796 }
797 _ => sql_bail!("cannot use value as a kafka broker"),
798 }
799 Ok(out)
800 }
801
802 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
803 Some(WithOptionValue::Sequence(
804 self.into_iter()
805 .map(WithOptionValue::ConnectionKafkaBroker)
806 .collect(),
807 ))
808 }
809
810 fn name() -> String {
811 "kafka broker".to_string()
812 }
813}
814
815impl ImpliedValue for Vec<KafkaBroker<Aug>> {
816 fn implied_value() -> Result<Self, PlanError> {
817 sql_bail!("must provide a kafka broker")
818 }
819}
820
821impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
822 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
823 if let WithOptionValue::Refresh(r) = v {
824 Ok(r)
825 } else {
826 sql_bail!("cannot use value `{}` for a refresh option", v)
827 }
828 }
829
830 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
831 Some(WithOptionValue::Refresh(self))
832 }
833
834 fn name() -> String {
835 "refresh option value".to_string()
836 }
837}
838
839impl ImpliedValue for RefreshOptionValue<Aug> {
840 fn implied_value() -> Result<Self, PlanError> {
841 sql_bail!("must provide a refresh option value")
842 }
843}
844
845impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
846 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
847 if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
848 Ok(r)
849 } else {
850 sql_bail!("cannot use value `{}` for a privatelink", v)
851 }
852 }
853
854 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
855 Some(WithOptionValue::ConnectionAwsPrivatelink(self))
856 }
857
858 fn name() -> String {
859 "privatelink option value".to_string()
860 }
861}
862
863impl TryFromValue<WithOptionValue<Aug>> for KafkaMatchingBrokerRule<Aug> {
864 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
865 if let WithOptionValue::KafkaMatchingBrokerRule(r) = v {
866 Ok(r)
867 } else {
868 sql_bail!("cannot use value `{}` for a matching broker rule", v)
869 }
870 }
871
872 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
873 Some(WithOptionValue::KafkaMatchingBrokerRule(self))
874 }
875
876 fn name() -> String {
877 "matching broker rule".to_string()
878 }
879}
880
881impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
882 fn implied_value() -> Result<Self, PlanError> {
883 sql_bail!("must provide a value")
884 }
885}
886
887impl ImpliedValue for KafkaMatchingBrokerRule<Aug> {
888 fn implied_value() -> Result<Self, PlanError> {
889 sql_bail!("must provide a value")
890 }
891}
892
893#[derive(Debug)]
896pub struct BrokersList {
897 pub static_entries: Vec<KafkaBroker<Aug>>,
898 pub matching_rules: Vec<KafkaMatchingBrokerRule<Aug>>,
899}
900
901impl TryFromValue<WithOptionValue<Aug>> for BrokersList {
902 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
903 match v {
904 WithOptionValue::Sequence(entries) => {
905 let mut static_entries = vec![];
906 let mut matching_rules = vec![];
907 for entry in entries {
908 match entry {
909 WithOptionValue::ConnectionKafkaBroker(b) => static_entries.push(b),
910 WithOptionValue::KafkaMatchingBrokerRule(m) => matching_rules.push(m),
911 other => sql_bail!("unexpected value in BROKERS: {}", other),
912 }
913 }
914 Ok(BrokersList {
915 static_entries,
916 matching_rules,
917 })
918 }
919 WithOptionValue::ConnectionKafkaBroker(b) => Ok(BrokersList {
920 static_entries: vec![b],
921 matching_rules: vec![],
922 }),
923 WithOptionValue::KafkaMatchingBrokerRule(m) => Ok(BrokersList {
924 static_entries: vec![],
925 matching_rules: vec![m],
926 }),
927 other => sql_bail!("cannot use {} as brokers list", other),
928 }
929 }
930
931 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
932 let mut entries: Vec<WithOptionValue<Aug>> = vec![];
933 for b in self.static_entries {
934 entries.push(WithOptionValue::ConnectionKafkaBroker(b));
935 }
936 for m in self.matching_rules {
937 entries.push(WithOptionValue::KafkaMatchingBrokerRule(m));
938 }
939 Some(WithOptionValue::Sequence(entries))
940 }
941
942 fn name() -> String {
943 "brokers list".to_string()
944 }
945}
946
947impl ImpliedValue for BrokersList {
948 fn implied_value() -> Result<Self, PlanError> {
949 sql_bail!("must provide a value for BROKERS")
950 }
951}
952
953impl ImpliedValue for ClusterScheduleOptionValue {
954 fn implied_value() -> Result<Self, PlanError> {
955 sql_bail!("must provide a cluster schedule option value")
956 }
957}
958
959impl ImpliedValue for ClusterAlterOptionValue<Aug> {
960 fn implied_value() -> Result<Self, PlanError> {
961 sql_bail!("must provide a value")
962 }
963}
964
965impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
966 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
967 if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
968 Ok(r)
969 } else {
970 sql_bail!("cannot use value `{}` for a cluster schedule", v)
971 }
972 }
973
974 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
975 Some(WithOptionValue::ClusterScheduleOptionValue(self))
976 }
977
978 fn name() -> String {
979 "cluster schedule option value".to_string()
980 }
981}
982
983impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
984 fn implied_value() -> Result<Self, PlanError> {
985 sql_bail!("must provide a map of key-value pairs")
986 }
987}
988
989impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
990 for BTreeMap<String, V>
991{
992 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
993 match v {
994 WithOptionValue::Map(a) => a
995 .into_iter()
996 .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
997 .collect(),
998 _ => sql_bail!("cannot use value as map"),
999 }
1000 }
1001
1002 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1003 Some(WithOptionValue::Map(
1004 self.into_iter()
1005 .map(|(k, v)| {
1006 let v = v.try_into_value(catalog);
1007 v.map(|v| (k, v))
1008 })
1009 .collect::<Option<_>>()?,
1010 ))
1011 }
1012
1013 fn name() -> String {
1014 format!("map of string to {}", V::name())
1015 }
1016}
1017
1018impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
1019 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1020 if let WithOptionValue::ClusterAlterStrategy(r) = v {
1021 Ok(r)
1022 } else {
1023 sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
1024 }
1025 }
1026
1027 fn name() -> String {
1028 "cluster alter strategyoption value".to_string()
1029 }
1030
1031 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1032 Some(WithOptionValue::ClusterAlterStrategy(self))
1033 }
1034}
1035
1036impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
1037 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1038 match v {
1039 WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
1040 _ => sql_bail!("cannot use value as cluster replicas"),
1041 }
1042 }
1043
1044 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1045 Some(WithOptionValue::NetworkPolicyRules(self))
1046 }
1047
1048 fn name() -> String {
1049 "network policy rules".to_string()
1050 }
1051}
1052
1053impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
1054 fn implied_value() -> Result<Self, PlanError> {
1055 sql_bail!("must provide a set of network policy rules")
1056 }
1057}