Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Names of the options recognised for a materialized view.
///
/// The SQL spelling of each name is produced by this type's `AstDisplay`
/// implementation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
/// A single materialized view option: an option name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option is being set.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, or `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): presumably generates the display impls for this name/value
// pair; the macro is defined outside this file — confirm.
impl_display_for_with_option!(MaterializedViewOption);
74
/// An inline schema definition, displayed as `SCHEMA '<schema>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The schema text. Single quotes are escaped when it is displayed.
    pub schema: String,
}
79
80impl AstDisplay for Schema {
81    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82        f.write_str("SCHEMA '");
83        f.write_node(&display::escape_single_quote_string(&self.schema));
84        f.write_str("'");
85    }
86}
87impl_display!(Schema);
88
/// Names of the options accepted alongside an inline Avro schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
94
95impl AstDisplay for AvroSchemaOptionName {
96    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97        match self {
98            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99        }
100    }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104    /// # WARNING
105    ///
106    /// Whenever implementing this trait consider very carefully whether or not
107    /// this value could contain sensitive user data. If you're uncertain, err
108    /// on the conservative side and return `true`.
109    fn redact_value(&self) -> bool {
110        match self {
111            Self::ConfluentWireFormat => false,
112        }
113    }
114}
115
/// A single option attached to an inline Avro schema: a name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option is being set.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, or `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): presumably generates the display impls for this name/value
// pair; the macro is defined outside this file — confirm.
impl_display_for_with_option!(AvroSchemaOption);
122
/// Where the schema for an `AVRO` format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection and its Avro-specific settings.
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is written inline, displayed as `USING SCHEMA '...'`.
    InlineSchema {
        /// The inline schema text.
        schema: Schema,
        /// Options displayed in parentheses after the schema; may be empty.
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
133
134impl<T: AstInfo> AstDisplay for AvroSchema<T> {
135    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
136        match self {
137            Self::Csr { csr_connection } => {
138                f.write_node(csr_connection);
139            }
140            Self::InlineSchema {
141                schema,
142                with_options,
143            } => {
144                f.write_str("USING ");
145                schema.fmt(f);
146                if !with_options.is_empty() {
147                    f.write_str(" (");
148                    f.write_node(&display::comma_separated(with_options));
149                    f.write_str(")");
150                }
151            }
152        }
153    }
154}
155impl_display_t!(AvroSchema);
156
/// Where the schema for a `PROTOBUF` format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection and its Protobuf-specific settings.
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is written inline, displayed as
    /// `MESSAGE '<message_name>' USING SCHEMA '...'`.
    InlineSchema {
        /// The message name; single quotes are escaped on display.
        message_name: String,
        /// The inline schema text.
        schema: Schema,
    },
}
167
168impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                message_name,
176                schema,
177            } => {
178                f.write_str("MESSAGE '");
179                f.write_node(&display::escape_single_quote_string(message_name));
180                f.write_str("' USING ");
181                f.write_str(schema);
182            }
183        }
184    }
185}
186impl_display_t!(ProtobufSchema);
187
/// Names of the options accepted in the parenthesised list of a schema
/// registry `CONNECTION` clause.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// The `AVRO KEY FULLNAME` option.
    AvroKeyFullname,
    /// The `AVRO VALUE FULLNAME` option.
    AvroValueFullname,
    /// The `NULL DEFAULTS` option.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option.
    AvroDocOn(AvroDocOn<T>),
    /// The `KEY COMPATIBILITY LEVEL` option.
    KeyCompatibilityLevel,
    /// The `VALUE COMPATIBILITY LEVEL` option.
    ValueCompatibilityLevel,
}
197
198impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
199    /// # WARNING
200    ///
201    /// Whenever implementing this trait consider very carefully whether or not
202    /// this value could contain sensitive user data. If you're uncertain, err
203    /// on the conservative side and return `true`.
204    fn redact_value(&self) -> bool {
205        match self {
206            Self::AvroKeyFullname
207            | Self::AvroValueFullname
208            | Self::NullDefaults
209            | Self::AvroDocOn(_)
210            | Self::KeyCompatibilityLevel
211            | Self::ValueCompatibilityLevel => false,
212        }
213    }
214}
215
/// The name part of a `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the option refers to.
    pub identifier: DocOnIdentifier<T>,
    /// Which schema the option is scoped to; selects the optional `KEY `/`VALUE ` prefix.
    pub for_schema: DocOnSchema,
}
/// The schema scope of an [`AvroDocOn`] option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a leading `KEY ` keyword.
    KeyOnly,
    /// Displayed with a leading `VALUE ` keyword.
    ValueOnly,
    /// Displayed with no leading keyword.
    All,
}
227
/// The object an [`AvroDocOn`] option refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column, displayed as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type, displayed as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
233
234impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
235    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
236        match &self.for_schema {
237            DocOnSchema::KeyOnly => f.write_str("KEY "),
238            DocOnSchema::ValueOnly => f.write_str("VALUE "),
239            DocOnSchema::All => {}
240        }
241        match &self.identifier {
242            DocOnIdentifier::Column(name) => {
243                f.write_str("DOC ON COLUMN ");
244                f.write_node(name);
245            }
246            DocOnIdentifier::Type(name) => {
247                f.write_str("DOC ON TYPE ");
248                f.write_node(name);
249            }
250        }
251    }
252}
253impl_display_t!(AvroDocOn);
254
255impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
256    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
257        match self {
258            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
259            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
260            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
261            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
262            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
263            CsrConfigOptionName::ValueCompatibilityLevel => {
264                f.write_str("VALUE COMPATIBILITY LEVEL")
265            }
266        }
267    }
268}
269impl_display_t!(CsrConfigOptionName);
270
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, or `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this file; presumably they
// provide the `AstDisplay` and `Display` impls respectively — confirm.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
279
/// A reference to a connection, displayed as `CONNECTION <name> [(<options>)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The name of the referenced connection.
    pub connection: T::ItemName,
    /// Options displayed in parentheses after the name; may be empty.
    pub options: Vec<CsrConfigOption<T>>,
}
285
286impl<T: AstInfo> AstDisplay for CsrConnection<T> {
287    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
288        f.write_str("CONNECTION ");
289        f.write_node(&self.connection);
290        if !self.options.is_empty() {
291            f.write_str(" (");
292            f.write_node(&display::comma_separated(&self.options));
293            f.write_str(")");
294        }
295    }
296}
297impl_display_t!(CsrConnection);
298
/// How a reader schema is selected.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    // NOTE(review): presumably "use the latest registered schema" — confirm.
    #[default]
    Latest,
    /// Carries a string payload.
    // NOTE(review): presumably the schema text supplied inline — confirm.
    Inline(String),
    /// Carries an integer payload.
    // NOTE(review): presumably a schema registry schema id — confirm.
    ById(i32),
}
311
/// A Confluent Schema Registry connection as used by the `AVRO` format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The referenced connection and its options.
    pub connection: CsrConnection<T>,
    /// Optional reader schema selection strategy for the key.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional reader schema selection strategy for the value.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause carrying the schemas.
    pub seed: Option<CsrSeedAvro>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
