Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33    /// The `ASSERT NOT NULL [=] <ident>` option.
34    AssertNotNull,
35    PartitionBy,
36    RetainHistory,
37    /// The `REFRESH [=] ...` option.
38    Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70    pub name: MaterializedViewOptionName,
71    pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Schema {
77    pub schema: String,
78}
79
80impl AstDisplay for Schema {
81    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82        f.write_str("SCHEMA '");
83        f.write_node(&display::escape_single_quote_string(&self.schema));
84        f.write_str("'");
85    }
86}
87impl_display!(Schema);
88
89#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90pub enum AvroSchemaOptionName {
91    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
92    ConfluentWireFormat,
93}
94
95impl AstDisplay for AvroSchemaOptionName {
96    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97        match self {
98            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99        }
100    }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104    /// # WARNING
105    ///
106    /// Whenever implementing this trait consider very carefully whether or not
107    /// this value could contain sensitive user data. If you're uncertain, err
108    /// on the conservative side and return `true`.
109    fn redact_value(&self) -> bool {
110        match self {
111            Self::ConfluentWireFormat => false,
112        }
113    }
114}
115
116/// Options accepted on the `USING AWS GLUE SCHEMA REGISTRY CONNECTION <name> (…)`
117/// form. Today there is only `SCHEMA NAME`, which is required (the Glue
118/// purification step needs it to fetch the writer schema's latest
119/// version at `CREATE SOURCE` time).
120#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
121pub enum GlueAvroOptionName {
122    /// The `SCHEMA NAME [=] '<name>'` option. Names the schema within a
123    /// Glue registry whose latest version is fetched during purification.
124    SchemaName,
125}
126
127impl AstDisplay for GlueAvroOptionName {
128    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
129        match self {
130            GlueAvroOptionName::SchemaName => f.write_str("SCHEMA NAME"),
131        }
132    }
133}
134
135impl WithOptionName for GlueAvroOptionName {
136    /// # WARNING
137    ///
138    /// Whenever implementing this trait consider very carefully whether or not
139    /// this value could contain sensitive user data. If you're uncertain, err
140    /// on the conservative side and return `true`.
141    fn redact_value(&self) -> bool {
142        match self {
143            // A schema *name* is a user-chosen identifier, no more
144            // sensitive than a table name.
145            Self::SchemaName => false,
146        }
147    }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct GlueAvroOption<T: AstInfo> {
152    pub name: GlueAvroOptionName,
153    pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(GlueAvroOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
158pub struct AvroSchemaOption<T: AstInfo> {
159    pub name: AvroSchemaOptionName,
160    pub value: Option<WithOptionValue<T>>,
161}
162impl_display_for_with_option!(AvroSchemaOption);
163
164#[derive(Debug, Clone, PartialEq, Eq, Hash)]
165pub enum AvroSchema<T: AstInfo> {
166    Csr {
167        csr_connection: CsrConnectionAvro<T>,
168    },
169    InlineSchema {
170        schema: Schema,
171        with_options: Vec<AvroSchemaOption<T>>,
172    },
173    /// `USING AWS GLUE SCHEMA REGISTRY CONNECTION <name> (SCHEMA NAME = '<n>')`.
174    ///
175    /// Parallel to the `Csr` variant.
176    Glue {
177        connection: T::ItemName,
178        with_options: Vec<GlueAvroOption<T>>,
179        /// Normally populated during purification by fetching the named
180        /// schema's latest version from AWS Glue. The grammar also accepts a
181        /// user-written `SEED VALUE SCHEMA`, so this may be set on input; it is
182        /// not the intended authoring path, but it is not rejected.
183        seed: Option<GlueAvroSeed>,
184    },
185}
186
187impl<T: AstInfo> AstDisplay for AvroSchema<T> {
188    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
189        match self {
190            Self::Csr { csr_connection } => {
191                f.write_node(csr_connection);
192            }
193            Self::InlineSchema {
194                schema,
195                with_options,
196            } => {
197                f.write_str("USING ");
198                schema.fmt(f);
199                if !with_options.is_empty() {
200                    f.write_str(" (");
201                    f.write_node(&display::comma_separated(with_options));
202                    f.write_str(")");
203                }
204            }
205            Self::Glue {
206                connection,
207                with_options,
208                seed,
209            } => {
210                f.write_str("USING AWS GLUE SCHEMA REGISTRY CONNECTION ");
211                f.write_node(connection);
212                if !with_options.is_empty() {
213                    f.write_str(" (");
214                    f.write_node(&display::comma_separated(with_options));
215                    f.write_str(")");
216                }
217                if let Some(seed) = seed {
218                    f.write_str(" ");
219                    f.write_node(seed);
220                }
221            }
222        }
223    }
224}
225impl_display_t!(AvroSchema);
226
227#[derive(Debug, Clone, PartialEq, Eq, Hash)]
228pub enum ProtobufSchema<T: AstInfo> {
229    Csr {
230        csr_connection: CsrConnectionProtobuf<T>,
231    },
232    InlineSchema {
233        message_name: String,
234        schema: Schema,
235    },
236}
237
238impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
239    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
240        match self {
241            Self::Csr { csr_connection } => {
242                f.write_node(csr_connection);
243            }
244            Self::InlineSchema {
245                message_name,
246                schema,
247            } => {
248                f.write_str("MESSAGE '");
249                f.write_node(&display::escape_single_quote_string(message_name));
250                f.write_str("' USING ");
251                f.write_str(schema);
252            }
253        }
254    }
255}
256impl_display_t!(ProtobufSchema);
257
258#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
259pub enum CsrConfigOptionName<T: AstInfo> {
260    AvroKeyFullname,
261    AvroValueFullname,
262    NullDefaults,
263    AvroDocOn(AvroDocOn<T>),
264    KeyCompatibilityLevel,
265    ValueCompatibilityLevel,
266}
267
268impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
269    /// # WARNING
270    ///
271    /// Whenever implementing this trait consider very carefully whether or not
272    /// this value could contain sensitive user data. If you're uncertain, err
273    /// on the conservative side and return `true`.
274    fn redact_value(&self) -> bool {
275        match self {
276            Self::AvroKeyFullname
277            | Self::AvroValueFullname
278            | Self::NullDefaults
279            | Self::AvroDocOn(_)
280            | Self::KeyCompatibilityLevel
281            | Self::ValueCompatibilityLevel => false,
282        }
283    }
284}
285
286#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
287pub struct AvroDocOn<T: AstInfo> {
288    pub identifier: DocOnIdentifier<T>,
289    pub for_schema: DocOnSchema,
290}
291#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
292pub enum DocOnSchema {
293    KeyOnly,
294    ValueOnly,
295    All,
296}
297
298#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
299pub enum DocOnIdentifier<T: AstInfo> {
300    Column(ColumnName<T>),
301    Type(T::ItemName),
302}
303
304impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
305    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
306        match &self.for_schema {
307            DocOnSchema::KeyOnly => f.write_str("KEY "),
308            DocOnSchema::ValueOnly => f.write_str("VALUE "),
309            DocOnSchema::All => {}
310        }
311        match &self.identifier {
312            DocOnIdentifier::Column(name) => {
313                f.write_str("DOC ON COLUMN ");
314                f.write_node(name);
315            }
316            DocOnIdentifier::Type(name) => {
317                f.write_str("DOC ON TYPE ");
318                f.write_node(name);
319            }
320        }
321    }
322}
323impl_display_t!(AvroDocOn);
324
325impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
326    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
327        match self {
328            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
329            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
330            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
331            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
332            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
333            CsrConfigOptionName::ValueCompatibilityLevel => {
334                f.write_str("VALUE COMPATIBILITY LEVEL")
335            }
336        }
337    }
338}
339impl_display_t!(CsrConfigOptionName);
340
341#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
342/// An option in a `{FROM|INTO} CONNECTION ...` statement.
343pub struct CsrConfigOption<T: AstInfo> {
344    pub name: CsrConfigOptionName<T>,
345    pub value: Option<WithOptionValue<T>>,
346}
347impl_display_for_with_option!(CsrConfigOption);
348impl_display_t!(CsrConfigOption);
349
350#[derive(Debug, Clone, PartialEq, Eq, Hash)]
351pub struct CsrConnection<T: AstInfo> {
352    pub connection: T::ItemName,
353    pub options: Vec<CsrConfigOption<T>>,
354}
355
356impl<T: AstInfo> AstDisplay for CsrConnection<T> {
357    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
358        f.write_str("CONNECTION ");
359        f.write_node(&self.connection);
360        if !self.options.is_empty() {
361            f.write_str(" (");
362            f.write_node(&display::comma_separated(&self.options));
363            f.write_str(")");
364        }
365    }
366}
367impl_display_t!(CsrConnection);
368
369#[derive(Debug, Clone, PartialEq, Eq, Hash)]
370pub enum ReaderSchemaSelectionStrategy {
371    Latest,
372    Inline(String),
373    ById(i32),
374}
375
376impl Default for ReaderSchemaSelectionStrategy {
377    fn default() -> Self {
378        Self::Latest
379    }
380}
381
382#[derive(Debug, Clone, PartialEq, Eq, Hash)]
383pub struct CsrConnectionAvro<T: AstInfo> {
384    pub connection: CsrConnection<T>,
385    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
386    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
387    pub seed: Option<CsrSeedAvro>,
388}
389
390impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
391    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
392        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
393        f.write_node(&self.connection);
394        if let Some(seed) = &self.seed {
395            f.write_str(" ");
396            f.write_node(seed);
397        }
398    }
399}
400impl_display_t!(CsrConnectionAvro);
401
402#[derive(Debug, Clone, PartialEq, Eq, Hash)]
403pub struct CsrConnectionProtobuf<T: AstInfo> {
404    pub connection: CsrConnection<T>,
405    pub seed: Option<CsrSeedProtobuf>,
406}
407
408impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
409    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
411        f.write_node(&self.connection);
412
413        if let Some(seed) = &self.seed {
414            f.write_str(" ");
415            f.write_node(seed);
416        }
417    }
418}
419impl_display_t!(CsrConnectionProtobuf);
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422pub struct CsrSeedAvro {
423    pub key_schema: Option<String>,
424    pub value_schema: String,
425    /// Reference schemas for the key schema, in dependency order.
426    /// Populated during purification by fetching from the schema registry.
427    pub key_reference_schemas: Vec<String>,
428    /// Reference schemas for the value schema, in dependency order.
429    /// Populated during purification by fetching from the schema registry.
430    pub value_reference_schemas: Vec<String>,
431}
432
433impl AstDisplay for CsrSeedAvro {
434    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
435        f.write_str("SEED");
436        if let Some(key_schema) = &self.key_schema {
437            f.write_str(" KEY SCHEMA '");
438            f.write_node(&display::escape_single_quote_string(key_schema));
439            f.write_str("'");
440            if !self.key_reference_schemas.is_empty() {
441                f.write_str(" KEY REFERENCES (");
442                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
443                    if i > 0 {
444                        f.write_str(", ");
445                    }
446                    f.write_str("'");
447                    f.write_node(&display::escape_single_quote_string(schema));
448                    f.write_str("'");
449                }
450                f.write_str(")");
451            }
452        }
453        f.write_str(" VALUE SCHEMA '");
454        f.write_node(&display::escape_single_quote_string(&self.value_schema));
455        f.write_str("'");
456        if !self.value_reference_schemas.is_empty() {
457            f.write_str(" VALUE REFERENCES (");
458            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
459                if i > 0 {
460                    f.write_str(", ");
461                }
462                f.write_str("'");
463                f.write_node(&display::escape_single_quote_string(schema));
464                f.write_str("'");
465            }
466            f.write_str(")");
467        }
468    }
469}
470impl_display!(CsrSeedAvro);
471
472/// Resolved reader schema for a single `AvroSchema::Glue` FORMAT clause.
473///
474/// Glue resolves exactly one named schema per FORMAT clause, so this holds a
475/// single schema rather than a key/value pair. This differs from
476/// [`CsrSeedAvro`], where one `FORMAT AVRO USING CONFLUENT ...` clause can seed
477/// both key and value. Under Glue, the key and value are expressed as separate
478/// `KEY FORMAT ... VALUE FORMAT ...` clauses, each carrying its own
479/// `GlueAvroSeed`; the field is named `value_schema` because, for a single
480/// clause, it is decoded as that clause's value (a bare `FORMAT` clause, or the
481/// value side of a `KEY FORMAT/VALUE FORMAT` pair) — and a `KEY FORMAT` clause
482/// reuses the same schema as its key.
483///
484/// Glue schemas also have no references (each schema-version is a single
485/// self-contained definition), so unlike [`CsrSeedAvro`] there is no references
486/// vector.
487#[derive(Debug, Clone, PartialEq, Eq, Hash)]
488pub struct GlueAvroSeed {
489    pub value_schema: String,
490}
491
492impl AstDisplay for GlueAvroSeed {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        f.write_str("SEED VALUE SCHEMA '");
495        f.write_node(&display::escape_single_quote_string(&self.value_schema));
496        f.write_str("'");
497    }
498}
499impl_display!(GlueAvroSeed);
500
501#[derive(Debug, Clone, PartialEq, Eq, Hash)]
502pub struct CsrSeedProtobuf {
503    pub key: Option<CsrSeedProtobufSchema>,
504    pub value: CsrSeedProtobufSchema,
505}
506
507impl AstDisplay for CsrSeedProtobuf {
508    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
509        f.write_str("SEED");
510        if let Some(key) = &self.key {
511            f.write_str(" KEY ");
512            f.write_node(key);
513        }
514        f.write_str(" VALUE ");
515        f.write_node(&self.value);
516    }
517}
518impl_display!(CsrSeedProtobuf);
519
520#[derive(Debug, Clone, PartialEq, Eq, Hash)]
521pub struct CsrSeedProtobufSchema {
522    // Hex encoded string.
523    pub schema: String,
524    pub message_name: String,
525}
526impl AstDisplay for CsrSeedProtobufSchema {
527    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
528        f.write_str("SCHEMA '");
529        f.write_str(&display::escape_single_quote_string(&self.schema));
530        f.write_str("' MESSAGE '");
531        f.write_str(&display::escape_single_quote_string(&self.message_name));
532        f.write_str("'");
533    }
534}
535impl_display!(CsrSeedProtobufSchema);
536
537#[derive(Debug, Clone, PartialEq, Eq, Hash)]
538pub enum FormatSpecifier<T: AstInfo> {
539    /// `CREATE SOURCE/SINK .. FORMAT`
540    Bare(Format<T>),
541    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
542    KeyValue { key: Format<T>, value: Format<T> },
543}
544
545impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
546    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
547        match self {
548            FormatSpecifier::Bare(format) => {
549                f.write_str("FORMAT ");
550                f.write_node(format)
551            }
552            FormatSpecifier::KeyValue { key, value } => {
553                f.write_str("KEY FORMAT ");
554                f.write_node(key);
555                f.write_str(" VALUE FORMAT ");
556                f.write_node(value);
557            }
558        }
559    }
560}
561impl_display_t!(FormatSpecifier);
562
563#[derive(Debug, Clone, PartialEq, Eq, Hash)]
564pub enum Format<T: AstInfo> {
565    Bytes,
566    Avro(AvroSchema<T>),
567    Protobuf(ProtobufSchema<T>),
568    Regex(String),
569    Csv {
570        columns: CsvColumns,
571        delimiter: char,
572    },
573    Json {
574        array: bool,
575    },
576    Text,
577}
578
579#[derive(Debug, Clone, PartialEq, Eq, Hash)]
580pub enum CsvColumns {
581    /// `WITH count COLUMNS`
582    Count(u64),
583    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
584    Header { names: Vec<Ident> },
585}
586
587impl AstDisplay for CsvColumns {
588    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589        match self {
590            CsvColumns::Count(n) => {
591                f.write_str(n);
592                f.write_str(" COLUMNS")
593            }
594            CsvColumns::Header { names } => {
595                f.write_str("HEADER");
596                if !names.is_empty() {
597                    f.write_str(" (");
598                    f.write_node(&display::comma_separated(names));
599                    f.write_str(")");
600                }
601            }
602        }
603    }
604}
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceIncludeMetadata {
608    Key {
609        alias: Option<Ident>,
610    },
611    Timestamp {
612        alias: Option<Ident>,
613    },
614    Partition {
615        alias: Option<Ident>,
616    },
617    Offset {
618        alias: Option<Ident>,
619    },
620    Headers {
621        alias: Option<Ident>,
622    },
623    Header {
624        key: String,
625        alias: Ident,
626        use_bytes: bool,
627    },
628}
629
630impl AstDisplay for SourceIncludeMetadata {
631    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
632        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
633            if let Some(alias) = alias {
634                f.write_str(" AS ");
635                f.write_node(alias);
636            }
637        };
638
639        match self {
640            SourceIncludeMetadata::Key { alias } => {
641                f.write_str("KEY");
642                print_alias(f, alias);
643            }
644            SourceIncludeMetadata::Timestamp { alias } => {
645                f.write_str("TIMESTAMP");
646                print_alias(f, alias);
647            }
648            SourceIncludeMetadata::Partition { alias } => {
649                f.write_str("PARTITION");
650                print_alias(f, alias);
651            }
652            SourceIncludeMetadata::Offset { alias } => {
653                f.write_str("OFFSET");
654                print_alias(f, alias);
655            }
656            SourceIncludeMetadata::Headers { alias } => {
657                f.write_str("HEADERS");
658                print_alias(f, alias);
659            }
660            SourceIncludeMetadata::Header {
661                alias,
662                key,
663                use_bytes,
664            } => {
665                f.write_str("HEADER '");
666                f.write_str(&display::escape_single_quote_string(key));
667                f.write_str("'");
668                print_alias(f, &Some(alias.clone()));
669                if *use_bytes {
670                    f.write_str(" BYTES");
671                }
672            }
673        }
674    }
675}
676impl_display!(SourceIncludeMetadata);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SourceErrorPolicy {
680    Inline {
681        /// The alias to use for the error column. If unspecified will be `error`.
682        alias: Option<Ident>,
683    },
684}
685
686impl AstDisplay for SourceErrorPolicy {
687    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688        match self {
689            Self::Inline { alias } => {
690                f.write_str("INLINE");
691                if let Some(alias) = alias {
692                    f.write_str(" AS ");
693                    f.write_node(alias);
694                }
695            }
696        }
697    }
698}
699impl_display!(SourceErrorPolicy);
700
701#[derive(Debug, Clone, PartialEq, Eq, Hash)]
702pub enum SourceEnvelope {
703    None,
704    Debezium,
705    Upsert {
706        value_decode_err_policy: Vec<SourceErrorPolicy>,
707    },
708    CdcV2,
709}
710
711impl SourceEnvelope {
712    /// `true` iff Materialize is expected to crash or exhibit UB
713    /// when attempting to ingest data starting at an offset other than zero.
714    pub fn requires_all_input(&self) -> bool {
715        match self {
716            SourceEnvelope::None => false,
717            SourceEnvelope::Debezium => false,
718            SourceEnvelope::Upsert { .. } => false,
719            SourceEnvelope::CdcV2 => true,
720        }
721    }
722}
723
724impl AstDisplay for SourceEnvelope {
725    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
726        match self {
727            Self::None => {
728                // this is unreachable as long as the default is None, but include it in case we ever change that
729                f.write_str("NONE");
730            }
731            Self::Debezium => {
732                f.write_str("DEBEZIUM");
733            }
734            Self::Upsert {
735                value_decode_err_policy,
736            } => {
737                if value_decode_err_policy.is_empty() {
738                    f.write_str("UPSERT");
739                } else {
740                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
741                    f.write_node(&display::comma_separated(value_decode_err_policy));
742                    f.write_str("))")
743                }
744            }
745            Self::CdcV2 => {
746                f.write_str("MATERIALIZE");
747            }
748        }
749    }
750}
751impl_display!(SourceEnvelope);
752
753#[derive(Debug, Clone, PartialEq, Eq, Hash)]
754pub enum SinkEnvelope {
755    Debezium,
756    Upsert,
757}
758
759impl AstDisplay for SinkEnvelope {
760    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
761        match self {
762            Self::Upsert => {
763                f.write_str("UPSERT");
764            }
765            Self::Debezium => {
766                f.write_str("DEBEZIUM");
767            }
768        }
769    }
770}
771impl_display!(SinkEnvelope);
772
773#[derive(Debug, Clone, PartialEq, Eq, Hash)]
774pub enum IcebergSinkMode {
775    Upsert,
776    Append,
777}
778
779impl AstDisplay for IcebergSinkMode {
780    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
781        match self {
782            Self::Upsert => {
783                f.write_str("UPSERT");
784            }
785            Self::Append => {
786                f.write_str("APPEND");
787            }
788        }
789    }
790}
791impl_display!(IcebergSinkMode);
792
793#[derive(Debug, Clone, PartialEq, Eq, Hash)]
794pub enum SubscribeOutput<T: AstInfo> {
795    Diffs,
796    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
797    EnvelopeUpsert { key_columns: Vec<Ident> },
798    EnvelopeDebezium { key_columns: Vec<Ident> },
799}
800
801impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
802    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
803        match self {
804            Self::Diffs => {}
805            Self::WithinTimestampOrderBy { order_by } => {
806                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
807                f.write_node(&display::comma_separated(order_by));
808            }
809            Self::EnvelopeUpsert { key_columns } => {
810                f.write_str(" ENVELOPE UPSERT (KEY (");
811                f.write_node(&display::comma_separated(key_columns));
812                f.write_str("))");
813            }
814            Self::EnvelopeDebezium { key_columns } => {
815                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
816                f.write_node(&display::comma_separated(key_columns));
817                f.write_str("))");
818            }
819        }
820    }
821}
822impl_display_t!(SubscribeOutput);
823
824impl<T: AstInfo> AstDisplay for Format<T> {
825    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
826        match self {
827            Self::Bytes => f.write_str("BYTES"),
828            Self::Avro(inner) => {
829                f.write_str("AVRO ");
830                f.write_node(inner);
831            }
832            Self::Protobuf(inner) => {
833                f.write_str("PROTOBUF ");
834                f.write_node(inner);
835            }
836            Self::Regex(regex) => {
837                f.write_str("REGEX '");
838                f.write_node(&display::escape_single_quote_string(regex));
839                f.write_str("'");
840            }
841            Self::Csv { columns, delimiter } => {
842                f.write_str("CSV WITH ");
843                f.write_node(columns);
844
845                if *delimiter != ',' {
846                    f.write_str(" DELIMITED BY '");
847                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
848                    f.write_str("'");
849                }
850            }
851            Self::Json { array } => {
852                f.write_str("JSON");
853                if *array {
854                    f.write_str(" ARRAY");
855                }
856            }
857            Self::Text => f.write_str("TEXT"),
858        }
859    }
860}
861impl_display_t!(Format);
862
863// All connection options are bundled together to allow us to parse `ALTER
864// CONNECTION` without specifying the type of connection we're altering. Il faut
865// souffrir pour être belle.
866#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
867pub enum ConnectionOptionName {
868    AccessKeyId,
869    AssumeRoleArn,
870    AssumeRoleSessionName,
871    AvailabilityZones,
872    AwsConnection,
873    AwsPrivatelink,
874    Broker,
875    Brokers,
876    Credential,
877    Database,
878    Endpoint,
879    GcpConnection,
880    Host,
881    Password,
882    Port,
883    ProgressTopic,
884    ProgressTopicReplicationFactor,
885    PublicKey1,
886    PublicKey2,
887    Region,
888    Registry,
889    SaslMechanisms,
890    SaslPassword,
891    SaslUsername,
892    Scope,
893    SecretAccessKey,
894    SecurityProtocol,
895    ServiceAccountKey,
896    ServiceName,
897    SshTunnel,
898    SslCertificate,
899    SslCertificateAuthority,
900    SslKey,
901    SslMode,
902    SessionToken,
903    CatalogType,
904    Url,
905    User,
906    Warehouse,
907}
908
909impl AstDisplay for ConnectionOptionName {
910    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
911        f.write_str(match self {
912            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
913            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
914            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
915            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
916            ConnectionOptionName::Broker => "BROKER",
917            ConnectionOptionName::Brokers => "BROKERS",
918            ConnectionOptionName::Credential => "CREDENTIAL",
919            ConnectionOptionName::Database => "DATABASE",
920            ConnectionOptionName::Endpoint => "ENDPOINT",
921            ConnectionOptionName::GcpConnection => "GCP CONNECTION",
922            ConnectionOptionName::Host => "HOST",
923            ConnectionOptionName::Password => "PASSWORD",
924            ConnectionOptionName::Port => "PORT",
925            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
926            ConnectionOptionName::ProgressTopicReplicationFactor => {
927                "PROGRESS TOPIC REPLICATION FACTOR"
928            }
929            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
930            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
931            ConnectionOptionName::Region => "REGION",
932            ConnectionOptionName::Registry => "REGISTRY",
933            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
934            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
935            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
936            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
937            ConnectionOptionName::SaslUsername => "SASL USERNAME",
938            ConnectionOptionName::Scope => "SCOPE",
939            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
940            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
941            ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY",
942            ConnectionOptionName::ServiceName => "SERVICE NAME",
943            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
944            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
945            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
946            ConnectionOptionName::SslKey => "SSL KEY",
947            ConnectionOptionName::SslMode => "SSL MODE",
948            ConnectionOptionName::SessionToken => "SESSION TOKEN",
949            ConnectionOptionName::CatalogType => "CATALOG TYPE",
950            ConnectionOptionName::Url => "URL",
951            ConnectionOptionName::User => "USER",
952            ConnectionOptionName::Warehouse => "WAREHOUSE",
953        })
954    }
955}
956impl_display!(ConnectionOptionName);
957
958impl WithOptionName for ConnectionOptionName {
959    /// # WARNING
960    ///
961    /// Whenever implementing this trait consider very carefully whether or not
962    /// this value could contain sensitive user data. If you're uncertain, err
963    /// on the conservative side and return `true`.
964    fn redact_value(&self) -> bool {
965        match self {
966            ConnectionOptionName::AccessKeyId
967            | ConnectionOptionName::AvailabilityZones
968            | ConnectionOptionName::AwsConnection
969            | ConnectionOptionName::AwsPrivatelink
970            | ConnectionOptionName::Broker
971            | ConnectionOptionName::Brokers
972            | ConnectionOptionName::Credential
973            | ConnectionOptionName::Database
974            | ConnectionOptionName::Endpoint
975            | ConnectionOptionName::GcpConnection
976            | ConnectionOptionName::Host
977            | ConnectionOptionName::Password
978            | ConnectionOptionName::Port
979            | ConnectionOptionName::ProgressTopic
980            | ConnectionOptionName::ProgressTopicReplicationFactor
981            | ConnectionOptionName::PublicKey1
982            | ConnectionOptionName::PublicKey2
983            | ConnectionOptionName::Region
984            | ConnectionOptionName::Registry
985            | ConnectionOptionName::AssumeRoleArn
986            | ConnectionOptionName::AssumeRoleSessionName
987            | ConnectionOptionName::SaslMechanisms
988            | ConnectionOptionName::SaslPassword
989            | ConnectionOptionName::SaslUsername
990            | ConnectionOptionName::Scope
991            | ConnectionOptionName::SecurityProtocol
992            | ConnectionOptionName::SecretAccessKey
993            | ConnectionOptionName::ServiceAccountKey
994            | ConnectionOptionName::ServiceName
995            | ConnectionOptionName::SshTunnel
996            | ConnectionOptionName::SslCertificate
997            | ConnectionOptionName::SslCertificateAuthority
998            | ConnectionOptionName::SslKey
999            | ConnectionOptionName::SslMode
1000            | ConnectionOptionName::SessionToken
1001            | ConnectionOptionName::CatalogType
1002            | ConnectionOptionName::Url
1003            | ConnectionOptionName::User
1004            | ConnectionOptionName::Warehouse => false,
1005        }
1006    }
1007}
1008
1009#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1010/// An option in a `CREATE CONNECTION`.
1011pub struct ConnectionOption<T: AstInfo> {
1012    pub name: ConnectionOptionName,
1013    pub value: Option<WithOptionValue<T>>,
1014}
1015impl_display_for_with_option!(ConnectionOption);
1016impl_display_t!(ConnectionOption);
1017
1018#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
1019pub enum CreateConnectionType {
1020    Aws,
1021    AwsPrivatelink,
1022    GlueSchemaRegistry,
1023    Gcp,
1024    Kafka,
1025    Csr,
1026    Postgres,
1027    Ssh,
1028    SqlServer,
1029    MySql,
1030    IcebergCatalog,
1031}
1032
1033impl CreateConnectionType {
1034    pub fn as_str(&self) -> &'static str {
1035        match self {
1036            Self::Kafka => "kafka",
1037            Self::Csr => "confluent-schema-registry",
1038            Self::Postgres => "postgres",
1039            Self::Aws => "aws",
1040            Self::AwsPrivatelink => "aws-privatelink",
1041            Self::GlueSchemaRegistry => "glue-schema-registry",
1042            Self::Gcp => "gcp",
1043            Self::Ssh => "ssh-tunnel",
1044            Self::MySql => "mysql",
1045            Self::SqlServer => "sql-server",
1046            Self::IcebergCatalog => "iceberg-catalog",
1047        }
1048    }
1049}
1050
1051impl AstDisplay for CreateConnectionType {
1052    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1053        match self {
1054            Self::Kafka => {
1055                f.write_str("KAFKA");
1056            }
1057            Self::Csr => {
1058                f.write_str("CONFLUENT SCHEMA REGISTRY");
1059            }
1060            Self::Postgres => {
1061                f.write_str("POSTGRES");
1062            }
1063            Self::Aws => {
1064                f.write_str("AWS");
1065            }
1066            Self::AwsPrivatelink => {
1067                f.write_str("AWS PRIVATELINK");
1068            }
1069            Self::GlueSchemaRegistry => {
1070                f.write_str("AWS GLUE SCHEMA REGISTRY");
1071            }
1072            Self::Gcp => {
1073                f.write_str("GCP");
1074            }
1075            Self::Ssh => {
1076                f.write_str("SSH TUNNEL");
1077            }
1078            Self::SqlServer => {
1079                f.write_str("SQL SERVER");
1080            }
1081            Self::MySql => {
1082                f.write_str("MYSQL");
1083            }
1084            Self::IcebergCatalog => {
1085                f.write_str("ICEBERG CATALOG");
1086            }
1087        }
1088    }
1089}
1090impl_display!(CreateConnectionType);
1091
1092#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1093pub enum CreateConnectionOptionName {
1094    Validate,
1095}
1096
1097impl AstDisplay for CreateConnectionOptionName {
1098    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1099        f.write_str(match self {
1100            CreateConnectionOptionName::Validate => "VALIDATE",
1101        })
1102    }
1103}
1104impl_display!(CreateConnectionOptionName);
1105
1106impl WithOptionName for CreateConnectionOptionName {
1107    /// # WARNING
1108    ///
1109    /// Whenever implementing this trait consider very carefully whether or not
1110    /// this value could contain sensitive user data. If you're uncertain, err
1111    /// on the conservative side and return `true`.
1112    fn redact_value(&self) -> bool {
1113        match self {
1114            CreateConnectionOptionName::Validate => false,
1115        }
1116    }
1117}
1118
1119#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1120/// An option in a `CREATE CONNECTION...` statement.
1121pub struct CreateConnectionOption<T: AstInfo> {
1122    pub name: CreateConnectionOptionName,
1123    pub value: Option<WithOptionValue<T>>,
1124}
1125impl_display_for_with_option!(CreateConnectionOption);
1126impl_display_t!(CreateConnectionOption);
1127
1128#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1129pub enum KafkaSourceConfigOptionName {
1130    GroupIdPrefix,
1131    Topic,
1132    TopicMetadataRefreshInterval,
1133    StartTimestamp,
1134    StartOffset,
1135}
1136
1137impl AstDisplay for KafkaSourceConfigOptionName {
1138    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1139        f.write_str(match self {
1140            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1141            KafkaSourceConfigOptionName::Topic => "TOPIC",
1142            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1143                "TOPIC METADATA REFRESH INTERVAL"
1144            }
1145            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1146            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1147        })
1148    }
1149}
1150impl_display!(KafkaSourceConfigOptionName);
1151
1152impl WithOptionName for KafkaSourceConfigOptionName {
1153    /// # WARNING
1154    ///
1155    /// Whenever implementing this trait consider very carefully whether or not
1156    /// this value could contain sensitive user data. If you're uncertain, err
1157    /// on the conservative side and return `true`.
1158    fn redact_value(&self) -> bool {
1159        match self {
1160            KafkaSourceConfigOptionName::GroupIdPrefix
1161            | KafkaSourceConfigOptionName::Topic
1162            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1163            | KafkaSourceConfigOptionName::StartOffset
1164            | KafkaSourceConfigOptionName::StartTimestamp => false,
1165        }
1166    }
1167}
1168
1169#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1170pub struct KafkaSourceConfigOption<T: AstInfo> {
1171    pub name: KafkaSourceConfigOptionName,
1172    pub value: Option<WithOptionValue<T>>,
1173}
1174impl_display_for_with_option!(KafkaSourceConfigOption);
1175impl_display_t!(KafkaSourceConfigOption);
1176
1177#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1178pub enum KafkaSinkConfigOptionName {
1179    CompressionType,
1180    PartitionBy,
1181    ProgressGroupIdPrefix,
1182    Topic,
1183    TransactionalIdPrefix,
1184    LegacyIds,
1185    TopicConfig,
1186    TopicMetadataRefreshInterval,
1187    TopicPartitionCount,
1188    TopicReplicationFactor,
1189}
1190
1191impl AstDisplay for KafkaSinkConfigOptionName {
1192    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1193        f.write_str(match self {
1194            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1195            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1196            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1197            KafkaSinkConfigOptionName::Topic => "TOPIC",
1198            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1199            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1200            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1201            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1202                "TOPIC METADATA REFRESH INTERVAL"
1203            }
1204            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1205            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1206        })
1207    }
1208}
1209impl_display!(KafkaSinkConfigOptionName);
1210
1211impl WithOptionName for KafkaSinkConfigOptionName {
1212    /// # WARNING
1213    ///
1214    /// Whenever implementing this trait consider very carefully whether or not
1215    /// this value could contain sensitive user data. If you're uncertain, err
1216    /// on the conservative side and return `true`.
1217    fn redact_value(&self) -> bool {
1218        match self {
1219            KafkaSinkConfigOptionName::CompressionType
1220            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1221            | KafkaSinkConfigOptionName::Topic
1222            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1223            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1224            | KafkaSinkConfigOptionName::LegacyIds
1225            | KafkaSinkConfigOptionName::TopicConfig
1226            | KafkaSinkConfigOptionName::TopicPartitionCount
1227            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1228            KafkaSinkConfigOptionName::PartitionBy => true,
1229        }
1230    }
1231}
1232
1233#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1234pub struct KafkaSinkConfigOption<T: AstInfo> {
1235    pub name: KafkaSinkConfigOptionName,
1236    pub value: Option<WithOptionValue<T>>,
1237}
1238impl_display_for_with_option!(KafkaSinkConfigOption);
1239impl_display_t!(KafkaSinkConfigOption);
1240
1241#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1242pub enum IcebergSinkConfigOptionName {
1243    Namespace,
1244    Table,
1245}
1246
1247impl AstDisplay for IcebergSinkConfigOptionName {
1248    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1249        f.write_str(match self {
1250            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1251            IcebergSinkConfigOptionName::Table => "TABLE",
1252        })
1253    }
1254}
1255impl_display!(IcebergSinkConfigOptionName);
1256
1257impl WithOptionName for IcebergSinkConfigOptionName {
1258    /// # WARNING
1259    ///
1260    /// Whenever implementing this trait consider very carefully whether or not
1261    /// this value could contain sensitive user data. If you're uncertain, err
1262    /// on the conservative side and return `true`.
1263    fn redact_value(&self) -> bool {
1264        match self {
1265            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1266        }
1267    }
1268}
1269
1270#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1271pub struct IcebergSinkConfigOption<T: AstInfo> {
1272    pub name: IcebergSinkConfigOptionName,
1273    pub value: Option<WithOptionValue<T>>,
1274}
1275impl_display_for_with_option!(IcebergSinkConfigOption);
1276impl_display_t!(IcebergSinkConfigOption);
1277
1278#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1279pub enum PgConfigOptionName {
1280    /// Hex encoded string of binary serialization of
1281    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`
1282    Details,
1283    /// The name of the publication to sync
1284    Publication,
1285    /// Columns whose types you want to unconditionally format as text
1286    /// NOTE(roshan): This value is kept around to allow round-tripping a
1287    /// `CREATE SOURCE` statement while we still allow creating implicit
1288    /// subsources from `CREATE SOURCE`, but will be removed once
1289    /// fully deprecating that feature and forcing users to use explicit
1290    /// `CREATE TABLE .. FROM SOURCE` statements
1291    TextColumns,
1292    /// Columns you want to exclude
1293    /// NOTE: This value is kept around to allow round-tripping a
1294    /// `CREATE SOURCE` statement while we still allow creating implicit
1295    /// subsources from `CREATE SOURCE`, but will be removed once
1296    /// fully deprecating that feature and forcing users to use explicit
1297    /// `CREATE TABLE .. FROM SOURCE` statements
1298    ExcludeColumns,
1299}
1300
1301impl AstDisplay for PgConfigOptionName {
1302    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1303        f.write_str(match self {
1304            PgConfigOptionName::Details => "DETAILS",
1305            PgConfigOptionName::Publication => "PUBLICATION",
1306            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1307            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1308        })
1309    }
1310}
1311impl_display!(PgConfigOptionName);
1312
1313impl WithOptionName for PgConfigOptionName {
1314    /// # WARNING
1315    ///
1316    /// Whenever implementing this trait consider very carefully whether or not
1317    /// this value could contain sensitive user data. If you're uncertain, err
1318    /// on the conservative side and return `true`.
1319    fn redact_value(&self) -> bool {
1320        match self {
1321            PgConfigOptionName::Details
1322            | PgConfigOptionName::Publication
1323            | PgConfigOptionName::TextColumns
1324            | PgConfigOptionName::ExcludeColumns => false,
1325        }
1326    }
1327}
1328
1329#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1330/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1331pub struct PgConfigOption<T: AstInfo> {
1332    pub name: PgConfigOptionName,
1333    pub value: Option<WithOptionValue<T>>,
1334}
1335impl_display_for_with_option!(PgConfigOption);
1336impl_display_t!(PgConfigOption);
1337
1338#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1339pub enum MySqlConfigOptionName {
1340    /// Hex encoded string of binary serialization of
1341    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`
1342    Details,
1343    /// Columns whose types you want to unconditionally format as text
1344    /// NOTE(roshan): This value is kept around to allow round-tripping a
1345    /// `CREATE SOURCE` statement while we still allow creating implicit
1346    /// subsources from `CREATE SOURCE`, but will be removed once
1347    /// fully deprecating that feature and forcing users to use explicit
1348    /// `CREATE TABLE .. FROM SOURCE` statements
1349    TextColumns,
1350    /// Columns you want to exclude
1351    /// NOTE(roshan): This value is kept around to allow round-tripping a
1352    /// `CREATE SOURCE` statement while we still allow creating implicit
1353    /// subsources from `CREATE SOURCE`, but will be removed once
1354    /// fully deprecating that feature and forcing users to use explicit
1355    /// `CREATE TABLE .. FROM SOURCE` statements
1356    ExcludeColumns,
1357}
1358
1359impl AstDisplay for MySqlConfigOptionName {
1360    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1361        f.write_str(match self {
1362            MySqlConfigOptionName::Details => "DETAILS",
1363            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1364            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1365        })
1366    }
1367}
1368impl_display!(MySqlConfigOptionName);
1369
1370impl WithOptionName for MySqlConfigOptionName {
1371    /// # WARNING
1372    ///
1373    /// Whenever implementing this trait consider very carefully whether or not
1374    /// this value could contain sensitive user data. If you're uncertain, err
1375    /// on the conservative side and return `true`.
1376    fn redact_value(&self) -> bool {
1377        match self {
1378            MySqlConfigOptionName::Details
1379            | MySqlConfigOptionName::TextColumns
1380            | MySqlConfigOptionName::ExcludeColumns => false,
1381        }
1382    }
1383}
1384
1385#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1386/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1387pub struct MySqlConfigOption<T: AstInfo> {
1388    pub name: MySqlConfigOptionName,
1389    pub value: Option<WithOptionValue<T>>,
1390}
1391impl_display_for_with_option!(MySqlConfigOption);
1392impl_display_t!(MySqlConfigOption);
1393
1394#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1395pub enum SqlServerConfigOptionName {
1396    /// Hex encoded string of binary serialization of
1397    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
1398    Details,
1399    /// Columns whose types you want to unconditionally format as text.
1400    ///
1401    /// NOTE(roshan): This value is kept around to allow round-tripping a
1402    /// `CREATE SOURCE` statement while we still allow creating implicit
1403    /// subsources from `CREATE SOURCE`, but will be removed once
1404    /// fully deprecating that feature and forcing users to use explicit
1405    /// `CREATE TABLE .. FROM SOURCE` statements
1406    TextColumns,
1407    /// Columns you want to exclude.
1408    ///
1409    /// NOTE(roshan): This value is kept around to allow round-tripping a
1410    /// `CREATE SOURCE` statement while we still allow creating implicit
1411    /// subsources from `CREATE SOURCE`, but will be removed once
1412    /// fully deprecating that feature and forcing users to use explicit
1413    /// `CREATE TABLE .. FROM SOURCE` statements
1414    ExcludeColumns,
1415}
1416
1417impl AstDisplay for SqlServerConfigOptionName {
1418    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1419        f.write_str(match self {
1420            SqlServerConfigOptionName::Details => "DETAILS",
1421            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1422            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1423        })
1424    }
1425}
1426impl_display!(SqlServerConfigOptionName);
1427
1428impl WithOptionName for SqlServerConfigOptionName {
1429    /// # WARNING
1430    ///
1431    /// Whenever implementing this trait consider very carefully whether or not
1432    /// this value could contain sensitive user data. If you're uncertain, err
1433    /// on the conservative side and return `true`.
1434    fn redact_value(&self) -> bool {
1435        match self {
1436            SqlServerConfigOptionName::Details
1437            | SqlServerConfigOptionName::TextColumns
1438            | SqlServerConfigOptionName::ExcludeColumns => false,
1439        }
1440    }
1441}
1442
1443#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1444/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1445pub struct SqlServerConfigOption<T: AstInfo> {
1446    pub name: SqlServerConfigOptionName,
1447    pub value: Option<WithOptionValue<T>>,
1448}
1449impl_display_for_with_option!(SqlServerConfigOption);
1450impl_display_t!(SqlServerConfigOption);
1451
1452#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1453pub enum CreateSourceConnection<T: AstInfo> {
1454    Kafka {
1455        connection: T::ItemName,
1456        options: Vec<KafkaSourceConfigOption<T>>,
1457    },
1458    Postgres {
1459        connection: T::ItemName,
1460        options: Vec<PgConfigOption<T>>,
1461    },
1462    SqlServer {
1463        connection: T::ItemName,
1464        options: Vec<SqlServerConfigOption<T>>,
1465    },
1466    MySql {
1467        connection: T::ItemName,
1468        options: Vec<MySqlConfigOption<T>>,
1469    },
1470    LoadGenerator {
1471        generator: LoadGenerator,
1472        options: Vec<LoadGeneratorOption<T>>,
1473    },
1474}
1475
1476impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1477    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1478        match self {
1479            CreateSourceConnection::Kafka {
1480                connection,
1481                options,
1482            } => {
1483                f.write_str("KAFKA CONNECTION ");
1484                f.write_node(connection);
1485                if !options.is_empty() {
1486                    f.write_str(" (");
1487                    f.write_node(&display::comma_separated(options));
1488                    f.write_str(")");
1489                }
1490            }
1491            CreateSourceConnection::Postgres {
1492                connection,
1493                options,
1494            } => {
1495                f.write_str("POSTGRES CONNECTION ");
1496                f.write_node(connection);
1497                if !options.is_empty() {
1498                    f.write_str(" (");
1499                    f.write_node(&display::comma_separated(options));
1500                    f.write_str(")");
1501                }
1502            }
1503            CreateSourceConnection::SqlServer {
1504                connection,
1505                options,
1506            } => {
1507                f.write_str("SQL SERVER CONNECTION ");
1508                f.write_node(connection);
1509                if !options.is_empty() {
1510                    f.write_str(" (");
1511                    f.write_node(&display::comma_separated(options));
1512                    f.write_str(")");
1513                }
1514            }
1515            CreateSourceConnection::MySql {
1516                connection,
1517                options,
1518            } => {
1519                f.write_str("MYSQL CONNECTION ");
1520                f.write_node(connection);
1521                if !options.is_empty() {
1522                    f.write_str(" (");
1523                    f.write_node(&display::comma_separated(options));
1524                    f.write_str(")");
1525                }
1526            }
1527            CreateSourceConnection::LoadGenerator { generator, options } => {
1528                f.write_str("LOAD GENERATOR ");
1529                f.write_node(generator);
1530                if !options.is_empty() {
1531                    f.write_str(" (");
1532                    f.write_node(&display::comma_separated(options));
1533                    f.write_str(")");
1534                }
1535            }
1536        }
1537    }
1538}
1539impl_display_t!(CreateSourceConnection);
1540
1541#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1542pub enum LoadGenerator {
1543    Clock,
1544    Counter,
1545    Marketing,
1546    Auction,
1547    Datums,
1548    Tpch,
1549    KeyValue,
1550}
1551
1552impl AstDisplay for LoadGenerator {
1553    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1554        match self {
1555            Self::Counter => f.write_str("COUNTER"),
1556            Self::Clock => f.write_str("CLOCK"),
1557            Self::Marketing => f.write_str("MARKETING"),
1558            Self::Auction => f.write_str("AUCTION"),
1559            Self::Datums => f.write_str("DATUMS"),
1560            Self::Tpch => f.write_str("TPCH"),
1561            Self::KeyValue => f.write_str("KEY VALUE"),
1562        }
1563    }
1564}
1565impl_display!(LoadGenerator);
1566
1567impl LoadGenerator {
1568    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1569    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1570    /// cases where we only have the AST representation. This can be removed once
1571    /// the `ast_rewrite_sources_to_tables` migration is removed.
1572    pub fn schema_name(&self) -> &'static str {
1573        match self {
1574            LoadGenerator::Counter => "counter",
1575            LoadGenerator::Clock => "clock",
1576            LoadGenerator::Marketing => "marketing",
1577            LoadGenerator::Auction => "auction",
1578            LoadGenerator::Datums => "datums",
1579            LoadGenerator::Tpch => "tpch",
1580            LoadGenerator::KeyValue => "key_value",
1581        }
1582    }
1583}
1584
1585#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1586pub enum LoadGeneratorOptionName {
1587    ScaleFactor,
1588    TickInterval,
1589    AsOf,
1590    UpTo,
1591    MaxCardinality,
1592    Keys,
1593    SnapshotRounds,
1594    TransactionalSnapshot,
1595    ValueSize,
1596    Seed,
1597    Partitions,
1598    BatchSize,
1599}
1600
1601impl AstDisplay for LoadGeneratorOptionName {
1602    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1603        f.write_str(match self {
1604            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1605            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1606            LoadGeneratorOptionName::AsOf => "AS OF",
1607            LoadGeneratorOptionName::UpTo => "UP TO",
1608            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1609            LoadGeneratorOptionName::Keys => "KEYS",
1610            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1611            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1612            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1613            LoadGeneratorOptionName::Seed => "SEED",
1614            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1615            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1616        })
1617    }
1618}
1619impl_display!(LoadGeneratorOptionName);
1620
1621impl WithOptionName for LoadGeneratorOptionName {
1622    /// # WARNING
1623    ///
1624    /// Whenever implementing this trait consider very carefully whether or not
1625    /// this value could contain sensitive user data. If you're uncertain, err
1626    /// on the conservative side and return `true`.
1627    fn redact_value(&self) -> bool {
1628        match self {
1629            LoadGeneratorOptionName::ScaleFactor
1630            | LoadGeneratorOptionName::TickInterval
1631            | LoadGeneratorOptionName::AsOf
1632            | LoadGeneratorOptionName::UpTo
1633            | LoadGeneratorOptionName::MaxCardinality
1634            | LoadGeneratorOptionName::Keys
1635            | LoadGeneratorOptionName::SnapshotRounds
1636            | LoadGeneratorOptionName::TransactionalSnapshot
1637            | LoadGeneratorOptionName::ValueSize
1638            | LoadGeneratorOptionName::Partitions
1639            | LoadGeneratorOptionName::BatchSize
1640            | LoadGeneratorOptionName::Seed => false,
1641        }
1642    }
1643}
1644
1645#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1646/// An option in a `CREATE CONNECTION...SSH`.
1647pub struct LoadGeneratorOption<T: AstInfo> {
1648    pub name: LoadGeneratorOptionName,
1649    pub value: Option<WithOptionValue<T>>,
1650}
1651impl_display_for_with_option!(LoadGeneratorOption);
1652impl_display_t!(LoadGeneratorOption);
1653
1654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1655pub enum CreateSinkConnection<T: AstInfo> {
1656    Kafka {
1657        connection: T::ItemName,
1658        options: Vec<KafkaSinkConfigOption<T>>,
1659        key: Option<SinkKey>,
1660        headers: Option<Ident>,
1661    },
1662    Iceberg {
1663        catalog_connection: T::ItemName,
1664
1665        /// AWS creds for the storage layer.
1666        aws_connection: Option<T::ItemName>,
1667
1668        key: Option<SinkKey>,
1669        options: Vec<IcebergSinkConfigOption<T>>,
1670    },
1671}
1672
1673impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1674    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1675        match self {
1676            CreateSinkConnection::Kafka {
1677                connection,
1678                options,
1679                key,
1680                headers,
1681            } => {
1682                f.write_str("KAFKA CONNECTION ");
1683                f.write_node(connection);
1684                if !options.is_empty() {
1685                    f.write_str(" (");
1686                    f.write_node(&display::comma_separated(options));
1687                    f.write_str(")");
1688                }
1689                if let Some(key) = key.as_ref() {
1690                    f.write_str(" ");
1691                    f.write_node(key);
1692                }
1693                if let Some(headers) = headers {
1694                    f.write_str(" HEADERS ");
1695                    f.write_node(headers);
1696                }
1697            }
1698            CreateSinkConnection::Iceberg {
1699                catalog_connection,
1700                aws_connection,
1701                key,
1702                options,
1703            } => {
1704                f.write_str("ICEBERG CATALOG CONNECTION ");
1705                f.write_node(catalog_connection);
1706                if !options.is_empty() {
1707                    f.write_str(" (");
1708                    f.write_node(&display::comma_separated(options));
1709                    f.write_str(")");
1710                }
1711                if let Some(aws_connection) = aws_connection {
1712                    f.write_str(" USING AWS CONNECTION ");
1713                    f.write_node(aws_connection);
1714                }
1715                if let Some(key) = key.as_ref() {
1716                    f.write_str(" ");
1717                    f.write_node(key);
1718                }
1719            }
1720        }
1721    }
1722}
1723impl_display_t!(CreateSinkConnection);
1724
1725#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1726pub struct SinkKey {
1727    pub key_columns: Vec<Ident>,
1728    pub not_enforced: bool,
1729}
1730
1731impl AstDisplay for SinkKey {
1732    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1733        f.write_str("KEY (");
1734        f.write_node(&display::comma_separated(&self.key_columns));
1735        f.write_str(")");
1736        if self.not_enforced {
1737            f.write_str(" NOT ENFORCED");
1738        }
1739    }
1740}
1741
1742/// A table-level constraint, specified in a `CREATE TABLE` or an
1743/// `ALTER TABLE ADD <constraint>` statement.
1744#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1745pub enum TableConstraint<T: AstInfo> {
1746    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1747    Unique {
1748        name: Option<Ident>,
1749        columns: Vec<Ident>,
1750        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1751        is_primary: bool,
1752        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1753        // constraints.
1754        nulls_not_distinct: bool,
1755    },
1756    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1757    /// REFERENCES <foreign_table> (<referred_columns>)`)
1758    ForeignKey {
1759        name: Option<Ident>,
1760        columns: Vec<Ident>,
1761        foreign_table: T::ItemName,
1762        referred_columns: Vec<Ident>,
1763    },
1764    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1765    Check {
1766        name: Option<Ident>,
1767        expr: Box<Expr<T>>,
1768    },
1769}
1770
1771impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1772    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1773        match self {
1774            TableConstraint::Unique {
1775                name,
1776                columns,
1777                is_primary,
1778                nulls_not_distinct,
1779            } => {
1780                f.write_node(&display_constraint_name(name));
1781                if *is_primary {
1782                    f.write_str("PRIMARY KEY ");
1783                } else {
1784                    f.write_str("UNIQUE ");
1785                    if *nulls_not_distinct {
1786                        f.write_str("NULLS NOT DISTINCT ");
1787                    }
1788                }
1789                f.write_str("(");
1790                f.write_node(&display::comma_separated(columns));
1791                f.write_str(")");
1792            }
1793            TableConstraint::ForeignKey {
1794                name,
1795                columns,
1796                foreign_table,
1797                referred_columns,
1798            } => {
1799                f.write_node(&display_constraint_name(name));
1800                f.write_str("FOREIGN KEY (");
1801                f.write_node(&display::comma_separated(columns));
1802                f.write_str(") REFERENCES ");
1803                f.write_node(foreign_table);
1804                f.write_str("(");
1805                f.write_node(&display::comma_separated(referred_columns));
1806                f.write_str(")");
1807            }
1808            TableConstraint::Check { name, expr } => {
1809                f.write_node(&display_constraint_name(name));
1810                f.write_str("CHECK (");
1811                f.write_node(&expr);
1812                f.write_str(")");
1813            }
1814        }
1815    }
1816}
1817impl_display_t!(TableConstraint);
1818
1819/// A key constraint, specified in a `CREATE SOURCE`.
1820#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1821pub enum KeyConstraint {
1822    // PRIMARY KEY (<columns>) NOT ENFORCED
1823    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1824}
1825
1826impl AstDisplay for KeyConstraint {
1827    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1828        match self {
1829            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1830                f.write_str("PRIMARY KEY ");
1831                f.write_str("(");
1832                f.write_node(&display::comma_separated(columns));
1833                f.write_str(") ");
1834                f.write_str("NOT ENFORCED");
1835            }
1836        }
1837    }
1838}
1839impl_display!(KeyConstraint);
1840
1841#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1842pub enum CreateSourceOptionName {
1843    TimestampInterval,
1844    RetainHistory,
1845}
1846
1847impl AstDisplay for CreateSourceOptionName {
1848    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1849        f.write_str(match self {
1850            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1851            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1852        })
1853    }
1854}
1855impl_display!(CreateSourceOptionName);
1856
1857impl WithOptionName for CreateSourceOptionName {
1858    /// # WARNING
1859    ///
1860    /// Whenever implementing this trait consider very carefully whether or not
1861    /// this value could contain sensitive user data. If you're uncertain, err
1862    /// on the conservative side and return `true`.
1863    fn redact_value(&self) -> bool {
1864        match self {
1865            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1866                false
1867            }
1868        }
1869    }
1870}
1871
1872#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1873/// An option in a `CREATE SOURCE...` statement.
1874pub struct CreateSourceOption<T: AstInfo> {
1875    pub name: CreateSourceOptionName,
1876    pub value: Option<WithOptionValue<T>>,
1877}
1878impl_display_for_with_option!(CreateSourceOption);
1879impl_display_t!(CreateSourceOption);
1880
1881/// SQL column definition
1882#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1883pub struct ColumnDef<T: AstInfo> {
1884    pub name: Ident,
1885    pub data_type: T::DataType,
1886    pub collation: Option<UnresolvedItemName>,
1887    pub options: Vec<ColumnOptionDef<T>>,
1888}
1889
1890impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1891    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1892        f.write_node(&self.name);
1893        f.write_str(" ");
1894        f.write_node(&self.data_type);
1895        if let Some(collation) = &self.collation {
1896            f.write_str(" COLLATE ");
1897            f.write_node(collation);
1898        }
1899        for option in &self.options {
1900            f.write_str(" ");
1901            f.write_node(option);
1902        }
1903    }
1904}
1905impl_display_t!(ColumnDef);
1906
1907/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1908///
1909/// Note that implementations are substantially more permissive than the ANSI
1910/// specification on what order column options can be presented in, and whether
1911/// they are allowed to be named. The specification distinguishes between
1912/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1913/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1914/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1915/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1916/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1917/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1918/// NOT NULL constraints (the last of which is in violation of the spec).
1919///
1920/// For maximum flexibility, we don't distinguish between constraint and
1921/// non-constraint options, lumping them all together under the umbrella of
1922/// "column options," and we allow any column option to be named.
1923#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1924pub struct ColumnOptionDef<T: AstInfo> {
1925    pub name: Option<Ident>,
1926    pub option: ColumnOption<T>,
1927}
1928
1929impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1930    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1931        f.write_node(&display_constraint_name(&self.name));
1932        f.write_node(&self.option);
1933    }
1934}
1935impl_display_t!(ColumnOptionDef);
1936
1937/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1938/// TABLE` statement.
1939#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1940pub enum ColumnOption<T: AstInfo> {
1941    /// `NULL`
1942    Null,
1943    /// `NOT NULL`
1944    NotNull,
1945    /// `DEFAULT <restricted-expr>`
1946    Default(Expr<T>),
1947    /// `{ PRIMARY KEY | UNIQUE }`
1948    Unique { is_primary: bool },
1949    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1950    /// <foreign_table> (<referred_columns>)`).
1951    ForeignKey {
1952        foreign_table: UnresolvedItemName,
1953        referred_columns: Vec<Ident>,
1954    },
1955    /// `CHECK (<expr>)`
1956    Check(Expr<T>),
1957    /// `VERSION <action> <version>`
1958    Versioned {
1959        action: ColumnVersioned,
1960        version: Version,
1961    },
1962}
1963
1964impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1965    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1966        use ColumnOption::*;
1967        match self {
1968            Null => f.write_str("NULL"),
1969            NotNull => f.write_str("NOT NULL"),
1970            Default(expr) => {
1971                f.write_str("DEFAULT ");
1972                f.write_node(expr);
1973            }
1974            Unique { is_primary } => {
1975                if *is_primary {
1976                    f.write_str("PRIMARY KEY");
1977                } else {
1978                    f.write_str("UNIQUE");
1979                }
1980            }
1981            ForeignKey {
1982                foreign_table,
1983                referred_columns,
1984            } => {
1985                f.write_str("REFERENCES ");
1986                f.write_node(foreign_table);
1987                f.write_str(" (");
1988                f.write_node(&display::comma_separated(referred_columns));
1989                f.write_str(")");
1990            }
1991            Check(expr) => {
1992                f.write_str("CHECK (");
1993                f.write_node(expr);
1994                f.write_str(")");
1995            }
1996            Versioned { action, version } => {
1997                f.write_str("VERSION ");
1998                f.write_node(action);
1999                f.write_str(" ");
2000                f.write_node(version);
2001            }
2002        }
2003    }
2004}
2005impl_display_t!(ColumnOption);
2006
2007#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2008pub enum ColumnVersioned {
2009    Added,
2010}
2011
2012impl AstDisplay for ColumnVersioned {
2013    fn fmt<W>(&self, f: &mut AstFormatter<W>)
2014    where
2015        W: fmt::Write,
2016    {
2017        match self {
2018            // TODO(alter_table): Support dropped columns.
2019            ColumnVersioned::Added => f.write_str("ADDED"),
2020        }
2021    }
2022}
2023impl_display!(ColumnVersioned);
2024
2025fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
2026    struct ConstraintName<'a>(&'a Option<Ident>);
2027    impl<'a> AstDisplay for ConstraintName<'a> {
2028        fn fmt<W>(&self, f: &mut AstFormatter<W>)
2029        where
2030            W: fmt::Write,
2031        {
2032            if let Some(name) = self.0 {
2033                f.write_str("CONSTRAINT ");
2034                f.write_node(name);
2035                f.write_str(" ");
2036            }
2037        }
2038    }
2039    ConstraintName(name)
2040}