1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::adt::interval::Interval;
16use mz_repr::bytes::ByteSize;
17use mz_repr::{CatalogItemId, RelationVersionSelector, strconv};
18use mz_sql_parser::ast::{
19 ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr,
20 Ident, KafkaBroker, KafkaMatchingBrokerRule, NetworkPolicyRuleDefinition, RefreshOptionValue,
21 ReplicaDefinition,
22};
23use mz_storage_types::connections::IcebergCatalogType;
24use mz_storage_types::connections::string_or_secret::StringOrSecret;
25use serde::{Deserialize, Serialize};
26
27use crate::ast::{AstInfo, UnresolvedItemName, Value, WithOptionValue};
28use crate::catalog::SessionCatalog;
29use crate::names::{ResolvedDataType, ResolvedItemName};
30use crate::plan::{Aug, PlanError, literal};
31
/// Two-way conversion between `Self` and a value representation `T`.
pub trait TryFromValue<T>: Sized {
    /// Attempts to build `Self` from `v`, returning a `PlanError` when `v` is unsuitable.
    fn try_from_value(v: T) -> Result<Self, PlanError>;

    /// Converts `self` back into a `T`, with access to the session catalog.
    ///
    /// Returns `None` when no `T` is produced for this value.
    fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<T>;

    /// Human-readable name of the type; implementations in this file interpolate it
    /// into error messages and into the names of container types.
    fn name() -> String;
}
39
/// Supplies the value to use when none was provided (the `TryFromValue<Option<T>>`
/// blanket impl in this file calls it for `None`).
pub trait ImpliedValue: Sized {
    /// Returns the implied value, or a `PlanError` when a value must be given explicitly.
    fn implied_value() -> Result<Self, PlanError>;
}
43
44impl TryFromValue<WithOptionValue<Aug>> for IcebergCatalogType {
45 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
46 match String::try_from_value(v)? {
47 s if s.eq_ignore_ascii_case("rest") => Ok(IcebergCatalogType::Rest),
48 s if s.eq_ignore_ascii_case("s3tablesrest") => Ok(IcebergCatalogType::S3TablesRest),
49 _ => sql_bail!("invalid iceberg catalog type"),
50 }
51 }
52
53 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
54 Some(WithOptionValue::Value(Value::String(match self {
55 IcebergCatalogType::Rest => "rest".to_string(),
56 IcebergCatalogType::S3TablesRest => "s3tablesrest".to_string(),
57 })))
58 }
59
60 fn name() -> String {
61 "iceberg catalog type".to_string()
62 }
63}
64
impl ImpliedValue for IcebergCatalogType {
    /// Nothing is implied: an iceberg catalog type must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an iceberg catalog type")
    }
}
70
/// Newtype around the `CatalogItemId` of a secret; its `TryFromValue` impl only accepts
/// option values that are secret references.
#[derive(Copy, Clone, Debug)]
pub struct Secret(CatalogItemId);
73
74impl From<Secret> for CatalogItemId {
75 fn from(secret: Secret) -> Self {
76 secret.0
77 }
78}
79
80impl TryFromValue<WithOptionValue<Aug>> for Secret {
81 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
82 match StringOrSecret::try_from_value(v)? {
83 StringOrSecret::Secret(id) => Ok(Secret(id)),
84 StringOrSecret::String(_) => sql_bail!("must provide a secret value"),
85 }
86 }
87
88 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
89 let secret = catalog.get_item(&self.0);
90 let name = ResolvedItemName::Item {
91 id: self.0,
92 qualifiers: secret.name().qualifiers.clone(),
93 full_name: catalog.resolve_full_name(secret.name()),
94 print_id: false,
95 version: RelationVersionSelector::Latest,
96 };
97 Some(WithOptionValue::Secret(name))
98 }
99
100 fn name() -> String {
101 "secret".to_string()
102 }
103}
104
impl ImpliedValue for Secret {
    /// Nothing is implied: a secret must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a secret value")
    }
}
110
/// Newtype around the `CatalogItemId` of a catalog object referenced by an option value.
#[derive(Copy, Clone, Debug)]
pub struct Object(CatalogItemId);
113
114impl From<Object> for CatalogItemId {
115 fn from(obj: Object) -> Self {
116 obj.0
117 }
118}
119
120impl From<&Object> for CatalogItemId {
121 fn from(obj: &Object) -> Self {
122 obj.0
123 }
124}
125
126impl TryFromValue<WithOptionValue<Aug>> for Object {
127 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
128 Ok(match v {
129 WithOptionValue::Item(ResolvedItemName::Item { id, .. }) => Object(id),
130 _ => sql_bail!("must provide an object"),
131 })
132 }
133
134 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
135 let item = catalog.get_item(&self.0);
136 let name = ResolvedItemName::Item {
137 id: self.0,
138 qualifiers: item.name().qualifiers.clone(),
139 full_name: catalog.resolve_full_name(item.name()),
140 print_id: false,
141 version: RelationVersionSelector::Latest,
143 };
144 Some(WithOptionValue::Item(name))
145 }
146
147 fn name() -> String {
148 "object reference".to_string()
149 }
150}
151
impl ImpliedValue for Object {
    /// Nothing is implied: an object reference must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an object")
    }
}
157
158impl TryFromValue<WithOptionValue<Aug>> for Ident {
159 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
160 Ok(match v {
161 WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
162 if inner.len() == 1 =>
163 {
164 inner.remove(0)
165 }
166 WithOptionValue::Ident(inner) => inner,
167 _ => sql_bail!("must provide an unqualified identifier"),
168 })
169 }
170
171 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
172 Some(WithOptionValue::Ident(self))
173 }
174
175 fn name() -> String {
176 "identifier".to_string()
177 }
178}
179
impl ImpliedValue for Ident {
    /// Nothing is implied: an identifier must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an identifier")
    }
}
185
186impl TryFromValue<WithOptionValue<Aug>> for Expr<Aug> {
187 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
188 Ok(match v {
189 WithOptionValue::Expr(e) => e,
190 _ => sql_bail!("must provide an expr"),
191 })
192 }
193
194 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
195 Some(WithOptionValue::Expr(self))
196 }
197
198 fn name() -> String {
199 "expression".to_string()
200 }
201}
202
impl ImpliedValue for Expr<Aug> {
    /// Nothing is implied: an expression must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an expression")
    }
}
208
209impl TryFromValue<WithOptionValue<Aug>> for UnresolvedItemName {
210 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
211 Ok(match v {
212 WithOptionValue::UnresolvedItemName(name) => name,
213 WithOptionValue::Ident(inner) => UnresolvedItemName(vec![inner]),
214 _ => sql_bail!("must provide an object name"),
215 })
216 }
217
218 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
219 Some(WithOptionValue::UnresolvedItemName(self))
220 }
221
222 fn name() -> String {
223 "object name".to_string()
224 }
225}
226
impl ImpliedValue for UnresolvedItemName {
    /// Nothing is implied: an object name must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an object name")
    }
}
232
233impl TryFromValue<WithOptionValue<Aug>> for ResolvedDataType {
234 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
235 Ok(match v {
236 WithOptionValue::DataType(ty) => ty,
237 _ => sql_bail!("must provide a data type"),
238 })
239 }
240
241 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
242 Some(WithOptionValue::DataType(self))
243 }
244
245 fn name() -> String {
246 "data type".to_string()
247 }
248}
249
impl ImpliedValue for ResolvedDataType {
    /// Nothing is implied: a data type must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a data type")
    }
}
255
256impl TryFromValue<WithOptionValue<Aug>> for StringOrSecret {
257 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
258 Ok(match v {
259 WithOptionValue::Secret(ResolvedItemName::Item { id, .. }) => {
260 StringOrSecret::Secret(id)
261 }
262 v => StringOrSecret::String(String::try_from_value(v)?),
263 })
264 }
265
266 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
267 Some(match self {
268 StringOrSecret::Secret(secret) => Secret(secret).try_into_value(catalog)?,
269 StringOrSecret::String(s) => s.try_into_value(catalog)?,
270 })
271 }
272
273 fn name() -> String {
274 "string or secret".to_string()
275 }
276}
277
impl ImpliedValue for StringOrSecret {
    /// Nothing is implied: a string or secret must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string or secret value")
    }
}
283
284impl TryFromValue<Value> for Duration {
285 fn try_from_value(v: Value) -> Result<Self, PlanError> {
286 let interval = Interval::try_from_value(v)?;
287 let duration = interval.duration()?;
288 Interval::from_duration(&duration).map_err(|_| sql_err!("interval out of range"))?;
293 Ok(duration)
294 }
295
296 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
297 let interval = Interval::from_duration(&self)
298 .expect("planning ensured that this is convertible back to Interval");
299 interval.try_into_value(catalog)
300 }
301
302 fn name() -> String {
303 "interval".to_string()
304 }
305}
306
impl ImpliedValue for Duration {
    /// Nothing is implied: an interval must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
312
313impl TryFromValue<Value> for ByteSize {
314 fn try_from_value(v: Value) -> Result<Self, PlanError> {
315 match v {
316 Value::Number(value) | Value::String(value) => Ok(value
317 .parse::<ByteSize>()
318 .map_err(|e| sql_err!("invalid bytes value: {e}"))?),
319 _ => sql_bail!("cannot use value as bytes"),
320 }
321 }
322
323 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
324 Some(Value::String(self.to_string()))
325 }
326
327 fn name() -> String {
328 "bytes".to_string()
329 }
330}
331
impl ImpliedValue for ByteSize {
    /// Nothing is implied: a byte size must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value for bytes")
    }
}
337
338impl TryFromValue<Value> for Interval {
339 fn try_from_value(v: Value) -> Result<Self, PlanError> {
340 match v {
341 Value::Interval(value) => literal::plan_interval(&value),
342 Value::Number(value) | Value::String(value) => Ok(strconv::parse_interval(&value)?),
343 _ => sql_bail!("cannot use value as interval"),
344 }
345 }
346
347 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
348 let interval_value = literal::unplan_interval(&self);
349 Some(Value::Interval(interval_value))
350 }
351
352 fn name() -> String {
353 "interval".to_string()
354 }
355}
356
impl ImpliedValue for Interval {
    /// Nothing is implied: an interval must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
362
/// A string option that may also be set to `NULL`, which is stored as `None`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct OptionalString(pub Option<String>);
365
366impl TryFromValue<Value> for OptionalString {
367 fn try_from_value(v: Value) -> Result<Self, PlanError> {
368 Ok(match v {
369 Value::Null => Self(None),
370 v => Self(Some(String::try_from_value(v)?)),
371 })
372 }
373
374 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
375 Some(match self.0 {
376 None => Value::Null,
377 Some(s) => s.try_into_value(catalog)?,
378 })
379 }
380
381 fn name() -> String {
382 "optional string".to_string()
383 }
384}
385
impl ImpliedValue for OptionalString {
    /// Nothing is implied: a string (or `NULL`) must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string value")
    }
}
391
/// A duration option that may be absent; `NULL` and (via `From<Duration>`) a zero
/// duration are both stored as `None`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Hash,
    Deserialize
)]
pub struct OptionalDuration(pub Option<Duration>);
405
406impl From<Duration> for OptionalDuration {
407 fn from(i: Duration) -> OptionalDuration {
408 let inner = if i == Duration::ZERO { None } else { Some(i) };
410 OptionalDuration(inner)
411 }
412}
413
414impl TryFromValue<Value> for OptionalDuration {
415 fn try_from_value(v: Value) -> Result<Self, PlanError> {
416 Ok(match v {
417 Value::Null => OptionalDuration(None),
418 v => Duration::try_from_value(v)?.into(),
419 })
420 }
421
422 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Value> {
423 Some(match self.0 {
424 None => Value::Null,
425 Some(duration) => duration.try_into_value(catalog)?,
426 })
427 }
428
429 fn name() -> String {
430 "optional interval".to_string()
431 }
432}
433
impl ImpliedValue for OptionalDuration {
    /// Nothing is implied: an interval (or `NULL`) must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an interval value")
    }
}
439
440impl TryFromValue<Value> for String {
441 fn try_from_value(v: Value) -> Result<Self, PlanError> {
442 match v {
443 Value::String(v) => Ok(v),
444 _ => sql_bail!("cannot use value as string"),
445 }
446 }
447
448 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
449 Some(Value::String(self))
450 }
451
452 fn name() -> String {
453 "text".to_string()
454 }
455}
456
impl ImpliedValue for String {
    /// Nothing is implied: a string must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a string value")
    }
}
462
463impl TryFromValue<Value> for bool {
464 fn try_from_value(v: Value) -> Result<Self, PlanError> {
465 match v {
466 Value::Boolean(v) => Ok(v),
467 _ => sql_bail!("cannot use value as boolean"),
468 }
469 }
470
471 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
472 Some(Value::Boolean(self))
473 }
474
475 fn name() -> String {
476 "bool".to_string()
477 }
478}
479
impl ImpliedValue for bool {
    /// A boolean given without a value is implied to be `true`.
    fn implied_value() -> Result<Self, PlanError> {
        Ok(true)
    }
}
485
486impl TryFromValue<Value> for f64 {
487 fn try_from_value(v: Value) -> Result<Self, PlanError> {
488 match v {
489 Value::Number(v) => v
490 .parse::<f64>()
491 .map_err(|e| sql_err!("invalid numeric value: {e}")),
492 _ => sql_bail!("cannot use value as number"),
493 }
494 }
495
496 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
497 Some(Value::Number(self.to_string()))
498 }
499
500 fn name() -> String {
501 "float8".to_string()
502 }
503}
504
impl ImpliedValue for f64 {
    /// Nothing is implied: a float must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a float value")
    }
}
510
511impl TryFromValue<Value> for i32 {
512 fn try_from_value(v: Value) -> Result<Self, PlanError> {
513 match v {
514 Value::Number(v) => v
515 .parse::<i32>()
516 .map_err(|e| sql_err!("invalid numeric value: {e}")),
517 _ => sql_bail!("cannot use value as number"),
518 }
519 }
520
521 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
522 Some(Value::Number(self.to_string()))
523 }
524
525 fn name() -> String {
526 "int".to_string()
527 }
528}
529
impl ImpliedValue for i32 {
    /// Nothing is implied: an integer must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
535
536impl TryFromValue<Value> for i64 {
537 fn try_from_value(v: Value) -> Result<Self, PlanError> {
538 match v {
539 Value::Number(v) => v
540 .parse::<i64>()
541 .map_err(|e| sql_err!("invalid numeric value: {e}")),
542 _ => sql_bail!("cannot use value as number"),
543 }
544 }
545 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
546 Some(Value::Number(self.to_string()))
547 }
548 fn name() -> String {
549 "int8".to_string()
550 }
551}
552
impl ImpliedValue for i64 {
    /// Nothing is implied: an integer must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
558
559impl TryFromValue<Value> for u16 {
560 fn try_from_value(v: Value) -> Result<Self, PlanError> {
561 match v {
562 Value::Number(v) => v
563 .parse::<u16>()
564 .map_err(|e| sql_err!("invalid numeric value: {e}")),
565 _ => sql_bail!("cannot use value as number"),
566 }
567 }
568 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
569 Some(Value::Number(self.to_string()))
570 }
571 fn name() -> String {
572 "uint2".to_string()
573 }
574}
575
impl ImpliedValue for u16 {
    /// Nothing is implied: an integer must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
581
582impl TryFromValue<Value> for u32 {
583 fn try_from_value(v: Value) -> Result<Self, PlanError> {
584 match v {
585 Value::Number(v) => v
586 .parse::<u32>()
587 .map_err(|e| sql_err!("invalid numeric value: {e}")),
588 _ => sql_bail!("cannot use value as number"),
589 }
590 }
591 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
592 Some(Value::Number(self.to_string()))
593 }
594 fn name() -> String {
595 "uint4".to_string()
596 }
597}
598
impl ImpliedValue for u32 {
    /// Nothing is implied: an integer must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an integer value")
    }
}
604
605impl TryFromValue<Value> for u64 {
606 fn try_from_value(v: Value) -> Result<Self, PlanError> {
607 match v {
608 Value::Number(v) => v
609 .parse::<u64>()
610 .map_err(|e| sql_err!("invalid unsigned numeric value: {e}")),
611 _ => sql_bail!("cannot use value as number"),
612 }
613 }
614 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
615 Some(Value::Number(self.to_string()))
616 }
617 fn name() -> String {
618 "uint8".to_string()
619 }
620}
621
impl ImpliedValue for u64 {
    /// Nothing is implied: an unsigned integer must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an unsigned integer value")
    }
}
627
628impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>> for Vec<V> {
629 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
630 match v {
631 WithOptionValue::Sequence(a) => {
632 let mut out = Vec::with_capacity(a.len());
633 for i in a {
634 out.push(
635 V::try_from_value(i)
636 .map_err(|_| anyhow::anyhow!("cannot use value in array"))?,
637 )
638 }
639 Ok(out)
640 }
641 _ => sql_bail!("cannot use value as array"),
642 }
643 }
644
645 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
646 Some(WithOptionValue::Sequence(
647 self.into_iter()
648 .map(|v| v.try_into_value(catalog))
649 .collect::<Option<_>>()?,
650 ))
651 }
652
653 fn name() -> String {
654 format!("array of {}", V::name())
655 }
656}
657
impl<V: ImpliedValue> ImpliedValue for Vec<V> {
    /// Nothing is implied: an array must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide an array value")
    }
}
663
664impl<T: AstInfo, V: TryFromValue<WithOptionValue<T>>> TryFromValue<WithOptionValue<T>>
665 for Option<V>
666{
667 fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
668 Ok(Some(V::try_from_value(v)?))
669 }
670
671 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
672 match self {
673 Some(v) => v.try_into_value(catalog),
674 None => None,
675 }
676 }
677
678 fn name() -> String {
679 format!("optional {}", V::name())
680 }
681}
682
683impl<V: ImpliedValue> ImpliedValue for Option<V> {
684 fn implied_value() -> Result<Self, PlanError> {
685 Ok(Some(V::implied_value()?))
686 }
687}
688
689impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOptionValue<T>> for V {
690 fn try_from_value(v: WithOptionValue<T>) -> Result<Self, PlanError> {
691 match v {
692 WithOptionValue::Value(v) => V::try_from_value(v),
693 WithOptionValue::UnresolvedItemName(UnresolvedItemName(mut inner))
694 if inner.len() == 1 =>
695 {
696 V::try_from_value(Value::String(inner.remove(0).into_string()))
697 }
698 WithOptionValue::Ident(v) => V::try_from_value(Value::String(v.into_string())),
699 WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
700 WithOptionValue::Sequence(_)
701 | WithOptionValue::Map(_)
702 | WithOptionValue::Item(_)
703 | WithOptionValue::UnresolvedItemName(_)
704 | WithOptionValue::Secret(_)
705 | WithOptionValue::DataType(_)
706 | WithOptionValue::Expr(_)
707 | WithOptionValue::ClusterReplicas(_)
708 | WithOptionValue::ConnectionKafkaBroker(_)
709 | WithOptionValue::ConnectionAwsPrivatelink(_)
710 | WithOptionValue::KafkaMatchingBrokerRule(_)
711 | WithOptionValue::ClusterAlterStrategy(_)
712 | WithOptionValue::Refresh(_)
713 | WithOptionValue::ClusterScheduleOptionValue(_)
714 | WithOptionValue::NetworkPolicyRules(_) => sql_bail!(
715 "incompatible value types: cannot convert {} to {}",
716 match v {
717 WithOptionValue::Value(_) => unreachable!(),
719 WithOptionValue::RetainHistoryFor(_) => unreachable!(),
720 WithOptionValue::ClusterAlterStrategy(_) => "cluster alter strategy",
721 WithOptionValue::Sequence(_) => "sequences",
722 WithOptionValue::Map(_) => "maps",
723 WithOptionValue::Item(_) => "object references",
724 WithOptionValue::UnresolvedItemName(_) => "object names",
725 WithOptionValue::Ident(_) => "identifiers",
726 WithOptionValue::Secret(_) => "secrets",
727 WithOptionValue::DataType(_) => "data types",
728 WithOptionValue::Expr(_) => "exprs",
729 WithOptionValue::ClusterReplicas(_) => "cluster replicas",
730 WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers",
731 WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink",
732 WithOptionValue::KafkaMatchingBrokerRule(_) => "matching broker rule",
733 WithOptionValue::Refresh(_) => "refresh option values",
734 WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule",
735 WithOptionValue::NetworkPolicyRules(_) => "network policy rules",
736 },
737 V::name()
738 ),
739 }
740 }
741
742 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<T>> {
743 Some(WithOptionValue::Value(self.try_into_value(catalog)?))
744 }
745
746 fn name() -> String {
747 V::name()
748 }
749}
750
751impl<T, V: TryFromValue<T> + ImpliedValue> TryFromValue<Option<T>> for V {
752 fn try_from_value(v: Option<T>) -> Result<Self, PlanError> {
753 match v {
754 Some(v) => V::try_from_value(v),
755 None => V::implied_value(),
756 }
757 }
758
759 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<Option<T>> {
760 Some(Some(self.try_into_value(catalog)?))
761 }
762
763 fn name() -> String {
764 V::name()
765 }
766}
767
768impl TryFromValue<WithOptionValue<Aug>> for Vec<ReplicaDefinition<Aug>> {
769 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
770 match v {
771 WithOptionValue::ClusterReplicas(replicas) => Ok(replicas),
772 _ => sql_bail!("cannot use value as cluster replicas"),
773 }
774 }
775
776 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
777 Some(WithOptionValue::ClusterReplicas(self))
778 }
779
780 fn name() -> String {
781 "cluster replicas".to_string()
782 }
783}
784
impl ImpliedValue for Vec<ReplicaDefinition<Aug>> {
    /// Nothing is implied: the cluster replicas must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a set of cluster replicas")
    }
}
790
791impl TryFromValue<WithOptionValue<Aug>> for Vec<KafkaBroker<Aug>> {
792 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
793 let mut out = vec![];
794 match v {
795 WithOptionValue::ConnectionKafkaBroker(broker) => {
796 out.push(broker);
797 }
798 WithOptionValue::Sequence(values) => {
799 for value in values {
800 out.extend(Self::try_from_value(value)?);
801 }
802 }
803 _ => sql_bail!("cannot use value as a kafka broker"),
804 }
805 Ok(out)
806 }
807
808 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
809 Some(WithOptionValue::Sequence(
810 self.into_iter()
811 .map(WithOptionValue::ConnectionKafkaBroker)
812 .collect(),
813 ))
814 }
815
816 fn name() -> String {
817 "kafka broker".to_string()
818 }
819}
820
impl ImpliedValue for Vec<KafkaBroker<Aug>> {
    /// Nothing is implied: a kafka broker must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a kafka broker")
    }
}
826
827impl TryFromValue<WithOptionValue<Aug>> for RefreshOptionValue<Aug> {
828 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
829 if let WithOptionValue::Refresh(r) = v {
830 Ok(r)
831 } else {
832 sql_bail!("cannot use value `{}` for a refresh option", v)
833 }
834 }
835
836 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
837 Some(WithOptionValue::Refresh(self))
838 }
839
840 fn name() -> String {
841 "refresh option value".to_string()
842 }
843}
844
impl ImpliedValue for RefreshOptionValue<Aug> {
    /// Nothing is implied: a refresh option value must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a refresh option value")
    }
}
850
851impl TryFromValue<WithOptionValue<Aug>> for ConnectionDefaultAwsPrivatelink<Aug> {
852 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
853 if let WithOptionValue::ConnectionAwsPrivatelink(r) = v {
854 Ok(r)
855 } else {
856 sql_bail!("cannot use value `{}` for a privatelink", v)
857 }
858 }
859
860 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
861 Some(WithOptionValue::ConnectionAwsPrivatelink(self))
862 }
863
864 fn name() -> String {
865 "privatelink option value".to_string()
866 }
867}
868
869impl TryFromValue<WithOptionValue<Aug>> for KafkaMatchingBrokerRule<Aug> {
870 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
871 if let WithOptionValue::KafkaMatchingBrokerRule(r) = v {
872 Ok(r)
873 } else {
874 sql_bail!("cannot use value `{}` for a matching broker rule", v)
875 }
876 }
877
878 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
879 Some(WithOptionValue::KafkaMatchingBrokerRule(self))
880 }
881
882 fn name() -> String {
883 "matching broker rule".to_string()
884 }
885}
886
impl ImpliedValue for ConnectionDefaultAwsPrivatelink<Aug> {
    /// Nothing is implied: a privatelink value must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value")
    }
}
892
impl ImpliedValue for KafkaMatchingBrokerRule<Aug> {
    /// Nothing is implied: a matching broker rule must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value")
    }
}
898
/// The contents of a `BROKERS` option, split by entry kind.
#[derive(Debug)]
pub struct BrokersList {
    /// Entries given as individual kafka brokers.
    pub static_entries: Vec<KafkaBroker<Aug>>,
    /// Entries given as matching broker rules.
    pub matching_rules: Vec<KafkaMatchingBrokerRule<Aug>>,
}
906
907impl TryFromValue<WithOptionValue<Aug>> for BrokersList {
908 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
909 match v {
910 WithOptionValue::Sequence(entries) => {
911 let mut static_entries = vec![];
912 let mut matching_rules = vec![];
913 for entry in entries {
914 match entry {
915 WithOptionValue::ConnectionKafkaBroker(b) => static_entries.push(b),
916 WithOptionValue::KafkaMatchingBrokerRule(m) => matching_rules.push(m),
917 other => sql_bail!("unexpected value in BROKERS: {}", other),
918 }
919 }
920 Ok(BrokersList {
921 static_entries,
922 matching_rules,
923 })
924 }
925 WithOptionValue::ConnectionKafkaBroker(b) => Ok(BrokersList {
926 static_entries: vec![b],
927 matching_rules: vec![],
928 }),
929 WithOptionValue::KafkaMatchingBrokerRule(m) => Ok(BrokersList {
930 static_entries: vec![],
931 matching_rules: vec![m],
932 }),
933 other => sql_bail!("cannot use {} as brokers list", other),
934 }
935 }
936
937 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
938 let mut entries: Vec<WithOptionValue<Aug>> = vec![];
939 for b in self.static_entries {
940 entries.push(WithOptionValue::ConnectionKafkaBroker(b));
941 }
942 for m in self.matching_rules {
943 entries.push(WithOptionValue::KafkaMatchingBrokerRule(m));
944 }
945 Some(WithOptionValue::Sequence(entries))
946 }
947
948 fn name() -> String {
949 "brokers list".to_string()
950 }
951}
952
impl ImpliedValue for BrokersList {
    /// Nothing is implied: `BROKERS` must be given a value explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value for BROKERS")
    }
}
958
impl ImpliedValue for ClusterScheduleOptionValue {
    /// Nothing is implied: a cluster schedule option value must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a cluster schedule option value")
    }
}
964
impl ImpliedValue for ClusterAlterOptionValue<Aug> {
    /// Nothing is implied: a cluster alter strategy must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a value")
    }
}
970
971impl TryFromValue<WithOptionValue<Aug>> for ClusterScheduleOptionValue {
972 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
973 if let WithOptionValue::ClusterScheduleOptionValue(r) = v {
974 Ok(r)
975 } else {
976 sql_bail!("cannot use value `{}` for a cluster schedule", v)
977 }
978 }
979
980 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
981 Some(WithOptionValue::ClusterScheduleOptionValue(self))
982 }
983
984 fn name() -> String {
985 "cluster schedule option value".to_string()
986 }
987}
988
impl<V: ImpliedValue> ImpliedValue for BTreeMap<String, V> {
    /// Nothing is implied: a map must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a map of key-value pairs")
    }
}
994
995impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
996 for BTreeMap<String, V>
997{
998 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
999 match v {
1000 WithOptionValue::Map(a) => a
1001 .into_iter()
1002 .map(|(k, v)| Ok((k, V::try_from_value(v)?)))
1003 .collect(),
1004 _ => sql_bail!("cannot use value as map"),
1005 }
1006 }
1007
1008 fn try_into_value(self, catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1009 Some(WithOptionValue::Map(
1010 self.into_iter()
1011 .map(|(k, v)| {
1012 let v = v.try_into_value(catalog);
1013 v.map(|v| (k, v))
1014 })
1015 .collect::<Option<_>>()?,
1016 ))
1017 }
1018
1019 fn name() -> String {
1020 format!("map of string to {}", V::name())
1021 }
1022}
1023
1024impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
1025 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1026 if let WithOptionValue::ClusterAlterStrategy(r) = v {
1027 Ok(r)
1028 } else {
1029 sql_bail!("cannot use value `{}` for a cluster alter strategy", v)
1030 }
1031 }
1032
1033 fn name() -> String {
1034 "cluster alter strategyoption value".to_string()
1035 }
1036
1037 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1038 Some(WithOptionValue::ClusterAlterStrategy(self))
1039 }
1040}
1041
1042impl TryFromValue<WithOptionValue<Aug>> for Vec<NetworkPolicyRuleDefinition<Aug>> {
1043 fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
1044 match v {
1045 WithOptionValue::NetworkPolicyRules(rules) => Ok(rules),
1046 _ => sql_bail!("cannot use value as cluster replicas"),
1047 }
1048 }
1049
1050 fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<WithOptionValue<Aug>> {
1051 Some(WithOptionValue::NetworkPolicyRules(self))
1052 }
1053
1054 fn name() -> String {
1055 "network policy rules".to_string()
1056 }
1057}
1058
impl ImpliedValue for Vec<NetworkPolicyRuleDefinition<Aug>> {
    /// Nothing is implied: the network policy rules must be given explicitly.
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a set of network policy rules")
    }
}