323        f.write_node(&self.connection);
324        if let Some(seed) = &self.seed {
325            f.write_str(" ");
326            f.write_node(seed);
327        }
328    }
329}
330impl_display_t!(CsrConnectionAvro);
331
/// A Confluent Schema Registry connection as used by the `PROTOBUF` format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The referenced connection and its options.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause carrying the schemas.
    pub seed: Option<CsrSeedProtobuf>,
}
337
338impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
339    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
340        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
341        f.write_node(&self.connection);
342
343        if let Some(seed) = &self.seed {
344            f.write_str(" ");
345            f.write_node(seed);
346        }
347    }
348}
349impl_display_t!(CsrConnectionProtobuf);
350
/// The `SEED ...` clause of an Avro schema registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// The key schema, if any; displayed as `KEY SCHEMA '...'`.
    pub key_schema: Option<String>,
    /// The value schema; displayed as `VALUE SCHEMA '...'`.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    /// Populated during purification by fetching from the schema registry.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    /// Populated during purification by fetching from the schema registry.
    pub value_reference_schemas: Vec<String>,
}
362
363impl AstDisplay for CsrSeedAvro {
364    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
365        f.write_str("SEED");
366        if let Some(key_schema) = &self.key_schema {
367            f.write_str(" KEY SCHEMA '");
368            f.write_node(&display::escape_single_quote_string(key_schema));
369            f.write_str("'");
370            if !self.key_reference_schemas.is_empty() {
371                f.write_str(" KEY REFERENCES (");
372                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
373                    if i > 0 {
374                        f.write_str(", ");
375                    }
376                    f.write_str("'");
377                    f.write_node(&display::escape_single_quote_string(schema));
378                    f.write_str("'");
379                }
380                f.write_str(")");
381            }
382        }
383        f.write_str(" VALUE SCHEMA '");
384        f.write_node(&display::escape_single_quote_string(&self.value_schema));
385        f.write_str("'");
386        if !self.value_reference_schemas.is_empty() {
387            f.write_str(" VALUE REFERENCES (");
388            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
389                if i > 0 {
390                    f.write_str(", ");
391                }
392                f.write_str("'");
393                f.write_node(&display::escape_single_quote_string(schema));
394                f.write_str("'");
395            }
396            f.write_str(")");
397        }
398    }
399}
400impl_display!(CsrSeedAvro);
401
/// The `SEED ...` clause of a Protobuf schema registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// The key schema, if any; displayed after a `KEY` keyword.
    pub key: Option<CsrSeedProtobufSchema>,
    /// The value schema; displayed after a `VALUE` keyword.
    pub value: CsrSeedProtobufSchema,
}
407
408impl AstDisplay for CsrSeedProtobuf {
409    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410        f.write_str("SEED");
411        if let Some(key) = &self.key {
412            f.write_str(" KEY ");
413            f.write_node(key);
414        }
415        f.write_str(" VALUE ");
416        f.write_node(&self.value);
417    }
418}
419impl_display!(CsrSeedProtobuf);
420
/// One schema inside a Protobuf `SEED` clause, displayed as
/// `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    // Hex encoded string.
    pub schema: String,
    /// The message name; single quotes are escaped on display.
    pub message_name: String,
}
427impl AstDisplay for CsrSeedProtobufSchema {
428    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
429        f.write_str("SCHEMA '");
430        f.write_str(&display::escape_single_quote_string(&self.schema));
431        f.write_str("' MESSAGE '");
432        f.write_str(&display::escape_single_quote_string(&self.message_name));
433        f.write_str("'");
434    }
435}
436impl_display!(CsrSeedProtobufSchema);
437
/// The format clause of a source or sink: either one format for everything or
/// separate key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
445
446impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
447    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
448        match self {
449            FormatSpecifier::Bare(format) => {
450                f.write_str("FORMAT ");
451                f.write_node(format)
452            }
453            FormatSpecifier::KeyValue { key, value } => {
454                f.write_str("KEY FORMAT ");
455                f.write_node(key);
456                f.write_str(" VALUE FORMAT ");
457                f.write_node(value);
458            }
459        }
460    }
461}
462impl_display_t!(FormatSpecifier);
463
/// A data format, as written after the `FORMAT` keyword.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Displayed as `BYTES`.
    Bytes,
    /// Displayed as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Displayed as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Displayed as `REGEX '<regex>'`.
    Regex(String),
    /// Displayed as `CSV WITH <columns>`, plus `DELIMITED BY '<delimiter>'`
    /// when the delimiter is not a comma.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// Displayed as `JSON`, plus `ARRAY` when `array` is true.
    Json {
        array: bool,
    },
    /// Displayed as `TEXT`.
    Text,
}
479
/// How the columns of a CSV format are described.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
487
488impl AstDisplay for CsvColumns {
489    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
490        match self {
491            CsvColumns::Count(n) => {
492                f.write_str(n);
493                f.write_str(" COLUMNS")
494            }
495            CsvColumns::Header { names } => {
496                f.write_str("HEADER");
497                if !names.is_empty() {
498                    f.write_str(" (");
499                    f.write_node(&display::comma_separated(names));
500                    f.write_str(")");
501                }
502            }
503        }
504    }
505}
506
/// A piece of metadata that a source can be asked to include.
///
/// Every variant except `Header` is displayed as its keyword followed by an
/// optional ` AS <alias>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Displayed as `KEY`.
    Key {
        alias: Option<Ident>,
    },
    /// Displayed as `TIMESTAMP`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Displayed as `PARTITION`.
    Partition {
        alias: Option<Ident>,
    },
    /// Displayed as `OFFSET`.
    Offset {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADERS`.
    Headers {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADER '<key>' AS <alias>`, plus ` BYTES` when
    /// `use_bytes` is true. Unlike the other variants the alias is mandatory.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
530
531impl AstDisplay for SourceIncludeMetadata {
532    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
533        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
534            if let Some(alias) = alias {
535                f.write_str(" AS ");
536                f.write_node(alias);
537            }
538        };
539
540        match self {
541            SourceIncludeMetadata::Key { alias } => {
542                f.write_str("KEY");
543                print_alias(f, alias);
544            }
545            SourceIncludeMetadata::Timestamp { alias } => {
546                f.write_str("TIMESTAMP");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Partition { alias } => {
550                f.write_str("PARTITION");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Offset { alias } => {
554                f.write_str("OFFSET");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Headers { alias } => {
558                f.write_str("HEADERS");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Header {
562                alias,
563                key,
564                use_bytes,
565            } => {
566                f.write_str("HEADER '");
567                f.write_str(&display::escape_single_quote_string(key));
568                f.write_str("'");
569                print_alias(f, &Some(alias.clone()));
570                if *use_bytes {
571                    f.write_str(" BYTES");
572                }
573            }
574        }
575    }
576}
577impl_display!(SourceIncludeMetadata);
578
/// A policy for handling errors in a source; used by
/// `SourceEnvelope::Upsert` for value decoding errors.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Displayed as `INLINE [AS <alias>]`.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
586
587impl AstDisplay for SourceErrorPolicy {
588    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589        match self {
590            Self::Inline { alias } => {
591                f.write_str("INLINE");
592                if let Some(alias) = alias {
593                    f.write_str(" AS ");
594                    f.write_node(alias);
595                }
596            }
597        }
598    }
599}
600impl_display!(SourceErrorPolicy);
601
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Displayed as `NONE`.
    None,
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`, plus `(VALUE DECODING ERRORS = (...))` when any
    /// policies are given.
    Upsert {
        /// Policies for value decoding errors; may be empty.
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Displayed as `MATERIALIZE`.
    CdcV2,
}
611
612impl SourceEnvelope {
613    /// `true` iff Materialize is expected to crash or exhibit UB
614    /// when attempting to ingest data starting at an offset other than zero.
615    pub fn requires_all_input(&self) -> bool {
616        match self {
617            SourceEnvelope::None => false,
618            SourceEnvelope::Debezium => false,
619            SourceEnvelope::Upsert { .. } => false,
620            SourceEnvelope::CdcV2 => true,
621        }
622    }
623}
624
625impl AstDisplay for SourceEnvelope {
626    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
627        match self {
628            Self::None => {
629                // this is unreachable as long as the default is None, but include it in case we ever change that
630                f.write_str("NONE");
631            }
632            Self::Debezium => {
633                f.write_str("DEBEZIUM");
634            }
635            Self::Upsert {
636                value_decode_err_policy,
637            } => {
638                if value_decode_err_policy.is_empty() {
639                    f.write_str("UPSERT");
640                } else {
641                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
642                    f.write_node(&display::comma_separated(value_decode_err_policy));
643                    f.write_str("))")
644                }
645            }
646            Self::CdcV2 => {
647                f.write_str("MATERIALIZE");
648            }
649        }
650    }
651}
652impl_display!(SourceEnvelope);
653
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
659
660impl AstDisplay for SinkEnvelope {
661    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
662        match self {
663            Self::Upsert => {
664                f.write_str("UPSERT");
665            }
666            Self::Debezium => {
667                f.write_str("DEBEZIUM");
668            }
669        }
670    }
671}
672impl_display!(SinkEnvelope);
673
/// The mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Displayed as `UPSERT`.
    Upsert,
    /// Displayed as `APPEND`.
    Append,
}
679
680impl AstDisplay for IcebergSinkMode {
681    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
682        match self {
683            Self::Upsert => {
684                f.write_str("UPSERT");
685            }
686            Self::Append => {
687                f.write_str("APPEND");
688            }
689        }
690    }
691}
692impl_display!(IcebergSinkMode);
693
/// The output shape requested from a subscribe.
///
/// Every non-empty rendering starts with a leading space.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Displayed as nothing.
    Diffs,
    /// Displayed as ` WITHIN TIMESTAMP ORDER BY <order_by>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Displayed as ` ENVELOPE UPSERT (KEY (<key_columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Displayed as ` ENVELOPE DEBEZIUM (KEY (<key_columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704        match self {
705            Self::Diffs => {}
706            Self::WithinTimestampOrderBy { order_by } => {
707                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708                f.write_node(&display::comma_separated(order_by));
709            }
710            Self::EnvelopeUpsert { key_columns } => {
711                f.write_str(" ENVELOPE UPSERT (KEY (");
712                f.write_node(&display::comma_separated(key_columns));
713                f.write_str("))");
714            }
715            Self::EnvelopeDebezium { key_columns } => {
716                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717                f.write_node(&display::comma_separated(key_columns));
718                f.write_str("))");
719            }
720        }
721    }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727        match self {
728            Self::Bytes => f.write_str("BYTES"),
729            Self::Avro(inner) => {
730                f.write_str("AVRO ");
731                f.write_node(inner);
732            }
733            Self::Protobuf(inner) => {
734                f.write_str("PROTOBUF ");
735                f.write_node(inner);
736            }
737            Self::Regex(regex) => {
738                f.write_str("REGEX '");
739                f.write_node(&display::escape_single_quote_string(regex));
740                f.write_str("'");
741            }
742            Self::Csv { columns, delimiter } => {
743                f.write_str("CSV WITH ");
744                f.write_node(columns);
745
746                if *delimiter != ',' {
747                    f.write_str(" DELIMITED BY '");
748                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749                    f.write_str("'");
750                }
751            }
752            Self::Json { array } => {
753                f.write_str("JSON");
754                if *array {
755                    f.write_str(" ARRAY");
756                }
757            }
758            Self::Text => f.write_str("TEXT"),
759        }
760    }
761}
762impl_display_t!(Format);
763
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. One
// must suffer to be beautiful.
/// The name of an option on a connection, across all connection types.
///
/// The SQL spelling of each name is produced by this type's `AstDisplay`
/// implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    GcpConnection,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    Registry,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceAccountKey,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
809
810impl AstDisplay for ConnectionOptionName {
811    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
812        f.write_str(match self {
813            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
814            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
815            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
816            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
817            ConnectionOptionName::Broker => "BROKER",
818            ConnectionOptionName::Brokers => "BROKERS",
819            ConnectionOptionName::Credential => "CREDENTIAL",
820            ConnectionOptionName::Database => "DATABASE",
821            ConnectionOptionName::Endpoint => "ENDPOINT",
822            ConnectionOptionName::GcpConnection => "GCP CONNECTION",
823            ConnectionOptionName::Host => "HOST",
824            ConnectionOptionName::Password => "PASSWORD",
825            ConnectionOptionName::Port => "PORT",
826            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
827            ConnectionOptionName::ProgressTopicReplicationFactor => {
828                "PROGRESS TOPIC REPLICATION FACTOR"
829            }
830            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
831            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
832            ConnectionOptionName::Region => "REGION",
833            ConnectionOptionName::Registry => "REGISTRY",
834            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
835            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
836            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
837            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
838            ConnectionOptionName::SaslUsername => "SASL USERNAME",
839            ConnectionOptionName::Scope => "SCOPE",
840            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
841            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
842            ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY",
843            ConnectionOptionName::ServiceName => "SERVICE NAME",
844            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
845            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
846            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
847            ConnectionOptionName::SslKey => "SSL KEY",
848            ConnectionOptionName::SslMode => "SSL MODE",
849            ConnectionOptionName::SessionToken => "SESSION TOKEN",
850            ConnectionOptionName::CatalogType => "CATALOG TYPE",
851            ConnectionOptionName::Url => "URL",
852            ConnectionOptionName::User => "USER",
853            ConnectionOptionName::Warehouse => "WAREHOUSE",
854        })
855    }
856}
857impl_display!(ConnectionOptionName);
858
859impl WithOptionName for ConnectionOptionName {
860    /// # WARNING
861    ///
862    /// Whenever implementing this trait consider very carefully whether or not
863    /// this value could contain sensitive user data. If you're uncertain, err
864    /// on the conservative side and return `true`.
865    fn redact_value(&self) -> bool {
866        match self {
867            ConnectionOptionName::AccessKeyId
868            | ConnectionOptionName::AvailabilityZones
869            | ConnectionOptionName::AwsConnection
870            | ConnectionOptionName::AwsPrivatelink
871            | ConnectionOptionName::Broker
872            | ConnectionOptionName::Brokers
873            | ConnectionOptionName::Credential
874            | ConnectionOptionName::Database
875            | ConnectionOptionName::Endpoint
876            | ConnectionOptionName::GcpConnection
877            | ConnectionOptionName::Host
878            | ConnectionOptionName::Password
879            | ConnectionOptionName::Port
880            | ConnectionOptionName::ProgressTopic
881            | ConnectionOptionName::ProgressTopicReplicationFactor
882            | ConnectionOptionName::PublicKey1
883            | ConnectionOptionName::PublicKey2
884            | ConnectionOptionName::Region
885            | ConnectionOptionName::Registry
886            | ConnectionOptionName::AssumeRoleArn
887            | ConnectionOptionName::AssumeRoleSessionName
888            | ConnectionOptionName::SaslMechanisms
889            | ConnectionOptionName::SaslPassword
890            | ConnectionOptionName::SaslUsername
891            | ConnectionOptionName::Scope
892            | ConnectionOptionName::SecurityProtocol
893            | ConnectionOptionName::SecretAccessKey
894            | ConnectionOptionName::ServiceAccountKey
895            | ConnectionOptionName::ServiceName
896            | ConnectionOptionName::SshTunnel
897            | ConnectionOptionName::SslCertificate
898            | ConnectionOptionName::SslCertificateAuthority
899            | ConnectionOptionName::SslKey
900            | ConnectionOptionName::SslMode
901            | ConnectionOptionName::SessionToken
902            | ConnectionOptionName::CatalogType
903            | ConnectionOptionName::Url
904            | ConnectionOptionName::User
905            | ConnectionOptionName::Warehouse => false,
906        }
907    }
908}
909
910#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
911/// An option in a `CREATE CONNECTION`.
912pub struct ConnectionOption<T: AstInfo> {
913    pub name: ConnectionOptionName,
914    pub value: Option<WithOptionValue<T>>,
915}
916impl_display_for_with_option!(ConnectionOption);
917impl_display_t!(ConnectionOption);
918
/// The kind of connection a `CREATE CONNECTION` statement creates.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// An AWS connection.
    Aws,
    /// An AWS PrivateLink connection.
    AwsPrivatelink,
    /// An AWS Glue Schema Registry connection.
    GlueSchemaRegistry,
    /// A GCP connection.
    Gcp,
    /// A Kafka connection.
    Kafka,
    /// A Confluent Schema Registry connection.
    Csr,
    /// A PostgreSQL connection.
    Postgres,
    /// An SSH tunnel connection.
    Ssh,
    /// A SQL Server connection.
    SqlServer,
    /// A MySQL connection.
    MySql,
    /// An Iceberg catalog connection.
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lowercase, hyphen-separated static name of this
    /// connection type (for example `"aws-privatelink"`).
    pub fn as_str(&self) -> &'static str {
        // Arms follow the declaration order of the enum; the match stays
        // exhaustive so a new variant cannot be added without naming it here.
        match *self {
            CreateConnectionType::Aws => "aws",
            CreateConnectionType::AwsPrivatelink => "aws-privatelink",
            CreateConnectionType::GlueSchemaRegistry => "glue-schema-registry",
            CreateConnectionType::Gcp => "gcp",
            CreateConnectionType::Kafka => "kafka",
            CreateConnectionType::Csr => "confluent-schema-registry",
            CreateConnectionType::Postgres => "postgres",
            CreateConnectionType::Ssh => "ssh-tunnel",
            CreateConnectionType::SqlServer => "sql-server",
            CreateConnectionType::MySql => "mysql",
            CreateConnectionType::IcebergCatalog => "iceberg-catalog",
        }
    }
}
951
952impl AstDisplay for CreateConnectionType {
953    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
954        match self {
955            Self::Kafka => {
956                f.write_str("KAFKA");
957            }
958            Self::Csr => {
959                f.write_str("CONFLUENT SCHEMA REGISTRY");
960            }
961            Self::Postgres => {
962                f.write_str("POSTGRES");
963            }
964            Self::Aws => {
965                f.write_str("AWS");
966            }
967            Self::AwsPrivatelink => {
968                f.write_str("AWS PRIVATELINK");
969            }
970            Self::GlueSchemaRegistry => {
971                f.write_str("AWS GLUE SCHEMA REGISTRY");
972            }
973            Self::Gcp => {
974                f.write_str("GCP");
975            }
976            Self::Ssh => {
977                f.write_str("SSH TUNNEL");
978            }
979            Self::SqlServer => {
980                f.write_str("SQL SERVER");
981            }
982            Self::MySql => {
983                f.write_str("MYSQL");
984            }
985            Self::IcebergCatalog => {
986                f.write_str("ICEBERG CATALOG");
987            }
988        }
989    }
990}
991impl_display!(CreateConnectionType);
992
/// Names of the options accepted by a `CREATE CONNECTION...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// The `VALIDATE` option.
    Validate,
}
997
998impl AstDisplay for CreateConnectionOptionName {
999    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1000        f.write_str(match self {
1001            CreateConnectionOptionName::Validate => "VALIDATE",
1002        })
1003    }
1004}
1005impl_display!(CreateConnectionOptionName);
1006
1007impl WithOptionName for CreateConnectionOptionName {
1008    /// # WARNING
1009    ///
1010    /// Whenever implementing this trait consider very carefully whether or not
1011    /// this value could contain sensitive user data. If you're uncertain, err
1012    /// on the conservative side and return `true`.
1013    fn redact_value(&self) -> bool {
1014        match self {
1015            CreateConnectionOptionName::Validate => false,
1016        }
1017    }
1018}
1019
1020#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1021/// An option in a `CREATE CONNECTION...` statement.
1022pub struct CreateConnectionOption<T: AstInfo> {
1023    pub name: CreateConnectionOptionName,
1024    pub value: Option<WithOptionValue<T>>,
1025}
1026impl_display_for_with_option!(CreateConnectionOption);
1027impl_display_t!(CreateConnectionOption);
1028
/// Names of the options accepted by a Kafka source connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// The `GROUP ID PREFIX` option.
    GroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `START TIMESTAMP` option.
    StartTimestamp,
    /// The `START OFFSET` option.
    StartOffset,
}
1037
1038impl AstDisplay for KafkaSourceConfigOptionName {
1039    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1040        f.write_str(match self {
1041            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1042            KafkaSourceConfigOptionName::Topic => "TOPIC",
1043            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1044                "TOPIC METADATA REFRESH INTERVAL"
1045            }
1046            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1047            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1048        })
1049    }
1050}
1051impl_display!(KafkaSourceConfigOptionName);
1052
1053impl WithOptionName for KafkaSourceConfigOptionName {
1054    /// # WARNING
1055    ///
1056    /// Whenever implementing this trait consider very carefully whether or not
1057    /// this value could contain sensitive user data. If you're uncertain, err
1058    /// on the conservative side and return `true`.
1059    fn redact_value(&self) -> bool {
1060        match self {
1061            KafkaSourceConfigOptionName::GroupIdPrefix
1062            | KafkaSourceConfigOptionName::Topic
1063            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1064            | KafkaSourceConfigOptionName::StartOffset
1065            | KafkaSourceConfigOptionName::StartTimestamp => false,
1066        }
1067    }
1068}
1069
1070#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1071pub struct KafkaSourceConfigOption<T: AstInfo> {
1072    pub name: KafkaSourceConfigOptionName,
1073    pub value: Option<WithOptionValue<T>>,
1074}
1075impl_display_for_with_option!(KafkaSourceConfigOption);
1076impl_display_t!(KafkaSourceConfigOption);
1077
/// Names of the options accepted by a Kafka sink connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// The `COMPRESSION TYPE` option.
    CompressionType,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `PROGRESS GROUP ID PREFIX` option.
    ProgressGroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TRANSACTIONAL ID PREFIX` option.
    TransactionalIdPrefix,
    /// The `LEGACY IDS` option.
    LegacyIds,
    /// The `TOPIC CONFIG` option.
    TopicConfig,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `TOPIC PARTITION COUNT` option.
    TopicPartitionCount,
    /// The `TOPIC REPLICATION FACTOR` option.
    TopicReplicationFactor,
}
1091
1092impl AstDisplay for KafkaSinkConfigOptionName {
1093    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1094        f.write_str(match self {
1095            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1096            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1097            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1098            KafkaSinkConfigOptionName::Topic => "TOPIC",
1099            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1100            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1101            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1102            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1103                "TOPIC METADATA REFRESH INTERVAL"
1104            }
1105            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1106            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1107        })
1108    }
1109}
1110impl_display!(KafkaSinkConfigOptionName);
1111
1112impl WithOptionName for KafkaSinkConfigOptionName {
1113    /// # WARNING
1114    ///
1115    /// Whenever implementing this trait consider very carefully whether or not
1116    /// this value could contain sensitive user data. If you're uncertain, err
1117    /// on the conservative side and return `true`.
1118    fn redact_value(&self) -> bool {
1119        match self {
1120            KafkaSinkConfigOptionName::CompressionType
1121            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1122            | KafkaSinkConfigOptionName::Topic
1123            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1124            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1125            | KafkaSinkConfigOptionName::LegacyIds
1126            | KafkaSinkConfigOptionName::TopicConfig
1127            | KafkaSinkConfigOptionName::TopicPartitionCount
1128            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1129            KafkaSinkConfigOptionName::PartitionBy => true,
1130        }
1131    }
1132}
1133
1134#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1135pub struct KafkaSinkConfigOption<T: AstInfo> {
1136    pub name: KafkaSinkConfigOptionName,
1137    pub value: Option<WithOptionValue<T>>,
1138}
1139impl_display_for_with_option!(KafkaSinkConfigOption);
1140impl_display_t!(KafkaSinkConfigOption);
1141
/// Names of the options accepted by an Iceberg sink connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// The `NAMESPACE` option.
    Namespace,
    /// The `TABLE` option.
    Table,
}
1147
1148impl AstDisplay for IcebergSinkConfigOptionName {
1149    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1150        f.write_str(match self {
1151            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1152            IcebergSinkConfigOptionName::Table => "TABLE",
1153        })
1154    }
1155}
1156impl_display!(IcebergSinkConfigOptionName);
1157
1158impl WithOptionName for IcebergSinkConfigOptionName {
1159    /// # WARNING
1160    ///
1161    /// Whenever implementing this trait consider very carefully whether or not
1162    /// this value could contain sensitive user data. If you're uncertain, err
1163    /// on the conservative side and return `true`.
1164    fn redact_value(&self) -> bool {
1165        match self {
1166            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1167        }
1168    }
1169}
1170
1171#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1172pub struct IcebergSinkConfigOption<T: AstInfo> {
1173    pub name: IcebergSinkConfigOptionName,
1174    pub value: Option<WithOptionValue<T>>,
1175}
1176impl_display_for_with_option!(IcebergSinkConfigOption);
1177impl_display_t!(IcebergSinkConfigOption);
1178
/// Names of the options accepted by a Postgres source connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// The `DETAILS` option: a hex-encoded string holding the binary
    /// serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`.
    Details,
    /// The `PUBLICATION` option: the name of the publication to sync.
    Publication,
    /// The `TEXT COLUMNS` option: columns whose types should unconditionally
    /// be formatted as text.
    ///
    /// NOTE(roshan): Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns to exclude.
    ///
    /// NOTE: Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1201
