Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The names of the options that a [`MaterializedViewOption`] can carry.
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The option displayed as `PARTITION BY`.
    PartitionBy,
    /// The option displayed as `RETAIN HISTORY`.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A single materialized view option: a name paired with an optional value.
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MaterializedViewOption);
74
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The names of the options that a [`ContinualTaskOption`] can carry.
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT [=] ...` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A single continual task option: a name paired with an optional value.
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ContinualTaskOption);
108
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A schema held as a string; displayed as `SCHEMA '<schema>'` with single
/// quotes in the text escaped.
pub struct Schema {
    /// The schema text, stored unescaped.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The names of the options that an [`AvroSchemaOption`] can carry.
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A single option attached to an inline Avro schema: a name paired with an
/// optional value.
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(AvroSchemaOption);
156
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The source of the schema for the Avro format.
pub enum AvroSchema<T: AstInfo> {
    /// Schema described by a Confluent Schema Registry connection; displayed
    /// by delegating to the contained [`CsrConnectionAvro`].
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// Schema written inline; displayed as `USING SCHEMA '...'` followed by a
    /// parenthesized option list when `with_options` is non-empty.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The source of the schema for the Protobuf format.
pub enum ProtobufSchema<T: AstInfo> {
    /// Schema described by a Confluent Schema Registry connection; displayed
    /// by delegating to the contained [`CsrConnectionProtobuf`].
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// Schema written inline; displayed as
    /// `MESSAGE '<message_name>' USING SCHEMA '...'`.
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The names of the options that a [`CsrConfigOption`] can carry.
pub enum CsrConfigOptionName<T: AstInfo> {
    /// Displayed as `AVRO KEY FULLNAME`.
    AvroKeyFullname,
    /// Displayed as `AVRO VALUE FULLNAME`.
    AvroValueFullname,
    /// Displayed as `NULL DEFAULTS`.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option; see [`AvroDocOn`].
    AvroDocOn(AvroDocOn<T>),
    /// Displayed as `KEY COMPATIBILITY LEVEL`.
    KeyCompatibilityLevel,
    /// Displayed as `VALUE COMPATIBILITY LEVEL`.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The target of a `DOC ON` option: which object is documented and which
/// schema(s) the documentation applies to.
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type being documented.
    pub identifier: DocOnIdentifier<T>,
    /// Whether the option carries a `KEY`/`VALUE` prefix or none.
    pub for_schema: DocOnSchema,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The schema selector of a `DOC ON` option.
pub enum DocOnSchema {
    /// Displayed with a leading `KEY ` keyword.
    KeyOnly,
    /// Displayed with a leading `VALUE ` keyword.
    ValueOnly,
    /// Displayed with no leading keyword.
    All,
}
261
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The object a `DOC ON` option refers to.
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; displayed as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; displayed as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
///
/// These are the entries of the parenthesized list that a [`CsrConnection`]
/// prints after the connection name.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A reference to a connection plus its options; displayed as
/// `CONNECTION <name> [(<options>)]`.
pub struct CsrConnection<T: AstInfo> {
    /// The referenced connection item.
    pub connection: T::ItemName,
    /// Options for the connection; the list is not printed when empty.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
/// Strategy for choosing a reader schema.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
// NOTE(review): the variant descriptions below are inferred from the variant
// names and payload types only — confirm against the code that consumes them.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// Presumably: use the latest available schema.
    #[default]
    Latest,
    /// Presumably: use the schema text carried in the variant.
    Inline(String),
    /// Presumably: use the schema with the given numeric id.
    ById(i32),
}
345
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A Confluent Schema Registry connection used for the Avro format.
///
/// The `AstDisplay` impl prints only `connection` and `seed`; the two
/// strategy fields are not part of the displayed SQL.
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The schema registry connection and its options.
    pub connection: CsrConnection<T>,
    /// Reader schema selection strategy for the key, if one is set.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader schema selection strategy for the value, if one is set.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause carrying the schemas.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A Confluent Schema Registry connection used for the Protobuf format.
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The schema registry connection and its options.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause carrying the schemas.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The `SEED` clause of an Avro schema registry connection.
///
/// Displayed as
/// `SEED [KEY SCHEMA '...' [KEY REFERENCES (...)]] VALUE SCHEMA '...' [VALUE REFERENCES (...)]`.
pub struct CsrSeedAvro {
    /// The key schema, if any. The key references are only printed when this
    /// is present.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    /// Populated during purification by fetching from the schema registry.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    /// Populated during purification by fetching from the schema registry.
    pub value_reference_schemas: Vec<String>,
}
396
397impl AstDisplay for CsrSeedAvro {
398    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
399        f.write_str("SEED");
400        if let Some(key_schema) = &self.key_schema {
401            f.write_str(" KEY SCHEMA '");
402            f.write_node(&display::escape_single_quote_string(key_schema));
403            f.write_str("'");
404            if !self.key_reference_schemas.is_empty() {
405                f.write_str(" KEY REFERENCES (");
406                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
407                    if i > 0 {
408                        f.write_str(", ");
409                    }
410                    f.write_str("'");
411                    f.write_node(&display::escape_single_quote_string(schema));
412                    f.write_str("'");
413                }
414                f.write_str(")");
415            }
416        }
417        f.write_str(" VALUE SCHEMA '");
418        f.write_node(&display::escape_single_quote_string(&self.value_schema));
419        f.write_str("'");
420        if !self.value_reference_schemas.is_empty() {
421            f.write_str(" VALUE REFERENCES (");
422            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
423                if i > 0 {
424                    f.write_str(", ");
425                }
426                f.write_str("'");
427                f.write_node(&display::escape_single_quote_string(schema));
428                f.write_str("'");
429            }
430            f.write_str(")");
431        }
432    }
433}
434impl_display!(CsrSeedAvro);
435
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The `SEED` clause of a Protobuf schema registry connection; displayed as
/// `SEED [KEY <schema>] VALUE <schema>`.
pub struct CsrSeedProtobuf {
    /// The key schema, if any.
    pub key: Option<CsrSeedProtobufSchema>,
    /// The value schema.
    pub value: CsrSeedProtobufSchema,
}
441
442impl AstDisplay for CsrSeedProtobuf {
443    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
444        f.write_str("SEED");
445        if let Some(key) = &self.key {
446            f.write_str(" KEY ");
447            f.write_node(key);
448        }
449        f.write_str(" VALUE ");
450        f.write_node(&self.value);
451    }
452}
453impl_display!(CsrSeedProtobuf);
454
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// One schema inside a Protobuf `SEED` clause; displayed as
/// `SCHEMA '<schema>' MESSAGE '<message_name>'`.
pub struct CsrSeedProtobufSchema {
    // Hex encoded string.
    pub schema: String,
    /// The message name printed after the `MESSAGE` keyword.
    pub message_name: String,
}
461impl AstDisplay for CsrSeedProtobufSchema {
462    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
463        f.write_str("SCHEMA '");
464        f.write_str(&display::escape_single_quote_string(&self.schema));
465        f.write_str("' MESSAGE '");
466        f.write_str(&self.message_name);
467        f.write_str("'");
468    }
469}
470impl_display!(CsrSeedProtobufSchema);
471
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// How the format(s) of a source or sink are specified: one format for
/// everything, or separate key and value formats.
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
479
480impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
481    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
482        match self {
483            FormatSpecifier::Bare(format) => {
484                f.write_str("FORMAT ");
485                f.write_node(format)
486            }
487            FormatSpecifier::KeyValue { key, value } => {
488                f.write_str("KEY FORMAT ");
489                f.write_node(key);
490                f.write_str(" VALUE FORMAT ");
491                f.write_node(value);
492            }
493        }
494    }
495}
496impl_display_t!(FormatSpecifier);
497
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A data format, as written after the `FORMAT` keyword.
pub enum Format<T: AstInfo> {
    /// Displayed as `BYTES`.
    Bytes,
    /// Displayed as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Displayed as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Displayed as `REGEX '<pattern>'`.
    Regex(String),
    /// Displayed as `CSV WITH <columns>`; a `DELIMITED BY '<c>'` clause is
    /// appended only when `delimiter` is not `,`.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// Displayed as `JSON`, or `JSON ARRAY` when `array` is true.
    Json {
        array: bool,
    },
    /// Displayed as `TEXT`.
    Text,
}
513
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The column specification of a CSV format (the part after `CSV WITH`).
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
521
522impl AstDisplay for CsvColumns {
523    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
524        match self {
525            CsvColumns::Count(n) => {
526                f.write_str(n);
527                f.write_str(" COLUMNS")
528            }
529            CsvColumns::Header { names } => {
530                f.write_str("HEADER");
531                if !names.is_empty() {
532                    f.write_str(" (");
533                    f.write_node(&display::comma_separated(names));
534                    f.write_str(")");
535                }
536            }
537        }
538    }
539}
540
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A metadata item that a source can include, with its alias.
///
/// Every variant except `Header` displays as its keyword followed by an
/// optional ` AS <alias>`.
pub enum SourceIncludeMetadata {
    /// Displayed as `KEY [AS <alias>]`.
    Key {
        alias: Option<Ident>,
    },
    /// Displayed as `TIMESTAMP [AS <alias>]`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Displayed as `PARTITION [AS <alias>]`.
    Partition {
        alias: Option<Ident>,
    },
    /// Displayed as `OFFSET [AS <alias>]`.
    Offset {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADERS [AS <alias>]`.
    Headers {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADER '<key>' AS <alias> [BYTES]`; here the alias is
    /// mandatory and `BYTES` is printed when `use_bytes` is true.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
564
565impl AstDisplay for SourceIncludeMetadata {
566    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
567        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
568            if let Some(alias) = alias {
569                f.write_str(" AS ");
570                f.write_node(alias);
571            }
572        };
573
574        match self {
575            SourceIncludeMetadata::Key { alias } => {
576                f.write_str("KEY");
577                print_alias(f, alias);
578            }
579            SourceIncludeMetadata::Timestamp { alias } => {
580                f.write_str("TIMESTAMP");
581                print_alias(f, alias);
582            }
583            SourceIncludeMetadata::Partition { alias } => {
584                f.write_str("PARTITION");
585                print_alias(f, alias);
586            }
587            SourceIncludeMetadata::Offset { alias } => {
588                f.write_str("OFFSET");
589                print_alias(f, alias);
590            }
591            SourceIncludeMetadata::Headers { alias } => {
592                f.write_str("HEADERS");
593                print_alias(f, alias);
594            }
595            SourceIncludeMetadata::Header {
596                alias,
597                key,
598                use_bytes,
599            } => {
600                f.write_str("HEADER '");
601                f.write_str(&display::escape_single_quote_string(key));
602                f.write_str("'");
603                print_alias(f, &Some(alias.clone()));
604                if *use_bytes {
605                    f.write_str(" BYTES");
606                }
607            }
608        }
609    }
610}
611impl_display!(SourceIncludeMetadata);
612
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// An error-handling policy; used as the elements of
/// `SourceEnvelope::Upsert::value_decode_err_policy`.
pub enum SourceErrorPolicy {
    /// Displayed as `INLINE [AS <alias>]`.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
620
621impl AstDisplay for SourceErrorPolicy {
622    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
623        match self {
624            Self::Inline { alias } => {
625                f.write_str("INLINE");
626                if let Some(alias) = alias {
627                    f.write_str(" AS ");
628                    f.write_node(alias);
629                }
630            }
631        }
632    }
633}
634impl_display!(SourceErrorPolicy);
635
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The envelope of a source.
pub enum SourceEnvelope {
    /// Displayed as `NONE`.
    None,
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`, with a `(VALUE DECODING ERRORS = (...))` suffix
    /// when `value_decode_err_policy` is non-empty.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Displayed as `MATERIALIZE`.
    CdcV2,
}
645
646impl SourceEnvelope {
647    /// `true` iff Materialize is expected to crash or exhibit UB
648    /// when attempting to ingest data starting at an offset other than zero.
649    pub fn requires_all_input(&self) -> bool {
650        match self {
651            SourceEnvelope::None => false,
652            SourceEnvelope::Debezium => false,
653            SourceEnvelope::Upsert { .. } => false,
654            SourceEnvelope::CdcV2 => true,
655        }
656    }
657}
658
659impl AstDisplay for SourceEnvelope {
660    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
661        match self {
662            Self::None => {
663                // this is unreachable as long as the default is None, but include it in case we ever change that
664                f.write_str("NONE");
665            }
666            Self::Debezium => {
667                f.write_str("DEBEZIUM");
668            }
669            Self::Upsert {
670                value_decode_err_policy,
671            } => {
672                if value_decode_err_policy.is_empty() {
673                    f.write_str("UPSERT");
674                } else {
675                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
676                    f.write_node(&display::comma_separated(value_decode_err_policy));
677                    f.write_str("))")
678                }
679            }
680            Self::CdcV2 => {
681                f.write_str("MATERIALIZE");
682            }
683        }
684    }
685}
686impl_display!(SourceEnvelope);
687
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The envelope of a sink.
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
693
694impl AstDisplay for SinkEnvelope {
695    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
696        match self {
697            Self::Upsert => {
698                f.write_str("UPSERT");
699            }
700            Self::Debezium => {
701                f.write_str("DEBEZIUM");
702            }
703        }
704    }
705}
706impl_display!(SinkEnvelope);
707
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The mode of an Iceberg sink.
pub enum IcebergSinkMode {
    /// Displayed as `UPSERT`.
    Upsert,
    /// Displayed as `APPEND`.
    Append,
}
713
714impl AstDisplay for IcebergSinkMode {
715    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
716        match self {
717            Self::Upsert => {
718                f.write_str("UPSERT");
719            }
720            Self::Append => {
721                f.write_str("APPEND");
722            }
723        }
724    }
725}
726impl_display!(IcebergSinkMode);
727
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// The output shape of a subscribe. Every non-empty rendering starts with a
/// leading space.
pub enum SubscribeOutput<T: AstInfo> {
    /// Displays as nothing.
    Diffs,
    /// Displayed as ` WITHIN TIMESTAMP ORDER BY <exprs>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Displayed as ` ENVELOPE UPSERT (KEY (<columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Displayed as ` ENVELOPE DEBEZIUM (KEY (<columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
735
736impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
737    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
738        match self {
739            Self::Diffs => {}
740            Self::WithinTimestampOrderBy { order_by } => {
741                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
742                f.write_node(&display::comma_separated(order_by));
743            }
744            Self::EnvelopeUpsert { key_columns } => {
745                f.write_str(" ENVELOPE UPSERT (KEY (");
746                f.write_node(&display::comma_separated(key_columns));
747                f.write_str("))");
748            }
749            Self::EnvelopeDebezium { key_columns } => {
750                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
751                f.write_node(&display::comma_separated(key_columns));
752                f.write_str("))");
753            }
754        }
755    }
756}
757impl_display_t!(SubscribeOutput);
758
759impl<T: AstInfo> AstDisplay for Format<T> {
760    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
761        match self {
762            Self::Bytes => f.write_str("BYTES"),
763            Self::Avro(inner) => {
764                f.write_str("AVRO ");
765                f.write_node(inner);
766            }
767            Self::Protobuf(inner) => {
768                f.write_str("PROTOBUF ");
769                f.write_node(inner);
770            }
771            Self::Regex(regex) => {
772                f.write_str("REGEX '");
773                f.write_node(&display::escape_single_quote_string(regex));
774                f.write_str("'");
775            }
776            Self::Csv { columns, delimiter } => {
777                f.write_str("CSV WITH ");
778                f.write_node(columns);
779
780                if *delimiter != ',' {
781                    f.write_str(" DELIMITED BY '");
782                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
783                    f.write_str("'");
784                }
785            }
786            Self::Json { array } => {
787                f.write_str("JSON");
788                if *array {
789                    f.write_str(" ARRAY");
790                }
791            }
792            Self::Text => f.write_str("TEXT"),
793        }
794    }
795}
796impl_display_t!(Format);
797
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. As the
// French saying goes, one must suffer to be beautiful.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The names of the options that a [`ConnectionOption`] can carry.
///
/// Each variant displays as its name split into upper-case SQL keywords
/// (e.g. `AccessKeyId` as `ACCESS KEY ID`, `PublicKey1` as `PUBLIC KEY 1`).
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
840
841impl AstDisplay for ConnectionOptionName {
842    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
843        f.write_str(match self {
844            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
845            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
846            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
847            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
848            ConnectionOptionName::Broker => "BROKER",
849            ConnectionOptionName::Brokers => "BROKERS",
850            ConnectionOptionName::Credential => "CREDENTIAL",
851            ConnectionOptionName::Database => "DATABASE",
852            ConnectionOptionName::Endpoint => "ENDPOINT",
853            ConnectionOptionName::Host => "HOST",
854            ConnectionOptionName::Password => "PASSWORD",
855            ConnectionOptionName::Port => "PORT",
856            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
857            ConnectionOptionName::ProgressTopicReplicationFactor => {
858                "PROGRESS TOPIC REPLICATION FACTOR"
859            }
860            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
861            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
862            ConnectionOptionName::Region => "REGION",
863            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
864            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
865            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
866            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
867            ConnectionOptionName::SaslUsername => "SASL USERNAME",
868            ConnectionOptionName::Scope => "SCOPE",
869            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
870            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
871            ConnectionOptionName::ServiceName => "SERVICE NAME",
872            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
873            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
874            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
875            ConnectionOptionName::SslKey => "SSL KEY",
876            ConnectionOptionName::SslMode => "SSL MODE",
877            ConnectionOptionName::SessionToken => "SESSION TOKEN",
878            ConnectionOptionName::CatalogType => "CATALOG TYPE",
879            ConnectionOptionName::Url => "URL",
880            ConnectionOptionName::User => "USER",
881            ConnectionOptionName::Warehouse => "WAREHOUSE",
882        })
883    }
884}
885impl_display!(ConnectionOptionName);
886
887impl WithOptionName for ConnectionOptionName {
888    /// # WARNING
889    ///
890    /// Whenever implementing this trait consider very carefully whether or not
891    /// this value could contain sensitive user data. If you're uncertain, err
892    /// on the conservative side and return `true`.
893    fn redact_value(&self) -> bool {
894        match self {
895            ConnectionOptionName::AccessKeyId
896            | ConnectionOptionName::AvailabilityZones
897            | ConnectionOptionName::AwsConnection
898            | ConnectionOptionName::AwsPrivatelink
899            | ConnectionOptionName::Broker
900            | ConnectionOptionName::Brokers
901            | ConnectionOptionName::Credential
902            | ConnectionOptionName::Database
903            | ConnectionOptionName::Endpoint
904            | ConnectionOptionName::Host
905            | ConnectionOptionName::Password
906            | ConnectionOptionName::Port
907            | ConnectionOptionName::ProgressTopic
908            | ConnectionOptionName::ProgressTopicReplicationFactor
909            | ConnectionOptionName::PublicKey1
910            | ConnectionOptionName::PublicKey2
911            | ConnectionOptionName::Region
912            | ConnectionOptionName::AssumeRoleArn
913            | ConnectionOptionName::AssumeRoleSessionName
914            | ConnectionOptionName::SaslMechanisms
915            | ConnectionOptionName::SaslPassword
916            | ConnectionOptionName::SaslUsername
917            | ConnectionOptionName::Scope
918            | ConnectionOptionName::SecurityProtocol
919            | ConnectionOptionName::SecretAccessKey
920            | ConnectionOptionName::ServiceName
921            | ConnectionOptionName::SshTunnel
922            | ConnectionOptionName::SslCertificate
923            | ConnectionOptionName::SslCertificateAuthority
924            | ConnectionOptionName::SslKey
925            | ConnectionOptionName::SslMode
926            | ConnectionOptionName::SessionToken
927            | ConnectionOptionName::CatalogType
928            | ConnectionOptionName::Url
929            | ConnectionOptionName::User
930            | ConnectionOptionName::Warehouse => false,
931        }
932    }
933}
934
935#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
936/// An option in a `CREATE CONNECTION`.
937pub struct ConnectionOption<T: AstInfo> {
938    pub name: ConnectionOptionName,
939    pub value: Option<WithOptionValue<T>>,
940}
941impl_display_for_with_option!(ConnectionOption);
942impl_display_t!(ConnectionOption);
943
/// The kind of connection a `CREATE CONNECTION` statement creates.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// AWS; [`Self::as_str`] yields `"aws"`.
    Aws,
    /// AWS PrivateLink; [`Self::as_str`] yields `"aws-privatelink"`.
    AwsPrivatelink,
    /// Kafka; [`Self::as_str`] yields `"kafka"`.
    Kafka,
    /// Confluent Schema Registry; [`Self::as_str`] yields
    /// `"confluent-schema-registry"`.
    Csr,
    /// PostgreSQL; [`Self::as_str`] yields `"postgres"`.
    Postgres,
    /// SSH tunnel; [`Self::as_str`] yields `"ssh-tunnel"`.
    Ssh,
    /// SQL Server; [`Self::as_str`] yields `"sql-server"`.
    SqlServer,
    /// MySQL; [`Self::as_str`] yields `"mysql"`.
    MySql,
    /// Iceberg catalog; [`Self::as_str`] yields `"iceberg-catalog"`.
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lowercase, hyphen-separated name of this connection type.
    pub fn as_str(&self) -> &'static str {
        // The type is `Copy`, so matching on the dereferenced value is free.
        match *self {
            Self::Aws => "aws",
            Self::AwsPrivatelink => "aws-privatelink",
            Self::Kafka => "kafka",
            Self::Csr => "confluent-schema-registry",
            Self::Postgres => "postgres",
            Self::Ssh => "ssh-tunnel",
            Self::SqlServer => "sql-server",
            Self::MySql => "mysql",
            Self::IcebergCatalog => "iceberg-catalog",
        }
    }
}
972
973impl AstDisplay for CreateConnectionType {
974    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
975        match self {
976            Self::Kafka => {
977                f.write_str("KAFKA");
978            }
979            Self::Csr => {
980                f.write_str("CONFLUENT SCHEMA REGISTRY");
981            }
982            Self::Postgres => {
983                f.write_str("POSTGRES");
984            }
985            Self::Aws => {
986                f.write_str("AWS");
987            }
988            Self::AwsPrivatelink => {
989                f.write_str("AWS PRIVATELINK");
990            }
991            Self::Ssh => {
992                f.write_str("SSH TUNNEL");
993            }
994            Self::SqlServer => {
995                f.write_str("SQL SERVER");
996            }
997            Self::MySql => {
998                f.write_str("MYSQL");
999            }
1000            Self::IcebergCatalog => {
1001                f.write_str("ICEBERG CATALOG");
1002            }
1003        }
1004    }
1005}
1006impl_display!(CreateConnectionType);
1007
/// Names of the options accepted by a `CREATE CONNECTION...` statement itself.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// The `VALIDATE` option.
    Validate,
}
1012
1013impl AstDisplay for CreateConnectionOptionName {
1014    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1015        f.write_str(match self {
1016            CreateConnectionOptionName::Validate => "VALIDATE",
1017        })
1018    }
1019}
1020impl_display!(CreateConnectionOptionName);
1021
1022impl WithOptionName for CreateConnectionOptionName {
1023    /// # WARNING
1024    ///
1025    /// Whenever implementing this trait consider very carefully whether or not
1026    /// this value could contain sensitive user data. If you're uncertain, err
1027    /// on the conservative side and return `true`.
1028    fn redact_value(&self) -> bool {
1029        match self {
1030            CreateConnectionOptionName::Validate => false,
1031        }
1032    }
1033}
1034
1035#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1036/// An option in a `CREATE CONNECTION...` statement.
1037pub struct CreateConnectionOption<T: AstInfo> {
1038    pub name: CreateConnectionOptionName,
1039    pub value: Option<WithOptionValue<T>>,
1040}
1041impl_display_for_with_option!(CreateConnectionOption);
1042impl_display_t!(CreateConnectionOption);
1043
/// Names of the options accepted by a Kafka source's connection clause.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// The `GROUP ID PREFIX` option.
    GroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `START TIMESTAMP` option.
    StartTimestamp,
    /// The `START OFFSET` option.
    StartOffset,
}
1052
1053impl AstDisplay for KafkaSourceConfigOptionName {
1054    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1055        f.write_str(match self {
1056            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1057            KafkaSourceConfigOptionName::Topic => "TOPIC",
1058            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1059                "TOPIC METADATA REFRESH INTERVAL"
1060            }
1061            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1062            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1063        })
1064    }
1065}
1066impl_display!(KafkaSourceConfigOptionName);
1067
1068impl WithOptionName for KafkaSourceConfigOptionName {
1069    /// # WARNING
1070    ///
1071    /// Whenever implementing this trait consider very carefully whether or not
1072    /// this value could contain sensitive user data. If you're uncertain, err
1073    /// on the conservative side and return `true`.
1074    fn redact_value(&self) -> bool {
1075        match self {
1076            KafkaSourceConfigOptionName::GroupIdPrefix
1077            | KafkaSourceConfigOptionName::Topic
1078            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1079            | KafkaSourceConfigOptionName::StartOffset
1080            | KafkaSourceConfigOptionName::StartTimestamp => false,
1081        }
1082    }
1083}
1084
1085#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1086pub struct KafkaSourceConfigOption<T: AstInfo> {
1087    pub name: KafkaSourceConfigOptionName,
1088    pub value: Option<WithOptionValue<T>>,
1089}
1090impl_display_for_with_option!(KafkaSourceConfigOption);
1091impl_display_t!(KafkaSourceConfigOption);
1092
/// Names of the options accepted by a Kafka sink's connection clause.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// The `COMPRESSION TYPE` option.
    CompressionType,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `PROGRESS GROUP ID PREFIX` option.
    ProgressGroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TRANSACTIONAL ID PREFIX` option.
    TransactionalIdPrefix,
    /// The `LEGACY IDS` option.
    LegacyIds,
    /// The `TOPIC CONFIG` option.
    TopicConfig,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `TOPIC PARTITION COUNT` option.
    TopicPartitionCount,
    /// The `TOPIC REPLICATION FACTOR` option.
    TopicReplicationFactor,
}
1106
1107impl AstDisplay for KafkaSinkConfigOptionName {
1108    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1109        f.write_str(match self {
1110            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1111            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1112            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1113            KafkaSinkConfigOptionName::Topic => "TOPIC",
1114            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1115            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1116            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1117            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1118                "TOPIC METADATA REFRESH INTERVAL"
1119            }
1120            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1121            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1122        })
1123    }
1124}
1125impl_display!(KafkaSinkConfigOptionName);
1126
1127impl WithOptionName for KafkaSinkConfigOptionName {
1128    /// # WARNING
1129    ///
1130    /// Whenever implementing this trait consider very carefully whether or not
1131    /// this value could contain sensitive user data. If you're uncertain, err
1132    /// on the conservative side and return `true`.
1133    fn redact_value(&self) -> bool {
1134        match self {
1135            KafkaSinkConfigOptionName::CompressionType
1136            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1137            | KafkaSinkConfigOptionName::Topic
1138            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1139            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1140            | KafkaSinkConfigOptionName::LegacyIds
1141            | KafkaSinkConfigOptionName::TopicConfig
1142            | KafkaSinkConfigOptionName::TopicPartitionCount
1143            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1144            KafkaSinkConfigOptionName::PartitionBy => true,
1145        }
1146    }
1147}
1148
1149#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1150pub struct KafkaSinkConfigOption<T: AstInfo> {
1151    pub name: KafkaSinkConfigOptionName,
1152    pub value: Option<WithOptionValue<T>>,
1153}
1154impl_display_for_with_option!(KafkaSinkConfigOption);
1155impl_display_t!(KafkaSinkConfigOption);
1156
/// Names of the options accepted by an Iceberg sink's connection clause.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// The `NAMESPACE` option.
    Namespace,
    /// The `TABLE` option.
    Table,
}
1162
1163impl AstDisplay for IcebergSinkConfigOptionName {
1164    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1165        f.write_str(match self {
1166            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1167            IcebergSinkConfigOptionName::Table => "TABLE",
1168        })
1169    }
1170}
1171impl_display!(IcebergSinkConfigOptionName);
1172
1173impl WithOptionName for IcebergSinkConfigOptionName {
1174    /// # WARNING
1175    ///
1176    /// Whenever implementing this trait consider very carefully whether or not
1177    /// this value could contain sensitive user data. If you're uncertain, err
1178    /// on the conservative side and return `true`.
1179    fn redact_value(&self) -> bool {
1180        match self {
1181            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1182        }
1183    }
1184}
1185
1186#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1187pub struct IcebergSinkConfigOption<T: AstInfo> {
1188    pub name: IcebergSinkConfigOptionName,
1189    pub value: Option<WithOptionValue<T>>,
1190}
1191impl_display_for_with_option!(IcebergSinkConfigOption);
1192impl_display_t!(IcebergSinkConfigOption);
1193
/// Names of the options accepted by a PostgreSQL source's connection clause.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`.
    Details,
    /// The name of the publication to sync.
    Publication,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// NOTE: This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1216