1202impl AstDisplay for PgConfigOptionName {
1203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1204        f.write_str(match self {
1205            PgConfigOptionName::Details => "DETAILS",
1206            PgConfigOptionName::Publication => "PUBLICATION",
1207            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1208            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1209        })
1210    }
1211}
1212impl_display!(PgConfigOptionName);
1213
1214impl WithOptionName for PgConfigOptionName {
1215    /// # WARNING
1216    ///
1217    /// Whenever implementing this trait consider very carefully whether or not
1218    /// this value could contain sensitive user data. If you're uncertain, err
1219    /// on the conservative side and return `true`.
1220    fn redact_value(&self) -> bool {
1221        match self {
1222            PgConfigOptionName::Details
1223            | PgConfigOptionName::Publication
1224            | PgConfigOptionName::TextColumns
1225            | PgConfigOptionName::ExcludeColumns => false,
1226        }
1227    }
1228}
1229
1230#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1231/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1232pub struct PgConfigOption<T: AstInfo> {
1233    pub name: PgConfigOptionName,
1234    pub value: Option<WithOptionValue<T>>,
1235}
1236impl_display_for_with_option!(PgConfigOption);
1237impl_display_t!(PgConfigOption);
1238
/// Names of the options accepted by a MySQL source connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// The `DETAILS` option: a hex-encoded string holding the binary
    /// serialization of `mz_storage_types::sources::mysql::MySqlSourceDetails`.
    Details,
    /// The `TEXT COLUMNS` option: columns whose types should unconditionally
    /// be formatted as text.
    ///
    /// NOTE(roshan): Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns to exclude.
    ///
    /// NOTE(roshan): Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1259
1260impl AstDisplay for MySqlConfigOptionName {
1261    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1262        f.write_str(match self {
1263            MySqlConfigOptionName::Details => "DETAILS",
1264            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1265            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1266        })
1267    }
1268}
1269impl_display!(MySqlConfigOptionName);
1270
1271impl WithOptionName for MySqlConfigOptionName {
1272    /// # WARNING
1273    ///
1274    /// Whenever implementing this trait consider very carefully whether or not
1275    /// this value could contain sensitive user data. If you're uncertain, err
1276    /// on the conservative side and return `true`.
1277    fn redact_value(&self) -> bool {
1278        match self {
1279            MySqlConfigOptionName::Details
1280            | MySqlConfigOptionName::TextColumns
1281            | MySqlConfigOptionName::ExcludeColumns => false,
1282        }
1283    }
1284}
1285
1286#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1287/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1288pub struct MySqlConfigOption<T: AstInfo> {
1289    pub name: MySqlConfigOptionName,
1290    pub value: Option<WithOptionValue<T>>,
1291}
1292impl_display_for_with_option!(MySqlConfigOption);
1293impl_display_t!(MySqlConfigOption);
1294
/// Names of the options accepted by a SQL Server source connection clause.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// The `DETAILS` option: a hex-encoded string holding the binary
    /// serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    Details,
    /// The `TEXT COLUMNS` option: columns whose types should unconditionally
    /// be formatted as text.
    ///
    /// NOTE(roshan): Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns to exclude.
    ///
    /// NOTE(roshan): Retained so that a `CREATE SOURCE` statement can be
    /// round-tripped while implicit subsources may still be created from
    /// `CREATE SOURCE`. It will be removed once that feature is fully
    /// deprecated and users are required to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1317