1217impl AstDisplay for PgConfigOptionName {
1218    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1219        f.write_str(match self {
1220            PgConfigOptionName::Details => "DETAILS",
1221            PgConfigOptionName::Publication => "PUBLICATION",
1222            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1223            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1224        })
1225    }
1226}
1227impl_display!(PgConfigOptionName);
1228
1229impl WithOptionName for PgConfigOptionName {
1230    /// # WARNING
1231    ///
1232    /// Whenever implementing this trait consider very carefully whether or not
1233    /// this value could contain sensitive user data. If you're uncertain, err
1234    /// on the conservative side and return `true`.
1235    fn redact_value(&self) -> bool {
1236        match self {
1237            PgConfigOptionName::Details
1238            | PgConfigOptionName::Publication
1239            | PgConfigOptionName::TextColumns
1240            | PgConfigOptionName::ExcludeColumns => false,
1241        }
1242    }
1243}
1244
1245#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1246/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1247pub struct PgConfigOption<T: AstInfo> {
1248    pub name: PgConfigOptionName,
1249    pub value: Option<WithOptionValue<T>>,
1250}
1251impl_display_for_with_option!(PgConfigOption);
1252impl_display_t!(PgConfigOption);
1253
/// Names of the options accepted by a MySQL source's connection clause.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1274
1275impl AstDisplay for MySqlConfigOptionName {
1276    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1277        f.write_str(match self {
1278            MySqlConfigOptionName::Details => "DETAILS",
1279            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1280            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1281        })
1282    }
1283}
1284impl_display!(MySqlConfigOptionName);
1285
1286impl WithOptionName for MySqlConfigOptionName {
1287    /// # WARNING
1288    ///
1289    /// Whenever implementing this trait consider very carefully whether or not
1290    /// this value could contain sensitive user data. If you're uncertain, err
1291    /// on the conservative side and return `true`.
1292    fn redact_value(&self) -> bool {
1293        match self {
1294            MySqlConfigOptionName::Details
1295            | MySqlConfigOptionName::TextColumns
1296            | MySqlConfigOptionName::ExcludeColumns => false,
1297        }
1298    }
1299}
1300
1301#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1302/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1303pub struct MySqlConfigOption<T: AstInfo> {
1304    pub name: MySqlConfigOptionName,
1305    pub value: Option<WithOptionValue<T>>,
1306}
1307impl_display_for_with_option!(MySqlConfigOption);
1308impl_display_t!(MySqlConfigOption);
1309
/// Names of the options accepted by a SQL Server source's connection clause.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1332
1333impl AstDisplay for SqlServerConfigOptionName {
1334    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1335        f.write_str(match self {
1336            SqlServerConfigOptionName::Details => "DETAILS",
1337            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1338            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1339        })
1340    }
1341}
1342impl_display!(SqlServerConfigOptionName);
1343
1344impl WithOptionName for SqlServerConfigOptionName {
1345    /// # WARNING
1346    ///
1347    /// Whenever implementing this trait consider very carefully whether or not
1348    /// this value could contain sensitive user data. If you're uncertain, err
1349    /// on the conservative side and return `true`.
1350    fn redact_value(&self) -> bool {
1351        match self {
1352            SqlServerConfigOptionName::Details
1353            | SqlServerConfigOptionName::TextColumns
1354            | SqlServerConfigOptionName::ExcludeColumns => false,
1355        }
1356    }
1357}
1358
1359#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1360/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1361pub struct SqlServerConfigOption<T: AstInfo> {
1362    pub name: SqlServerConfigOptionName,
1363    pub value: Option<WithOptionValue<T>>,
1364}
1365impl_display_for_with_option!(SqlServerConfigOption);
1366impl_display_t!(SqlServerConfigOption);
1367
1368#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1369pub enum CreateSourceConnection<T: AstInfo> {
1370    Kafka {
1371        connection: T::ItemName,
1372        options: Vec<KafkaSourceConfigOption<T>>,
1373    },
1374    Postgres {
1375        connection: T::ItemName,
1376        options: Vec<PgConfigOption<T>>,
1377    },
1378    SqlServer {
1379        connection: T::ItemName,
1380        options: Vec<SqlServerConfigOption<T>>,
1381    },
1382    MySql {
1383        connection: T::ItemName,
1384        options: Vec<MySqlConfigOption<T>>,
1385    },
1386    LoadGenerator {
1387        generator: LoadGenerator,
1388        options: Vec<LoadGeneratorOption<T>>,
1389    },
1390}
1391
1392impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1393    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1394        match self {
1395            CreateSourceConnection::Kafka {
1396                connection,
1397                options,
1398            } => {
1399                f.write_str("KAFKA CONNECTION ");
1400                f.write_node(connection);
1401                if !options.is_empty() {
1402                    f.write_str(" (");
1403                    f.write_node(&display::comma_separated(options));
1404                    f.write_str(")");
1405                }
1406            }
1407            CreateSourceConnection::Postgres {
1408                connection,
1409                options,
1410            } => {
1411                f.write_str("POSTGRES CONNECTION ");
1412                f.write_node(connection);
1413                if !options.is_empty() {
1414                    f.write_str(" (");
1415                    f.write_node(&display::comma_separated(options));
1416                    f.write_str(")");
1417                }
1418            }
1419            CreateSourceConnection::SqlServer {
1420                connection,
1421                options,
1422            } => {
1423                f.write_str("SQL SERVER CONNECTION ");
1424                f.write_node(connection);
1425                if !options.is_empty() {
1426                    f.write_str(" (");
1427                    f.write_node(&display::comma_separated(options));
1428                    f.write_str(")");
1429                }
1430            }
1431            CreateSourceConnection::MySql {
1432                connection,
1433                options,
1434            } => {
1435                f.write_str("MYSQL CONNECTION ");
1436                f.write_node(connection);
1437                if !options.is_empty() {
1438                    f.write_str(" (");
1439                    f.write_node(&display::comma_separated(options));
1440                    f.write_str(")");
1441                }
1442            }
1443            CreateSourceConnection::LoadGenerator { generator, options } => {
1444                f.write_str("LOAD GENERATOR ");
1445                f.write_node(generator);
1446                if !options.is_empty() {
1447                    f.write_str(" (");
1448                    f.write_node(&display::comma_separated(options));
1449                    f.write_str(")");
1450                }
1451            }
1452        }
1453    }
1454}
1455impl_display_t!(CreateSourceConnection);
1456
/// The built-in load generators that a `LOAD GENERATOR` source can name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// The `CLOCK` generator.
    Clock,
    /// The `COUNTER` generator.
    Counter,
    /// The `MARKETING` generator.
    Marketing,
    /// The `AUCTION` generator.
    Auction,
    /// The `DATUMS` generator.
    Datums,
    /// The `TPCH` generator.
    Tpch,
    /// The `KEY VALUE` generator.
    KeyValue,
}
1467
1468impl AstDisplay for LoadGenerator {
1469    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1470        match self {
1471            Self::Counter => f.write_str("COUNTER"),
1472            Self::Clock => f.write_str("CLOCK"),
1473            Self::Marketing => f.write_str("MARKETING"),
1474            Self::Auction => f.write_str("AUCTION"),
1475            Self::Datums => f.write_str("DATUMS"),
1476            Self::Tpch => f.write_str("TPCH"),
1477            Self::KeyValue => f.write_str("KEY VALUE"),
1478        }
1479    }
1480}
1481impl_display!(LoadGenerator);
1482
1483impl LoadGenerator {
1484    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1485    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1486    /// cases where we only have the AST representation. This can be removed once
1487    /// the `ast_rewrite_sources_to_tables` migration is removed.
1488    pub fn schema_name(&self) -> &'static str {
1489        match self {
1490            LoadGenerator::Counter => "counter",
1491            LoadGenerator::Clock => "clock",
1492            LoadGenerator::Marketing => "marketing",
1493            LoadGenerator::Auction => "auction",
1494            LoadGenerator::Datums => "datums",
1495            LoadGenerator::Tpch => "tpch",
1496            LoadGenerator::KeyValue => "key_value",
1497        }
1498    }
1499}
1500
/// Names of the options accepted by a `LOAD GENERATOR` source.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// The `SCALE FACTOR` option.
    ScaleFactor,
    /// The `TICK INTERVAL` option.
    TickInterval,
    /// The `AS OF` option.
    AsOf,
    /// The `UP TO` option.
    UpTo,
    /// The `MAX CARDINALITY` option.
    MaxCardinality,
    /// The `KEYS` option.
    Keys,
    /// The `SNAPSHOT ROUNDS` option.
    SnapshotRounds,
    /// The `TRANSACTIONAL SNAPSHOT` option.
    TransactionalSnapshot,
    /// The `VALUE SIZE` option.
    ValueSize,
    /// The `SEED` option.
    Seed,
    /// The `PARTITIONS` option.
    Partitions,
    /// The `BATCH SIZE` option.
    BatchSize,
}
1516
1517impl AstDisplay for LoadGeneratorOptionName {
1518    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1519        f.write_str(match self {
1520            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1521            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1522            LoadGeneratorOptionName::AsOf => "AS OF",
1523            LoadGeneratorOptionName::UpTo => "UP TO",
1524            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1525            LoadGeneratorOptionName::Keys => "KEYS",
1526            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1527            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1528            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1529            LoadGeneratorOptionName::Seed => "SEED",
1530            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1531            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1532        })
1533    }
1534}
1535impl_display!(LoadGeneratorOptionName);
1536
1537impl WithOptionName for LoadGeneratorOptionName {
1538    /// # WARNING
1539    ///
1540    /// Whenever implementing this trait consider very carefully whether or not
1541    /// this value could contain sensitive user data. If you're uncertain, err
1542    /// on the conservative side and return `true`.
1543    fn redact_value(&self) -> bool {
1544        match self {
1545            LoadGeneratorOptionName::ScaleFactor
1546            | LoadGeneratorOptionName::TickInterval
1547            | LoadGeneratorOptionName::AsOf
1548            | LoadGeneratorOptionName::UpTo
1549            | LoadGeneratorOptionName::MaxCardinality
1550            | LoadGeneratorOptionName::Keys
1551            | LoadGeneratorOptionName::SnapshotRounds
1552            | LoadGeneratorOptionName::TransactionalSnapshot
1553            | LoadGeneratorOptionName::ValueSize
1554            | LoadGeneratorOptionName::Partitions
1555            | LoadGeneratorOptionName::BatchSize
1556            | LoadGeneratorOptionName::Seed => false,
1557        }
1558    }
1559}
1560
1561#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1562/// An option in a `CREATE CONNECTION...SSH`.
1563pub struct LoadGeneratorOption<T: AstInfo> {
1564    pub name: LoadGeneratorOptionName,
1565    pub value: Option<WithOptionValue<T>>,
1566}
1567impl_display_for_with_option!(LoadGeneratorOption);
1568impl_display_t!(LoadGeneratorOption);
1569
1570#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1571pub enum CreateSinkConnection<T: AstInfo> {
1572    Kafka {
1573        connection: T::ItemName,
1574        options: Vec<KafkaSinkConfigOption<T>>,
1575        key: Option<SinkKey>,
1576        headers: Option<Ident>,
1577    },
1578    Iceberg {
1579        connection: T::ItemName,
1580        aws_connection: T::ItemName,
1581        key: Option<SinkKey>,
1582        options: Vec<IcebergSinkConfigOption<T>>,
1583    },
1584}
1585
1586impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1587    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1588        match self {
1589            CreateSinkConnection::Kafka {
1590                connection,
1591                options,
1592                key,
1593                headers,
1594            } => {
1595                f.write_str("KAFKA CONNECTION ");
1596                f.write_node(connection);
1597                if !options.is_empty() {
1598                    f.write_str(" (");
1599                    f.write_node(&display::comma_separated(options));
1600                    f.write_str(")");
1601                }
1602                if let Some(key) = key.as_ref() {
1603                    f.write_str(" ");
1604                    f.write_node(key);
1605                }
1606                if let Some(headers) = headers {
1607                    f.write_str(" HEADERS ");
1608                    f.write_node(headers);
1609                }
1610            }
1611            CreateSinkConnection::Iceberg {
1612                connection,
1613                aws_connection,
1614                key,
1615                options,
1616            } => {
1617                f.write_str("ICEBERG CATALOG CONNECTION ");
1618                f.write_node(connection);
1619                if !options.is_empty() {
1620                    f.write_str(" (");
1621                    f.write_node(&display::comma_separated(options));
1622                    f.write_str(")");
1623                }
1624                f.write_str(" USING AWS CONNECTION ");
1625                f.write_node(aws_connection);
1626                if let Some(key) = key.as_ref() {
1627                    f.write_str(" ");
1628                    f.write_node(key);
1629                }
1630            }
1631        }
1632    }
1633}
1634impl_display_t!(CreateSinkConnection);
1635
1636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1637pub struct SinkKey {
1638    pub key_columns: Vec<Ident>,
1639    pub not_enforced: bool,
1640}
1641
1642impl AstDisplay for SinkKey {
1643    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1644        f.write_str("KEY (");
1645        f.write_node(&display::comma_separated(&self.key_columns));
1646        f.write_str(")");
1647        if self.not_enforced {
1648            f.write_str(" NOT ENFORCED");
1649        }
1650    }
1651}
1652
1653/// A table-level constraint, specified in a `CREATE TABLE` or an
1654/// `ALTER TABLE ADD <constraint>` statement.
1655#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1656pub enum TableConstraint<T: AstInfo> {
1657    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1658    Unique {
1659        name: Option<Ident>,
1660        columns: Vec<Ident>,
1661        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1662        is_primary: bool,
1663        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1664        // constraints.
1665        nulls_not_distinct: bool,
1666    },
1667    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1668    /// REFERENCES <foreign_table> (<referred_columns>)`)
1669    ForeignKey {
1670        name: Option<Ident>,
1671        columns: Vec<Ident>,
1672        foreign_table: T::ItemName,
1673        referred_columns: Vec<Ident>,
1674    },
1675    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1676    Check {
1677        name: Option<Ident>,
1678        expr: Box<Expr<T>>,
1679    },
1680}
1681
1682impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1683    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1684        match self {
1685            TableConstraint::Unique {
1686                name,
1687                columns,
1688                is_primary,
1689                nulls_not_distinct,
1690            } => {
1691                f.write_node(&display_constraint_name(name));
1692                if *is_primary {
1693                    f.write_str("PRIMARY KEY ");
1694                } else {
1695                    f.write_str("UNIQUE ");
1696                    if *nulls_not_distinct {
1697                        f.write_str("NULLS NOT DISTINCT ");
1698                    }
1699                }
1700                f.write_str("(");
1701                f.write_node(&display::comma_separated(columns));
1702                f.write_str(")");
1703            }
1704            TableConstraint::ForeignKey {
1705                name,
1706                columns,
1707                foreign_table,
1708                referred_columns,
1709            } => {
1710                f.write_node(&display_constraint_name(name));
1711                f.write_str("FOREIGN KEY (");
1712                f.write_node(&display::comma_separated(columns));
1713                f.write_str(") REFERENCES ");
1714                f.write_node(foreign_table);
1715                f.write_str("(");
1716                f.write_node(&display::comma_separated(referred_columns));
1717                f.write_str(")");
1718            }
1719            TableConstraint::Check { name, expr } => {
1720                f.write_node(&display_constraint_name(name));
1721                f.write_str("CHECK (");
1722                f.write_node(&expr);
1723                f.write_str(")");
1724            }
1725        }
1726    }
1727}
1728impl_display_t!(TableConstraint);
1729
/// A key constraint, specified in a `CREATE SOURCE`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// `PRIMARY KEY (<columns>) NOT ENFORCED`
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1736
1737impl AstDisplay for KeyConstraint {
1738    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1739        match self {
1740            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1741                f.write_str("PRIMARY KEY ");
1742                f.write_str("(");
1743                f.write_node(&display::comma_separated(columns));
1744                f.write_str(") ");
1745                f.write_str("NOT ENFORCED");
1746            }
1747        }
1748    }
1749}
1750impl_display!(KeyConstraint);
1751
/// The name of an option in a `CREATE SOURCE...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1757
1758impl AstDisplay for CreateSourceOptionName {
1759    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1760        f.write_str(match self {
1761            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1762            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1763        })
1764    }
1765}
1766impl_display!(CreateSourceOptionName);
1767
1768impl WithOptionName for CreateSourceOptionName {
1769    /// # WARNING
1770    ///
1771    /// Whenever implementing this trait consider very carefully whether or not
1772    /// this value could contain sensitive user data. If you're uncertain, err
1773    /// on the conservative side and return `true`.
1774    fn redact_value(&self) -> bool {
1775        match self {
1776            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1777                false
1778            }
1779        }
1780    }
1781}
1782
/// An option in a `CREATE SOURCE...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateSourceOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1791
/// SQL column definition
///
/// Displayed as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// The optional collation, displayed after the `COLLATE` keyword.
    pub collation: Option<UnresolvedItemName>,
    /// The column options, displayed in order and separated by spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1800