1318impl AstDisplay for SqlServerConfigOptionName {
1319    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1320        f.write_str(match self {
1321            SqlServerConfigOptionName::Details => "DETAILS",
1322            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1323            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1324        })
1325    }
1326}
1327impl_display!(SqlServerConfigOptionName);
1328
1329impl WithOptionName for SqlServerConfigOptionName {
1330    /// # WARNING
1331    ///
1332    /// Whenever implementing this trait consider very carefully whether or not
1333    /// this value could contain sensitive user data. If you're uncertain, err
1334    /// on the conservative side and return `true`.
1335    fn redact_value(&self) -> bool {
1336        match self {
1337            SqlServerConfigOptionName::Details
1338            | SqlServerConfigOptionName::TextColumns
1339            | SqlServerConfigOptionName::ExcludeColumns => false,
1340        }
1341    }
1342}
1343
1344#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1345/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1346pub struct SqlServerConfigOption<T: AstInfo> {
1347    pub name: SqlServerConfigOptionName,
1348    pub value: Option<WithOptionValue<T>>,
1349}
1350impl_display_for_with_option!(SqlServerConfigOption);
1351impl_display_t!(SqlServerConfigOption);
1352
1353#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1354pub enum CreateSourceConnection<T: AstInfo> {
1355    Kafka {
1356        connection: T::ItemName,
1357        options: Vec<KafkaSourceConfigOption<T>>,
1358    },
1359    Postgres {
1360        connection: T::ItemName,
1361        options: Vec<PgConfigOption<T>>,
1362    },
1363    SqlServer {
1364        connection: T::ItemName,
1365        options: Vec<SqlServerConfigOption<T>>,
1366    },
1367    MySql {
1368        connection: T::ItemName,
1369        options: Vec<MySqlConfigOption<T>>,
1370    },
1371    LoadGenerator {
1372        generator: LoadGenerator,
1373        options: Vec<LoadGeneratorOption<T>>,
1374    },
1375}
1376
1377impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1378    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1379        match self {
1380            CreateSourceConnection::Kafka {
1381                connection,
1382                options,
1383            } => {
1384                f.write_str("KAFKA CONNECTION ");
1385                f.write_node(connection);
1386                if !options.is_empty() {
1387                    f.write_str(" (");
1388                    f.write_node(&display::comma_separated(options));
1389                    f.write_str(")");
1390                }
1391            }
1392            CreateSourceConnection::Postgres {
1393                connection,
1394                options,
1395            } => {
1396                f.write_str("POSTGRES CONNECTION ");
1397                f.write_node(connection);
1398                if !options.is_empty() {
1399                    f.write_str(" (");
1400                    f.write_node(&display::comma_separated(options));
1401                    f.write_str(")");
1402                }
1403            }
1404            CreateSourceConnection::SqlServer {
1405                connection,
1406                options,
1407            } => {
1408                f.write_str("SQL SERVER CONNECTION ");
1409                f.write_node(connection);
1410                if !options.is_empty() {
1411                    f.write_str(" (");
1412                    f.write_node(&display::comma_separated(options));
1413                    f.write_str(")");
1414                }
1415            }
1416            CreateSourceConnection::MySql {
1417                connection,
1418                options,
1419            } => {
1420                f.write_str("MYSQL CONNECTION ");
1421                f.write_node(connection);
1422                if !options.is_empty() {
1423                    f.write_str(" (");
1424                    f.write_node(&display::comma_separated(options));
1425                    f.write_str(")");
1426                }
1427            }
1428            CreateSourceConnection::LoadGenerator { generator, options } => {
1429                f.write_str("LOAD GENERATOR ");
1430                f.write_node(generator);
1431                if !options.is_empty() {
1432                    f.write_str(" (");
1433                    f.write_node(&display::comma_separated(options));
1434                    f.write_str(")");
1435                }
1436            }
1437        }
1438    }
1439}
1440impl_display_t!(CreateSourceConnection);
1441
/// The built-in load generators that can back a `LOAD GENERATOR` source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// The `CLOCK` generator.
    Clock,
    /// The `COUNTER` generator.
    Counter,
    /// The `MARKETING` generator.
    Marketing,
    /// The `AUCTION` generator.
    Auction,
    /// The `DATUMS` generator.
    Datums,
    /// The `TPCH` generator.
    Tpch,
    /// The `KEY VALUE` generator.
    KeyValue,
}
1452
1453impl AstDisplay for LoadGenerator {
1454    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1455        match self {
1456            Self::Counter => f.write_str("COUNTER"),
1457            Self::Clock => f.write_str("CLOCK"),
1458            Self::Marketing => f.write_str("MARKETING"),
1459            Self::Auction => f.write_str("AUCTION"),
1460            Self::Datums => f.write_str("DATUMS"),
1461            Self::Tpch => f.write_str("TPCH"),
1462            Self::KeyValue => f.write_str("KEY VALUE"),
1463        }
1464    }
1465}
1466impl_display!(LoadGenerator);
1467
1468impl LoadGenerator {
1469    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1470    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1471    /// cases where we only have the AST representation. This can be removed once
1472    /// the `ast_rewrite_sources_to_tables` migration is removed.
1473    pub fn schema_name(&self) -> &'static str {
1474        match self {
1475            LoadGenerator::Counter => "counter",
1476            LoadGenerator::Clock => "clock",
1477            LoadGenerator::Marketing => "marketing",
1478            LoadGenerator::Auction => "auction",
1479            LoadGenerator::Datums => "datums",
1480            LoadGenerator::Tpch => "tpch",
1481            LoadGenerator::KeyValue => "key_value",
1482        }
1483    }
1484}
1485
/// Names of the options accepted by a `LOAD GENERATOR` source.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// The `SCALE FACTOR` option.
    ScaleFactor,
    /// The `TICK INTERVAL` option.
    TickInterval,
    /// The `AS OF` option.
    AsOf,
    /// The `UP TO` option.
    UpTo,
    /// The `MAX CARDINALITY` option.
    MaxCardinality,
    /// The `KEYS` option.
    Keys,
    /// The `SNAPSHOT ROUNDS` option.
    SnapshotRounds,
    /// The `TRANSACTIONAL SNAPSHOT` option.
    TransactionalSnapshot,
    /// The `VALUE SIZE` option.
    ValueSize,
    /// The `SEED` option.
    Seed,
    /// The `PARTITIONS` option.
    Partitions,
    /// The `BATCH SIZE` option.
    BatchSize,
}
1501
1502impl AstDisplay for LoadGeneratorOptionName {
1503    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1504        f.write_str(match self {
1505            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1506            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1507            LoadGeneratorOptionName::AsOf => "AS OF",
1508            LoadGeneratorOptionName::UpTo => "UP TO",
1509            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1510            LoadGeneratorOptionName::Keys => "KEYS",
1511            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1512            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1513            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1514            LoadGeneratorOptionName::Seed => "SEED",
1515            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1516            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1517        })
1518    }
1519}
1520impl_display!(LoadGeneratorOptionName);
1521
1522impl WithOptionName for LoadGeneratorOptionName {
1523    /// # WARNING
1524    ///
1525    /// Whenever implementing this trait consider very carefully whether or not
1526    /// this value could contain sensitive user data. If you're uncertain, err
1527    /// on the conservative side and return `true`.
1528    fn redact_value(&self) -> bool {
1529        match self {
1530            LoadGeneratorOptionName::ScaleFactor
1531            | LoadGeneratorOptionName::TickInterval
1532            | LoadGeneratorOptionName::AsOf
1533            | LoadGeneratorOptionName::UpTo
1534            | LoadGeneratorOptionName::MaxCardinality
1535            | LoadGeneratorOptionName::Keys
1536            | LoadGeneratorOptionName::SnapshotRounds
1537            | LoadGeneratorOptionName::TransactionalSnapshot
1538            | LoadGeneratorOptionName::ValueSize
1539            | LoadGeneratorOptionName::Partitions
1540            | LoadGeneratorOptionName::BatchSize
1541            | LoadGeneratorOptionName::Seed => false,
1542        }
1543    }
1544}
1545
1546#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1547/// An option in a `CREATE CONNECTION...SSH`.
1548pub struct LoadGeneratorOption<T: AstInfo> {
1549    pub name: LoadGeneratorOptionName,
1550    pub value: Option<WithOptionValue<T>>,
1551}
1552impl_display_for_with_option!(LoadGeneratorOption);
1553impl_display_t!(LoadGeneratorOption);
1554
1555#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1556pub enum CreateSinkConnection<T: AstInfo> {
1557    Kafka {
1558        connection: T::ItemName,
1559        options: Vec<KafkaSinkConfigOption<T>>,
1560        key: Option<SinkKey>,
1561        headers: Option<Ident>,
1562    },
1563    Iceberg {
1564        catalog_connection: T::ItemName,
1565
1566        /// AWS creds for the storage layer.
1567        aws_connection: Option<T::ItemName>,
1568
1569        key: Option<SinkKey>,
1570        options: Vec<IcebergSinkConfigOption<T>>,
1571    },
1572}
1573
1574impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1575    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1576        match self {
1577            CreateSinkConnection::Kafka {
1578                connection,
1579                options,
1580                key,
1581                headers,
1582            } => {
1583                f.write_str("KAFKA CONNECTION ");
1584                f.write_node(connection);
1585                if !options.is_empty() {
1586                    f.write_str(" (");
1587                    f.write_node(&display::comma_separated(options));
1588                    f.write_str(")");
1589                }
1590                if let Some(key) = key.as_ref() {
1591                    f.write_str(" ");
1592                    f.write_node(key);
1593                }
1594                if let Some(headers) = headers {
1595                    f.write_str(" HEADERS ");
1596                    f.write_node(headers);
1597                }
1598            }
1599            CreateSinkConnection::Iceberg {
1600                catalog_connection,
1601                aws_connection,
1602                key,
1603                options,
1604            } => {
1605                f.write_str("ICEBERG CATALOG CONNECTION ");
1606                f.write_node(catalog_connection);
1607                if !options.is_empty() {
1608                    f.write_str(" (");
1609                    f.write_node(&display::comma_separated(options));
1610                    f.write_str(")");
1611                }
1612                if let Some(aws_connection) = aws_connection {
1613                    f.write_str(" USING AWS CONNECTION ");
1614                    f.write_node(aws_connection);
1615                }
1616                if let Some(key) = key.as_ref() {
1617                    f.write_str(" ");
1618                    f.write_node(key);
1619                }
1620            }
1621        }
1622    }
1623}
1624impl_display_t!(CreateSinkConnection);
1625
1626#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1627pub struct SinkKey {
1628    pub key_columns: Vec<Ident>,
1629    pub not_enforced: bool,
1630}
1631
1632impl AstDisplay for SinkKey {
1633    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1634        f.write_str("KEY (");
1635        f.write_node(&display::comma_separated(&self.key_columns));
1636        f.write_str(")");
1637        if self.not_enforced {
1638            f.write_str(" NOT ENFORCED");
1639        }
1640    }
1641}
1642
1643/// A table-level constraint, specified in a `CREATE TABLE` or an
1644/// `ALTER TABLE ADD <constraint>` statement.
1645#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1646pub enum TableConstraint<T: AstInfo> {
1647    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1648    Unique {
1649        name: Option<Ident>,
1650        columns: Vec<Ident>,
1651        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1652        is_primary: bool,
1653        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1654        // constraints.
1655        nulls_not_distinct: bool,
1656    },
1657    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1658    /// REFERENCES <foreign_table> (<referred_columns>)`)
1659    ForeignKey {
1660        name: Option<Ident>,
1661        columns: Vec<Ident>,
1662        foreign_table: T::ItemName,
1663        referred_columns: Vec<Ident>,
1664    },
1665    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1666    Check {
1667        name: Option<Ident>,
1668        expr: Box<Expr<T>>,
1669    },
1670}
1671
1672impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1673    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1674        match self {
1675            TableConstraint::Unique {
1676                name,
1677                columns,
1678                is_primary,
1679                nulls_not_distinct,
1680            } => {
1681                f.write_node(&display_constraint_name(name));
1682                if *is_primary {
1683                    f.write_str("PRIMARY KEY ");
1684                } else {
1685                    f.write_str("UNIQUE ");
1686                    if *nulls_not_distinct {
1687                        f.write_str("NULLS NOT DISTINCT ");
1688                    }
1689                }
1690                f.write_str("(");
1691                f.write_node(&display::comma_separated(columns));
1692                f.write_str(")");
1693            }
1694            TableConstraint::ForeignKey {
1695                name,
1696                columns,
1697                foreign_table,
1698                referred_columns,
1699            } => {
1700                f.write_node(&display_constraint_name(name));
1701                f.write_str("FOREIGN KEY (");
1702                f.write_node(&display::comma_separated(columns));
1703                f.write_str(") REFERENCES ");
1704                f.write_node(foreign_table);
1705                f.write_str("(");
1706                f.write_node(&display::comma_separated(referred_columns));
1707                f.write_str(")");
1708            }
1709            TableConstraint::Check { name, expr } => {
1710                f.write_node(&display_constraint_name(name));
1711                f.write_str("CHECK (");
1712                f.write_node(&expr);
1713                f.write_str(")");
1714            }
1715        }
1716    }
1717}
1718impl_display_t!(TableConstraint);
1719
/// A key constraint, specified in a `CREATE SOURCE`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// `PRIMARY KEY (<columns>) NOT ENFORCED`
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1726
1727impl AstDisplay for KeyConstraint {
1728    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1729        match self {
1730            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1731                f.write_str("PRIMARY KEY ");
1732                f.write_str("(");
1733                f.write_node(&display::comma_separated(columns));
1734                f.write_str(") ");
1735                f.write_str("NOT ENFORCED");
1736            }
1737        }
1738    }
1739}
1740impl_display!(KeyConstraint);
1741
/// The name of an option in a `CREATE SOURCE...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1747
1748impl AstDisplay for CreateSourceOptionName {
1749    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1750        f.write_str(match self {
1751            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1752            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1753        })
1754    }
1755}
1756impl_display!(CreateSourceOptionName);
1757
1758impl WithOptionName for CreateSourceOptionName {
1759    /// # WARNING
1760    ///
1761    /// Whenever implementing this trait consider very carefully whether or not
1762    /// this value could contain sensitive user data. If you're uncertain, err
1763    /// on the conservative side and return `true`.
1764    fn redact_value(&self) -> bool {
1765        match self {
1766            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1767                false
1768            }
1769        }
1770    }
1771}
1772
/// An option in a `CREATE SOURCE...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateSourceOptionName,
    /// The value attached to the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1781