1801impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1802    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1803        f.write_node(&self.name);
1804        f.write_str(" ");
1805        f.write_node(&self.data_type);
1806        if let Some(collation) = &self.collation {
1807            f.write_str(" COLLATE ");
1808            f.write_node(collation);
1809        }
1810        for option in &self.options {
1811            f.write_str(" ");
1812            f.write_node(option);
1813        }
1814    }
1815}
1816impl_display_t!(ColumnDef);
1817
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// The optional constraint name; when present it is displayed as
    /// `CONSTRAINT <name>` before the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption<T>,
}
1839
1840impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1841    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1842        f.write_node(&display_constraint_name(&self.name));
1843        f.write_node(&self.option);
1844    }
1845}
1846impl_display_t!(ColumnOptionDef);
1847
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    Default(Expr<T>),
    /// `{ PRIMARY KEY | UNIQUE }`
    ///
    /// `is_primary` selects `PRIMARY KEY` over `UNIQUE`.
    Unique { is_primary: bool },
    /// A referential integrity constraint
    /// (`REFERENCES <foreign_table> (<referred_columns>)`).
    ForeignKey {
        foreign_table: UnresolvedItemName,
        referred_columns: Vec<Ident>,
    },
    /// `CHECK (<expr>)`
    Check(Expr<T>),
    /// `VERSION <action> <version>`
    Versioned {
        action: ColumnVersioned,
        version: Version,
    },
}
1874
1875impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1876    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1877        use ColumnOption::*;
1878        match self {
1879            Null => f.write_str("NULL"),
1880            NotNull => f.write_str("NOT NULL"),
1881            Default(expr) => {
1882                f.write_str("DEFAULT ");
1883                f.write_node(expr);
1884            }
1885            Unique { is_primary } => {
1886                if *is_primary {
1887                    f.write_str("PRIMARY KEY");
1888                } else {
1889                    f.write_str("UNIQUE");
1890                }
1891            }
1892            ForeignKey {
1893                foreign_table,
1894                referred_columns,
1895            } => {
1896                f.write_str("REFERENCES ");
1897                f.write_node(foreign_table);
1898                f.write_str(" (");
1899                f.write_node(&display::comma_separated(referred_columns));
1900                f.write_str(")");
1901            }
1902            Check(expr) => {
1903                f.write_str("CHECK (");
1904                f.write_node(expr);
1905                f.write_str(")");
1906            }
1907            Versioned { action, version } => {
1908                f.write_str("VERSION ");
1909                f.write_node(action);
1910                f.write_str(" ");
1911                f.write_node(version);
1912            }
1913        }
1914    }
1915}
1916impl_display_t!(ColumnOption);
1917
/// The `<action>` part of a `VERSION <action> <version>` column option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1922
1923impl AstDisplay for ColumnVersioned {
1924    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1925    where
1926        W: fmt::Write,
1927    {
1928        match self {
1929            // TODO(alter_table): Support dropped columns.
1930            ColumnVersioned::Added => f.write_str("ADDED"),
1931        }
1932    }
1933}
1934impl_display!(ColumnVersioned);
1935
1936fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1937    struct ConstraintName<'a>(&'a Option<Ident>);
1938    impl<'a> AstDisplay for ConstraintName<'a> {
1939        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1940        where
1941            W: fmt::Write,
1942        {
1943            if let Some(name) = self.0 {
1944                f.write_str("CONSTRAINT ");
1945                f.write_node(name);
1946                f.write_str(" ");
1947            }
1948        }
1949    }
1950    ConstraintName(name)
1951}