/// SQL column definition, displayed as
/// `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// The collation, displayed after ` COLLATE ` when present.
    pub collation: Option<UnresolvedItemName>,
    /// The column options, displayed in order and separated by spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1790
1791impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1792    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1793        f.write_node(&self.name);
1794        f.write_str(" ");
1795        f.write_node(&self.data_type);
1796        if let Some(collation) = &self.collation {
1797            f.write_str(" COLLATE ");
1798            f.write_node(collation);
1799        }
1800        for option in &self.options {
1801            f.write_str(" ");
1802            f.write_node(option);
1803        }
1804    }
1805}
1806impl_display_t!(ColumnDef);
1807
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// The optional constraint name, displayed as a `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption<T>,
}
1829
1830impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1831    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1832        f.write_node(&display_constraint_name(&self.name));
1833        f.write_node(&self.option);
1834    }
1835}
1836impl_display_t!(ColumnOptionDef);
1837
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    Default(Expr<T>),
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique {
        /// `true` displays as `PRIMARY KEY`, `false` as `UNIQUE`.
        is_primary: bool,
    },
    /// A referential integrity constraint, displayed as
    /// `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The referenced table.
        foreign_table: UnresolvedItemName,
        /// The referenced columns, displayed comma-separated in parentheses.
        referred_columns: Vec<Ident>,
    },
    /// `CHECK (<expr>)`
    Check(Expr<T>),
    /// `VERSION <action> <version>`
    Versioned {
        /// The `<action>` part of the option.
        action: ColumnVersioned,
        /// The `<version>` part of the option.
        version: Version,
    },
}
1864
1865impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1866    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1867        use ColumnOption::*;
1868        match self {
1869            Null => f.write_str("NULL"),
1870            NotNull => f.write_str("NOT NULL"),
1871            Default(expr) => {
1872                f.write_str("DEFAULT ");
1873                f.write_node(expr);
1874            }
1875            Unique { is_primary } => {
1876                if *is_primary {
1877                    f.write_str("PRIMARY KEY");
1878                } else {
1879                    f.write_str("UNIQUE");
1880                }
1881            }
1882            ForeignKey {
1883                foreign_table,
1884                referred_columns,
1885            } => {
1886                f.write_str("REFERENCES ");
1887                f.write_node(foreign_table);
1888                f.write_str(" (");
1889                f.write_node(&display::comma_separated(referred_columns));
1890                f.write_str(")");
1891            }
1892            Check(expr) => {
1893                f.write_str("CHECK (");
1894                f.write_node(expr);
1895                f.write_str(")");
1896            }
1897            Versioned { action, version } => {
1898                f.write_str("VERSION ");
1899                f.write_node(action);
1900                f.write_str(" ");
1901                f.write_node(version);
1902            }
1903        }
1904    }
1905}
1906impl_display_t!(ColumnOption);
1907
/// The `<action>` of a `VERSION <action> <version>` column option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1912
1913impl AstDisplay for ColumnVersioned {
1914    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1915    where
1916        W: fmt::Write,
1917    {
1918        match self {
1919            // TODO(alter_table): Support dropped columns.
1920            ColumnVersioned::Added => f.write_str("ADDED"),
1921        }
1922    }
1923}
1924impl_display!(ColumnVersioned);
1925
1926fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1927    struct ConstraintName<'a>(&'a Option<Ident>);
1928    impl<'a> AstDisplay for ConstraintName<'a> {
1929        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1930        where
1931            W: fmt::Write,
1932        {
1933            if let Some(name) = self.0 {
1934                f.write_str("CONSTRAINT ");
1935                f.write_node(name);
1936                f.write_str(" ");
1937            }
1938        }
1939    }
1940    ConstraintName(name)
1941}