Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33    /// The `ASSERT NOT NULL [=] <ident>` option.
34    AssertNotNull,
35    PartitionBy,
36    RetainHistory,
37    /// The `REFRESH [=] ...` option.
38    Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70    pub name: MaterializedViewOptionName,
71    pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77    /// The `SNAPSHOT [=] ...` option.
78    Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104    pub name: ContinualTaskOptionName,
105    pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111    pub schema: String,
112}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
126    ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152    pub name: AvroSchemaOptionName,
153    pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159    Csr {
160        csr_connection: CsrConnectionAvro<T>,
161    },
162    InlineSchema {
163        schema: Schema,
164        with_options: Vec<AvroSchemaOption<T>>,
165    },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193    Csr {
194        csr_connection: CsrConnectionProtobuf<T>,
195    },
196    InlineSchema {
197        message_name: String,
198        schema: Schema,
199    },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224    AvroKeyFullname,
225    AvroValueFullname,
226    NullDefaults,
227    AvroDocOn(AvroDocOn<T>),
228    KeyCompatibilityLevel,
229    ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252    pub identifier: DocOnIdentifier<T>,
253    pub for_schema: DocOnSchema,
254}
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub enum DocOnSchema {
257    KeyOnly,
258    ValueOnly,
259    All,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264    Column(ColumnName<T>),
265    Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306/// An option in a `{FROM|INTO} CONNECTION ...` statement.
307pub struct CsrConfigOption<T: AstInfo> {
308    pub name: CsrConfigOptionName<T>,
309    pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316    pub connection: T::ItemName,
317    pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum ReaderSchemaSelectionStrategy {
335    Latest,
336    Inline(String),
337    ById(i32),
338}
339
340impl Default for ReaderSchemaSelectionStrategy {
341    fn default() -> Self {
342        Self::Latest
343    }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348    pub connection: CsrConnection<T>,
349    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351    pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368    pub connection: CsrConnection<T>,
369    pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387    pub key_schema: Option<String>,
388    pub value_schema: String,
389    /// Reference schemas for the key schema, in dependency order.
390    /// Populated during purification by fetching from the schema registry.
391    pub key_reference_schemas: Vec<String>,
392    /// Reference schemas for the value schema, in dependency order.
393    /// Populated during purification by fetching from the schema registry.
394    pub value_reference_schemas: Vec<String>,
395}
396
397impl AstDisplay for CsrSeedAvro {
398    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
399        f.write_str("SEED");
400        if let Some(key_schema) = &self.key_schema {
401            f.write_str(" KEY SCHEMA '");
402            f.write_node(&display::escape_single_quote_string(key_schema));
403            f.write_str("'");
404            if !self.key_reference_schemas.is_empty() {
405                f.write_str(" KEY REFERENCES (");
406                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
407                    if i > 0 {
408                        f.write_str(", ");
409                    }
410                    f.write_str("'");
411                    f.write_node(&display::escape_single_quote_string(schema));
412                    f.write_str("'");
413                }
414                f.write_str(")");
415            }
416        }
417        f.write_str(" VALUE SCHEMA '");
418        f.write_node(&display::escape_single_quote_string(&self.value_schema));
419        f.write_str("'");
420        if !self.value_reference_schemas.is_empty() {
421            f.write_str(" VALUE REFERENCES (");
422            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
423                if i > 0 {
424                    f.write_str(", ");
425                }
426                f.write_str("'");
427                f.write_node(&display::escape_single_quote_string(schema));
428                f.write_str("'");
429            }
430            f.write_str(")");
431        }
432    }
433}
434impl_display!(CsrSeedAvro);
435
436#[derive(Debug, Clone, PartialEq, Eq, Hash)]
437pub struct CsrSeedProtobuf {
438    pub key: Option<CsrSeedProtobufSchema>,
439    pub value: CsrSeedProtobufSchema,
440}
441
442impl AstDisplay for CsrSeedProtobuf {
443    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
444        f.write_str("SEED");
445        if let Some(key) = &self.key {
446            f.write_str(" KEY ");
447            f.write_node(key);
448        }
449        f.write_str(" VALUE ");
450        f.write_node(&self.value);
451    }
452}
453impl_display!(CsrSeedProtobuf);
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash)]
456pub struct CsrSeedProtobufSchema {
457    // Hex encoded string.
458    pub schema: String,
459    pub message_name: String,
460}
461impl AstDisplay for CsrSeedProtobufSchema {
462    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
463        f.write_str("SCHEMA '");
464        f.write_str(&display::escape_single_quote_string(&self.schema));
465        f.write_str("' MESSAGE '");
466        f.write_str(&self.message_name);
467        f.write_str("'");
468    }
469}
470impl_display!(CsrSeedProtobufSchema);
471
472#[derive(Debug, Clone, PartialEq, Eq, Hash)]
473pub enum FormatSpecifier<T: AstInfo> {
474    /// `CREATE SOURCE/SINK .. FORMAT`
475    Bare(Format<T>),
476    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
477    KeyValue { key: Format<T>, value: Format<T> },
478}
479
480impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
481    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
482        match self {
483            FormatSpecifier::Bare(format) => {
484                f.write_str("FORMAT ");
485                f.write_node(format)
486            }
487            FormatSpecifier::KeyValue { key, value } => {
488                f.write_str("KEY FORMAT ");
489                f.write_node(key);
490                f.write_str(" VALUE FORMAT ");
491                f.write_node(value);
492            }
493        }
494    }
495}
496impl_display_t!(FormatSpecifier);
497
498#[derive(Debug, Clone, PartialEq, Eq, Hash)]
499pub enum Format<T: AstInfo> {
500    Bytes,
501    Avro(AvroSchema<T>),
502    Protobuf(ProtobufSchema<T>),
503    Regex(String),
504    Csv {
505        columns: CsvColumns,
506        delimiter: char,
507    },
508    Json {
509        array: bool,
510    },
511    Text,
512}
513
514#[derive(Debug, Clone, PartialEq, Eq, Hash)]
515pub enum CsvColumns {
516    /// `WITH count COLUMNS`
517    Count(u64),
518    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
519    Header { names: Vec<Ident> },
520}
521
522impl AstDisplay for CsvColumns {
523    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
524        match self {
525            CsvColumns::Count(n) => {
526                f.write_str(n);
527                f.write_str(" COLUMNS")
528            }
529            CsvColumns::Header { names } => {
530                f.write_str("HEADER");
531                if !names.is_empty() {
532                    f.write_str(" (");
533                    f.write_node(&display::comma_separated(names));
534                    f.write_str(")");
535                }
536            }
537        }
538    }
539}
540
541#[derive(Debug, Clone, PartialEq, Eq, Hash)]
542pub enum SourceIncludeMetadata {
543    Key {
544        alias: Option<Ident>,
545    },
546    Timestamp {
547        alias: Option<Ident>,
548    },
549    Partition {
550        alias: Option<Ident>,
551    },
552    Offset {
553        alias: Option<Ident>,
554    },
555    Headers {
556        alias: Option<Ident>,
557    },
558    Header {
559        key: String,
560        alias: Ident,
561        use_bytes: bool,
562    },
563}
564
565impl AstDisplay for SourceIncludeMetadata {
566    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
567        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
568            if let Some(alias) = alias {
569                f.write_str(" AS ");
570                f.write_node(alias);
571            }
572        };
573
574        match self {
575            SourceIncludeMetadata::Key { alias } => {
576                f.write_str("KEY");
577                print_alias(f, alias);
578            }
579            SourceIncludeMetadata::Timestamp { alias } => {
580                f.write_str("TIMESTAMP");
581                print_alias(f, alias);
582            }
583            SourceIncludeMetadata::Partition { alias } => {
584                f.write_str("PARTITION");
585                print_alias(f, alias);
586            }
587            SourceIncludeMetadata::Offset { alias } => {
588                f.write_str("OFFSET");
589                print_alias(f, alias);
590            }
591            SourceIncludeMetadata::Headers { alias } => {
592                f.write_str("HEADERS");
593                print_alias(f, alias);
594            }
595            SourceIncludeMetadata::Header {
596                alias,
597                key,
598                use_bytes,
599            } => {
600                f.write_str("HEADER '");
601                f.write_str(&display::escape_single_quote_string(key));
602                f.write_str("'");
603                print_alias(f, &Some(alias.clone()));
604                if *use_bytes {
605                    f.write_str(" BYTES");
606                }
607            }
608        }
609    }
610}
611impl_display!(SourceIncludeMetadata);
612
613#[derive(Debug, Clone, PartialEq, Eq, Hash)]
614pub enum SourceErrorPolicy {
615    Inline {
616        /// The alias to use for the error column. If unspecified will be `error`.
617        alias: Option<Ident>,
618    },
619}
620
621impl AstDisplay for SourceErrorPolicy {
622    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
623        match self {
624            Self::Inline { alias } => {
625                f.write_str("INLINE");
626                if let Some(alias) = alias {
627                    f.write_str(" AS ");
628                    f.write_node(alias);
629                }
630            }
631        }
632    }
633}
634impl_display!(SourceErrorPolicy);
635
636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637pub enum SourceEnvelope {
638    None,
639    Debezium,
640    Upsert {
641        value_decode_err_policy: Vec<SourceErrorPolicy>,
642    },
643    CdcV2,
644}
645
646impl SourceEnvelope {
647    /// `true` iff Materialize is expected to crash or exhibit UB
648    /// when attempting to ingest data starting at an offset other than zero.
649    pub fn requires_all_input(&self) -> bool {
650        match self {
651            SourceEnvelope::None => false,
652            SourceEnvelope::Debezium => false,
653            SourceEnvelope::Upsert { .. } => false,
654            SourceEnvelope::CdcV2 => true,
655        }
656    }
657}
658
659impl AstDisplay for SourceEnvelope {
660    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
661        match self {
662            Self::None => {
663                // this is unreachable as long as the default is None, but include it in case we ever change that
664                f.write_str("NONE");
665            }
666            Self::Debezium => {
667                f.write_str("DEBEZIUM");
668            }
669            Self::Upsert {
670                value_decode_err_policy,
671            } => {
672                if value_decode_err_policy.is_empty() {
673                    f.write_str("UPSERT");
674                } else {
675                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
676                    f.write_node(&display::comma_separated(value_decode_err_policy));
677                    f.write_str("))")
678                }
679            }
680            Self::CdcV2 => {
681                f.write_str("MATERIALIZE");
682            }
683        }
684    }
685}
686impl_display!(SourceEnvelope);
687
688#[derive(Debug, Clone, PartialEq, Eq, Hash)]
689pub enum SinkEnvelope {
690    Debezium,
691    Upsert,
692}
693
694impl AstDisplay for SinkEnvelope {
695    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
696        match self {
697            Self::Upsert => {
698                f.write_str("UPSERT");
699            }
700            Self::Debezium => {
701                f.write_str("DEBEZIUM");
702            }
703        }
704    }
705}
706impl_display!(SinkEnvelope);
707
708#[derive(Debug, Clone, PartialEq, Eq, Hash)]
709pub enum IcebergSinkMode {
710    Upsert,
711}
712
713impl AstDisplay for IcebergSinkMode {
714    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
715        match self {
716            Self::Upsert => {
717                f.write_str("UPSERT");
718            }
719        }
720    }
721}
722impl_display!(IcebergSinkMode);
723
724#[derive(Debug, Clone, PartialEq, Eq, Hash)]
725pub enum SubscribeOutput<T: AstInfo> {
726    Diffs,
727    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
728    EnvelopeUpsert { key_columns: Vec<Ident> },
729    EnvelopeDebezium { key_columns: Vec<Ident> },
730}
731
732impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
733    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
734        match self {
735            Self::Diffs => {}
736            Self::WithinTimestampOrderBy { order_by } => {
737                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
738                f.write_node(&display::comma_separated(order_by));
739            }
740            Self::EnvelopeUpsert { key_columns } => {
741                f.write_str(" ENVELOPE UPSERT (KEY (");
742                f.write_node(&display::comma_separated(key_columns));
743                f.write_str("))");
744            }
745            Self::EnvelopeDebezium { key_columns } => {
746                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
747                f.write_node(&display::comma_separated(key_columns));
748                f.write_str("))");
749            }
750        }
751    }
752}
753impl_display_t!(SubscribeOutput);
754
755impl<T: AstInfo> AstDisplay for Format<T> {
756    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
757        match self {
758            Self::Bytes => f.write_str("BYTES"),
759            Self::Avro(inner) => {
760                f.write_str("AVRO ");
761                f.write_node(inner);
762            }
763            Self::Protobuf(inner) => {
764                f.write_str("PROTOBUF ");
765                f.write_node(inner);
766            }
767            Self::Regex(regex) => {
768                f.write_str("REGEX '");
769                f.write_node(&display::escape_single_quote_string(regex));
770                f.write_str("'");
771            }
772            Self::Csv { columns, delimiter } => {
773                f.write_str("CSV WITH ");
774                f.write_node(columns);
775
776                if *delimiter != ',' {
777                    f.write_str(" DELIMITED BY '");
778                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
779                    f.write_str("'");
780                }
781            }
782            Self::Json { array } => {
783                f.write_str("JSON");
784                if *array {
785                    f.write_str(" ARRAY");
786                }
787            }
788            Self::Text => f.write_str("TEXT"),
789        }
790    }
791}
792impl_display_t!(Format);
793
794// All connection options are bundled together to allow us to parse `ALTER
795// CONNECTION` without specifying the type of connection we're altering. Il faut
796// souffrir pour être belle.
797#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
798pub enum ConnectionOptionName {
799    AccessKeyId,
800    AssumeRoleArn,
801    AssumeRoleSessionName,
802    AvailabilityZones,
803    AwsConnection,
804    AwsPrivatelink,
805    Broker,
806    Brokers,
807    Credential,
808    Database,
809    Endpoint,
810    Host,
811    Password,
812    Port,
813    ProgressTopic,
814    ProgressTopicReplicationFactor,
815    PublicKey1,
816    PublicKey2,
817    Region,
818    SaslMechanisms,
819    SaslPassword,
820    SaslUsername,
821    Scope,
822    SecretAccessKey,
823    SecurityProtocol,
824    ServiceName,
825    SshTunnel,
826    SslCertificate,
827    SslCertificateAuthority,
828    SslKey,
829    SslMode,
830    SessionToken,
831    CatalogType,
832    Url,
833    User,
834    Warehouse,
835}
836
837impl AstDisplay for ConnectionOptionName {
838    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
839        f.write_str(match self {
840            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
841            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
842            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
843            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
844            ConnectionOptionName::Broker => "BROKER",
845            ConnectionOptionName::Brokers => "BROKERS",
846            ConnectionOptionName::Credential => "CREDENTIAL",
847            ConnectionOptionName::Database => "DATABASE",
848            ConnectionOptionName::Endpoint => "ENDPOINT",
849            ConnectionOptionName::Host => "HOST",
850            ConnectionOptionName::Password => "PASSWORD",
851            ConnectionOptionName::Port => "PORT",
852            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
853            ConnectionOptionName::ProgressTopicReplicationFactor => {
854                "PROGRESS TOPIC REPLICATION FACTOR"
855            }
856            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
857            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
858            ConnectionOptionName::Region => "REGION",
859            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
860            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
861            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
862            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
863            ConnectionOptionName::SaslUsername => "SASL USERNAME",
864            ConnectionOptionName::Scope => "SCOPE",
865            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
866            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
867            ConnectionOptionName::ServiceName => "SERVICE NAME",
868            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
869            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
870            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
871            ConnectionOptionName::SslKey => "SSL KEY",
872            ConnectionOptionName::SslMode => "SSL MODE",
873            ConnectionOptionName::SessionToken => "SESSION TOKEN",
874            ConnectionOptionName::CatalogType => "CATALOG TYPE",
875            ConnectionOptionName::Url => "URL",
876            ConnectionOptionName::User => "USER",
877            ConnectionOptionName::Warehouse => "WAREHOUSE",
878        })
879    }
880}
881impl_display!(ConnectionOptionName);
882
883impl WithOptionName for ConnectionOptionName {
884    /// # WARNING
885    ///
886    /// Whenever implementing this trait consider very carefully whether or not
887    /// this value could contain sensitive user data. If you're uncertain, err
888    /// on the conservative side and return `true`.
889    fn redact_value(&self) -> bool {
890        match self {
891            ConnectionOptionName::AccessKeyId
892            | ConnectionOptionName::AvailabilityZones
893            | ConnectionOptionName::AwsConnection
894            | ConnectionOptionName::AwsPrivatelink
895            | ConnectionOptionName::Broker
896            | ConnectionOptionName::Brokers
897            | ConnectionOptionName::Credential
898            | ConnectionOptionName::Database
899            | ConnectionOptionName::Endpoint
900            | ConnectionOptionName::Host
901            | ConnectionOptionName::Password
902            | ConnectionOptionName::Port
903            | ConnectionOptionName::ProgressTopic
904            | ConnectionOptionName::ProgressTopicReplicationFactor
905            | ConnectionOptionName::PublicKey1
906            | ConnectionOptionName::PublicKey2
907            | ConnectionOptionName::Region
908            | ConnectionOptionName::AssumeRoleArn
909            | ConnectionOptionName::AssumeRoleSessionName
910            | ConnectionOptionName::SaslMechanisms
911            | ConnectionOptionName::SaslPassword
912            | ConnectionOptionName::SaslUsername
913            | ConnectionOptionName::Scope
914            | ConnectionOptionName::SecurityProtocol
915            | ConnectionOptionName::SecretAccessKey
916            | ConnectionOptionName::ServiceName
917            | ConnectionOptionName::SshTunnel
918            | ConnectionOptionName::SslCertificate
919            | ConnectionOptionName::SslCertificateAuthority
920            | ConnectionOptionName::SslKey
921            | ConnectionOptionName::SslMode
922            | ConnectionOptionName::SessionToken
923            | ConnectionOptionName::CatalogType
924            | ConnectionOptionName::Url
925            | ConnectionOptionName::User
926            | ConnectionOptionName::Warehouse => false,
927        }
928    }
929}
930
931#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
932/// An option in a `CREATE CONNECTION`.
933pub struct ConnectionOption<T: AstInfo> {
934    pub name: ConnectionOptionName,
935    pub value: Option<WithOptionValue<T>>,
936}
937impl_display_for_with_option!(ConnectionOption);
938impl_display_t!(ConnectionOption);
939
940#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
941pub enum CreateConnectionType {
942    Aws,
943    AwsPrivatelink,
944    Kafka,
945    Csr,
946    Postgres,
947    Ssh,
948    SqlServer,
949    MySql,
950    IcebergCatalog,
951}
952
953impl AstDisplay for CreateConnectionType {
954    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
955        match self {
956            Self::Kafka => {
957                f.write_str("KAFKA");
958            }
959            Self::Csr => {
960                f.write_str("CONFLUENT SCHEMA REGISTRY");
961            }
962            Self::Postgres => {
963                f.write_str("POSTGRES");
964            }
965            Self::Aws => {
966                f.write_str("AWS");
967            }
968            Self::AwsPrivatelink => {
969                f.write_str("AWS PRIVATELINK");
970            }
971            Self::Ssh => {
972                f.write_str("SSH TUNNEL");
973            }
974            Self::SqlServer => {
975                f.write_str("SQL SERVER");
976            }
977            Self::MySql => {
978                f.write_str("MYSQL");
979            }
980            Self::IcebergCatalog => {
981                f.write_str("ICEBERG CATALOG");
982            }
983        }
984    }
985}
986impl_display!(CreateConnectionType);
987
988#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
989pub enum CreateConnectionOptionName {
990    Validate,
991}
992
993impl AstDisplay for CreateConnectionOptionName {
994    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
995        f.write_str(match self {
996            CreateConnectionOptionName::Validate => "VALIDATE",
997        })
998    }
999}
1000impl_display!(CreateConnectionOptionName);
1001
1002impl WithOptionName for CreateConnectionOptionName {
1003    /// # WARNING
1004    ///
1005    /// Whenever implementing this trait consider very carefully whether or not
1006    /// this value could contain sensitive user data. If you're uncertain, err
1007    /// on the conservative side and return `true`.
1008    fn redact_value(&self) -> bool {
1009        match self {
1010            CreateConnectionOptionName::Validate => false,
1011        }
1012    }
1013}
1014
1015#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1016/// An option in a `CREATE CONNECTION...` statement.
1017pub struct CreateConnectionOption<T: AstInfo> {
1018    pub name: CreateConnectionOptionName,
1019    pub value: Option<WithOptionValue<T>>,
1020}
1021impl_display_for_with_option!(CreateConnectionOption);
1022impl_display_t!(CreateConnectionOption);
1023
1024#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1025pub enum KafkaSourceConfigOptionName {
1026    GroupIdPrefix,
1027    Topic,
1028    TopicMetadataRefreshInterval,
1029    StartTimestamp,
1030    StartOffset,
1031}
1032
1033impl AstDisplay for KafkaSourceConfigOptionName {
1034    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1035        f.write_str(match self {
1036            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1037            KafkaSourceConfigOptionName::Topic => "TOPIC",
1038            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1039                "TOPIC METADATA REFRESH INTERVAL"
1040            }
1041            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1042            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1043        })
1044    }
1045}
1046impl_display!(KafkaSourceConfigOptionName);
1047
1048impl WithOptionName for KafkaSourceConfigOptionName {
1049    /// # WARNING
1050    ///
1051    /// Whenever implementing this trait consider very carefully whether or not
1052    /// this value could contain sensitive user data. If you're uncertain, err
1053    /// on the conservative side and return `true`.
1054    fn redact_value(&self) -> bool {
1055        match self {
1056            KafkaSourceConfigOptionName::GroupIdPrefix
1057            | KafkaSourceConfigOptionName::Topic
1058            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1059            | KafkaSourceConfigOptionName::StartOffset
1060            | KafkaSourceConfigOptionName::StartTimestamp => false,
1061        }
1062    }
1063}
1064
1065#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1066pub struct KafkaSourceConfigOption<T: AstInfo> {
1067    pub name: KafkaSourceConfigOptionName,
1068    pub value: Option<WithOptionValue<T>>,
1069}
1070impl_display_for_with_option!(KafkaSourceConfigOption);
1071impl_display_t!(KafkaSourceConfigOption);
1072
1073#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1074pub enum KafkaSinkConfigOptionName {
1075    CompressionType,
1076    PartitionBy,
1077    ProgressGroupIdPrefix,
1078    Topic,
1079    TransactionalIdPrefix,
1080    LegacyIds,
1081    TopicConfig,
1082    TopicMetadataRefreshInterval,
1083    TopicPartitionCount,
1084    TopicReplicationFactor,
1085}
1086
1087impl AstDisplay for KafkaSinkConfigOptionName {
1088    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1089        f.write_str(match self {
1090            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1091            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1092            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1093            KafkaSinkConfigOptionName::Topic => "TOPIC",
1094            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1095            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1096            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1097            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1098                "TOPIC METADATA REFRESH INTERVAL"
1099            }
1100            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1101            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1102        })
1103    }
1104}
1105impl_display!(KafkaSinkConfigOptionName);
1106
1107impl WithOptionName for KafkaSinkConfigOptionName {
1108    /// # WARNING
1109    ///
1110    /// Whenever implementing this trait consider very carefully whether or not
1111    /// this value could contain sensitive user data. If you're uncertain, err
1112    /// on the conservative side and return `true`.
1113    fn redact_value(&self) -> bool {
1114        match self {
1115            KafkaSinkConfigOptionName::CompressionType
1116            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1117            | KafkaSinkConfigOptionName::Topic
1118            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1119            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1120            | KafkaSinkConfigOptionName::LegacyIds
1121            | KafkaSinkConfigOptionName::TopicConfig
1122            | KafkaSinkConfigOptionName::TopicPartitionCount
1123            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1124            KafkaSinkConfigOptionName::PartitionBy => true,
1125        }
1126    }
1127}
1128
1129#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1130pub struct KafkaSinkConfigOption<T: AstInfo> {
1131    pub name: KafkaSinkConfigOptionName,
1132    pub value: Option<WithOptionValue<T>>,
1133}
1134impl_display_for_with_option!(KafkaSinkConfigOption);
1135impl_display_t!(KafkaSinkConfigOption);
1136
1137#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1138pub enum IcebergSinkConfigOptionName {
1139    Namespace,
1140    Table,
1141}
1142
1143impl AstDisplay for IcebergSinkConfigOptionName {
1144    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1145        f.write_str(match self {
1146            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1147            IcebergSinkConfigOptionName::Table => "TABLE",
1148        })
1149    }
1150}
1151impl_display!(IcebergSinkConfigOptionName);
1152
1153impl WithOptionName for IcebergSinkConfigOptionName {
1154    /// # WARNING
1155    ///
1156    /// Whenever implementing this trait consider very carefully whether or not
1157    /// this value could contain sensitive user data. If you're uncertain, err
1158    /// on the conservative side and return `true`.
1159    fn redact_value(&self) -> bool {
1160        match self {
1161            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1162        }
1163    }
1164}
1165
1166#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1167pub struct IcebergSinkConfigOption<T: AstInfo> {
1168    pub name: IcebergSinkConfigOptionName,
1169    pub value: Option<WithOptionValue<T>>,
1170}
1171impl_display_for_with_option!(IcebergSinkConfigOption);
1172impl_display_t!(IcebergSinkConfigOption);
1173
1174#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1175pub enum PgConfigOptionName {
1176    /// Hex encoded string of binary serialization of
1177    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`
1178    Details,
1179    /// The name of the publication to sync
1180    Publication,
1181    /// Columns whose types you want to unconditionally format as text
1182    /// NOTE(roshan): This value is kept around to allow round-tripping a
1183    /// `CREATE SOURCE` statement while we still allow creating implicit
1184    /// subsources from `CREATE SOURCE`, but will be removed once
1185    /// fully deprecating that feature and forcing users to use explicit
1186    /// `CREATE TABLE .. FROM SOURCE` statements
1187    TextColumns,
1188    /// Columns you want to exclude
1189    /// NOTE: This value is kept around to allow round-tripping a
1190    /// `CREATE SOURCE` statement while we still allow creating implicit
1191    /// subsources from `CREATE SOURCE`, but will be removed once
1192    /// fully deprecating that feature and forcing users to use explicit
1193    /// `CREATE TABLE .. FROM SOURCE` statements
1194    ExcludeColumns,
1195}
1196
1197impl AstDisplay for PgConfigOptionName {
1198    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1199        f.write_str(match self {
1200            PgConfigOptionName::Details => "DETAILS",
1201            PgConfigOptionName::Publication => "PUBLICATION",
1202            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1203            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1204        })
1205    }
1206}
1207impl_display!(PgConfigOptionName);
1208
1209impl WithOptionName for PgConfigOptionName {
1210    /// # WARNING
1211    ///
1212    /// Whenever implementing this trait consider very carefully whether or not
1213    /// this value could contain sensitive user data. If you're uncertain, err
1214    /// on the conservative side and return `true`.
1215    fn redact_value(&self) -> bool {
1216        match self {
1217            PgConfigOptionName::Details
1218            | PgConfigOptionName::Publication
1219            | PgConfigOptionName::TextColumns
1220            | PgConfigOptionName::ExcludeColumns => false,
1221        }
1222    }
1223}
1224
1225#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1226/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1227pub struct PgConfigOption<T: AstInfo> {
1228    pub name: PgConfigOptionName,
1229    pub value: Option<WithOptionValue<T>>,
1230}
1231impl_display_for_with_option!(PgConfigOption);
1232impl_display_t!(PgConfigOption);
1233
1234#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1235pub enum MySqlConfigOptionName {
1236    /// Hex encoded string of binary serialization of
1237    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`
1238    Details,
1239    /// Columns whose types you want to unconditionally format as text
1240    /// NOTE(roshan): This value is kept around to allow round-tripping a
1241    /// `CREATE SOURCE` statement while we still allow creating implicit
1242    /// subsources from `CREATE SOURCE`, but will be removed once
1243    /// fully deprecating that feature and forcing users to use explicit
1244    /// `CREATE TABLE .. FROM SOURCE` statements
1245    TextColumns,
1246    /// Columns you want to exclude
1247    /// NOTE(roshan): This value is kept around to allow round-tripping a
1248    /// `CREATE SOURCE` statement while we still allow creating implicit
1249    /// subsources from `CREATE SOURCE`, but will be removed once
1250    /// fully deprecating that feature and forcing users to use explicit
1251    /// `CREATE TABLE .. FROM SOURCE` statements
1252    ExcludeColumns,
1253}
1254
1255impl AstDisplay for MySqlConfigOptionName {
1256    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1257        f.write_str(match self {
1258            MySqlConfigOptionName::Details => "DETAILS",
1259            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1260            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1261        })
1262    }
1263}
1264impl_display!(MySqlConfigOptionName);
1265
1266impl WithOptionName for MySqlConfigOptionName {
1267    /// # WARNING
1268    ///
1269    /// Whenever implementing this trait consider very carefully whether or not
1270    /// this value could contain sensitive user data. If you're uncertain, err
1271    /// on the conservative side and return `true`.
1272    fn redact_value(&self) -> bool {
1273        match self {
1274            MySqlConfigOptionName::Details
1275            | MySqlConfigOptionName::TextColumns
1276            | MySqlConfigOptionName::ExcludeColumns => false,
1277        }
1278    }
1279}
1280
1281#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1282/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1283pub struct MySqlConfigOption<T: AstInfo> {
1284    pub name: MySqlConfigOptionName,
1285    pub value: Option<WithOptionValue<T>>,
1286}
1287impl_display_for_with_option!(MySqlConfigOption);
1288impl_display_t!(MySqlConfigOption);
1289
1290#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1291pub enum SqlServerConfigOptionName {
1292    /// Hex encoded string of binary serialization of
1293    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
1294    Details,
1295    /// Columns whose types you want to unconditionally format as text.
1296    ///
1297    /// NOTE(roshan): This value is kept around to allow round-tripping a
1298    /// `CREATE SOURCE` statement while we still allow creating implicit
1299    /// subsources from `CREATE SOURCE`, but will be removed once
1300    /// fully deprecating that feature and forcing users to use explicit
1301    /// `CREATE TABLE .. FROM SOURCE` statements
1302    TextColumns,
1303    /// Columns you want to exclude.
1304    ///
1305    /// NOTE(roshan): This value is kept around to allow round-tripping a
1306    /// `CREATE SOURCE` statement while we still allow creating implicit
1307    /// subsources from `CREATE SOURCE`, but will be removed once
1308    /// fully deprecating that feature and forcing users to use explicit
1309    /// `CREATE TABLE .. FROM SOURCE` statements
1310    ExcludeColumns,
1311}
1312
1313impl AstDisplay for SqlServerConfigOptionName {
1314    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1315        f.write_str(match self {
1316            SqlServerConfigOptionName::Details => "DETAILS",
1317            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1318            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1319        })
1320    }
1321}
1322impl_display!(SqlServerConfigOptionName);
1323
1324impl WithOptionName for SqlServerConfigOptionName {
1325    /// # WARNING
1326    ///
1327    /// Whenever implementing this trait consider very carefully whether or not
1328    /// this value could contain sensitive user data. If you're uncertain, err
1329    /// on the conservative side and return `true`.
1330    fn redact_value(&self) -> bool {
1331        match self {
1332            SqlServerConfigOptionName::Details
1333            | SqlServerConfigOptionName::TextColumns
1334            | SqlServerConfigOptionName::ExcludeColumns => false,
1335        }
1336    }
1337}
1338
1339#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1340/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1341pub struct SqlServerConfigOption<T: AstInfo> {
1342    pub name: SqlServerConfigOptionName,
1343    pub value: Option<WithOptionValue<T>>,
1344}
1345impl_display_for_with_option!(SqlServerConfigOption);
1346impl_display_t!(SqlServerConfigOption);
1347
1348#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1349pub enum CreateSourceConnection<T: AstInfo> {
1350    Kafka {
1351        connection: T::ItemName,
1352        options: Vec<KafkaSourceConfigOption<T>>,
1353    },
1354    Postgres {
1355        connection: T::ItemName,
1356        options: Vec<PgConfigOption<T>>,
1357    },
1358    SqlServer {
1359        connection: T::ItemName,
1360        options: Vec<SqlServerConfigOption<T>>,
1361    },
1362    MySql {
1363        connection: T::ItemName,
1364        options: Vec<MySqlConfigOption<T>>,
1365    },
1366    LoadGenerator {
1367        generator: LoadGenerator,
1368        options: Vec<LoadGeneratorOption<T>>,
1369    },
1370}
1371
1372impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1374        match self {
1375            CreateSourceConnection::Kafka {
1376                connection,
1377                options,
1378            } => {
1379                f.write_str("KAFKA CONNECTION ");
1380                f.write_node(connection);
1381                if !options.is_empty() {
1382                    f.write_str(" (");
1383                    f.write_node(&display::comma_separated(options));
1384                    f.write_str(")");
1385                }
1386            }
1387            CreateSourceConnection::Postgres {
1388                connection,
1389                options,
1390            } => {
1391                f.write_str("POSTGRES CONNECTION ");
1392                f.write_node(connection);
1393                if !options.is_empty() {
1394                    f.write_str(" (");
1395                    f.write_node(&display::comma_separated(options));
1396                    f.write_str(")");
1397                }
1398            }
1399            CreateSourceConnection::SqlServer {
1400                connection,
1401                options,
1402            } => {
1403                f.write_str("SQL SERVER CONNECTION ");
1404                f.write_node(connection);
1405                if !options.is_empty() {
1406                    f.write_str(" (");
1407                    f.write_node(&display::comma_separated(options));
1408                    f.write_str(")");
1409                }
1410            }
1411            CreateSourceConnection::MySql {
1412                connection,
1413                options,
1414            } => {
1415                f.write_str("MYSQL CONNECTION ");
1416                f.write_node(connection);
1417                if !options.is_empty() {
1418                    f.write_str(" (");
1419                    f.write_node(&display::comma_separated(options));
1420                    f.write_str(")");
1421                }
1422            }
1423            CreateSourceConnection::LoadGenerator { generator, options } => {
1424                f.write_str("LOAD GENERATOR ");
1425                f.write_node(generator);
1426                if !options.is_empty() {
1427                    f.write_str(" (");
1428                    f.write_node(&display::comma_separated(options));
1429                    f.write_str(")");
1430                }
1431            }
1432        }
1433    }
1434}
1435impl_display_t!(CreateSourceConnection);
1436
1437#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1438pub enum LoadGenerator {
1439    Clock,
1440    Counter,
1441    Marketing,
1442    Auction,
1443    Datums,
1444    Tpch,
1445    KeyValue,
1446}
1447
1448impl AstDisplay for LoadGenerator {
1449    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1450        match self {
1451            Self::Counter => f.write_str("COUNTER"),
1452            Self::Clock => f.write_str("CLOCK"),
1453            Self::Marketing => f.write_str("MARKETING"),
1454            Self::Auction => f.write_str("AUCTION"),
1455            Self::Datums => f.write_str("DATUMS"),
1456            Self::Tpch => f.write_str("TPCH"),
1457            Self::KeyValue => f.write_str("KEY VALUE"),
1458        }
1459    }
1460}
1461impl_display!(LoadGenerator);
1462
1463impl LoadGenerator {
1464    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1465    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1466    /// cases where we only have the AST representation. This can be removed once
1467    /// the `ast_rewrite_sources_to_tables` migration is removed.
1468    pub fn schema_name(&self) -> &'static str {
1469        match self {
1470            LoadGenerator::Counter => "counter",
1471            LoadGenerator::Clock => "clock",
1472            LoadGenerator::Marketing => "marketing",
1473            LoadGenerator::Auction => "auction",
1474            LoadGenerator::Datums => "datums",
1475            LoadGenerator::Tpch => "tpch",
1476            LoadGenerator::KeyValue => "key_value",
1477        }
1478    }
1479}
1480
1481#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1482pub enum LoadGeneratorOptionName {
1483    ScaleFactor,
1484    TickInterval,
1485    AsOf,
1486    UpTo,
1487    MaxCardinality,
1488    Keys,
1489    SnapshotRounds,
1490    TransactionalSnapshot,
1491    ValueSize,
1492    Seed,
1493    Partitions,
1494    BatchSize,
1495}
1496
1497impl AstDisplay for LoadGeneratorOptionName {
1498    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1499        f.write_str(match self {
1500            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1501            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1502            LoadGeneratorOptionName::AsOf => "AS OF",
1503            LoadGeneratorOptionName::UpTo => "UP TO",
1504            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1505            LoadGeneratorOptionName::Keys => "KEYS",
1506            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1507            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1508            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1509            LoadGeneratorOptionName::Seed => "SEED",
1510            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1511            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1512        })
1513    }
1514}
1515impl_display!(LoadGeneratorOptionName);
1516
1517impl WithOptionName for LoadGeneratorOptionName {
1518    /// # WARNING
1519    ///
1520    /// Whenever implementing this trait consider very carefully whether or not
1521    /// this value could contain sensitive user data. If you're uncertain, err
1522    /// on the conservative side and return `true`.
1523    fn redact_value(&self) -> bool {
1524        match self {
1525            LoadGeneratorOptionName::ScaleFactor
1526            | LoadGeneratorOptionName::TickInterval
1527            | LoadGeneratorOptionName::AsOf
1528            | LoadGeneratorOptionName::UpTo
1529            | LoadGeneratorOptionName::MaxCardinality
1530            | LoadGeneratorOptionName::Keys
1531            | LoadGeneratorOptionName::SnapshotRounds
1532            | LoadGeneratorOptionName::TransactionalSnapshot
1533            | LoadGeneratorOptionName::ValueSize
1534            | LoadGeneratorOptionName::Partitions
1535            | LoadGeneratorOptionName::BatchSize
1536            | LoadGeneratorOptionName::Seed => false,
1537        }
1538    }
1539}
1540
1541#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1542/// An option in a `CREATE CONNECTION...SSH`.
1543pub struct LoadGeneratorOption<T: AstInfo> {
1544    pub name: LoadGeneratorOptionName,
1545    pub value: Option<WithOptionValue<T>>,
1546}
1547impl_display_for_with_option!(LoadGeneratorOption);
1548impl_display_t!(LoadGeneratorOption);
1549
1550#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1551pub enum CreateSinkConnection<T: AstInfo> {
1552    Kafka {
1553        connection: T::ItemName,
1554        options: Vec<KafkaSinkConfigOption<T>>,
1555        key: Option<SinkKey>,
1556        headers: Option<Ident>,
1557    },
1558    Iceberg {
1559        connection: T::ItemName,
1560        aws_connection: T::ItemName,
1561        key: Option<SinkKey>,
1562        options: Vec<IcebergSinkConfigOption<T>>,
1563    },
1564}
1565
1566impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1567    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1568        match self {
1569            CreateSinkConnection::Kafka {
1570                connection,
1571                options,
1572                key,
1573                headers,
1574            } => {
1575                f.write_str("KAFKA CONNECTION ");
1576                f.write_node(connection);
1577                if !options.is_empty() {
1578                    f.write_str(" (");
1579                    f.write_node(&display::comma_separated(options));
1580                    f.write_str(")");
1581                }
1582                if let Some(key) = key.as_ref() {
1583                    f.write_str(" ");
1584                    f.write_node(key);
1585                }
1586                if let Some(headers) = headers {
1587                    f.write_str(" HEADERS ");
1588                    f.write_node(headers);
1589                }
1590            }
1591            CreateSinkConnection::Iceberg {
1592                connection,
1593                aws_connection,
1594                key,
1595                options,
1596            } => {
1597                f.write_str("ICEBERG CATALOG CONNECTION ");
1598                f.write_node(connection);
1599                if !options.is_empty() {
1600                    f.write_str(" (");
1601                    f.write_node(&display::comma_separated(options));
1602                    f.write_str(")");
1603                }
1604                f.write_str(" USING AWS CONNECTION ");
1605                f.write_node(aws_connection);
1606                if let Some(key) = key.as_ref() {
1607                    f.write_str(" ");
1608                    f.write_node(key);
1609                }
1610            }
1611        }
1612    }
1613}
1614impl_display_t!(CreateSinkConnection);
1615
1616#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1617pub struct SinkKey {
1618    pub key_columns: Vec<Ident>,
1619    pub not_enforced: bool,
1620}
1621
1622impl AstDisplay for SinkKey {
1623    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1624        f.write_str("KEY (");
1625        f.write_node(&display::comma_separated(&self.key_columns));
1626        f.write_str(")");
1627        if self.not_enforced {
1628            f.write_str(" NOT ENFORCED");
1629        }
1630    }
1631}
1632
1633/// A table-level constraint, specified in a `CREATE TABLE` or an
1634/// `ALTER TABLE ADD <constraint>` statement.
1635#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1636pub enum TableConstraint<T: AstInfo> {
1637    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1638    Unique {
1639        name: Option<Ident>,
1640        columns: Vec<Ident>,
1641        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1642        is_primary: bool,
1643        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1644        // constraints.
1645        nulls_not_distinct: bool,
1646    },
1647    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1648    /// REFERENCES <foreign_table> (<referred_columns>)`)
1649    ForeignKey {
1650        name: Option<Ident>,
1651        columns: Vec<Ident>,
1652        foreign_table: T::ItemName,
1653        referred_columns: Vec<Ident>,
1654    },
1655    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1656    Check {
1657        name: Option<Ident>,
1658        expr: Box<Expr<T>>,
1659    },
1660}
1661
1662impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1663    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1664        match self {
1665            TableConstraint::Unique {
1666                name,
1667                columns,
1668                is_primary,
1669                nulls_not_distinct,
1670            } => {
1671                f.write_node(&display_constraint_name(name));
1672                if *is_primary {
1673                    f.write_str("PRIMARY KEY ");
1674                } else {
1675                    f.write_str("UNIQUE ");
1676                    if *nulls_not_distinct {
1677                        f.write_str("NULLS NOT DISTINCT ");
1678                    }
1679                }
1680                f.write_str("(");
1681                f.write_node(&display::comma_separated(columns));
1682                f.write_str(")");
1683            }
1684            TableConstraint::ForeignKey {
1685                name,
1686                columns,
1687                foreign_table,
1688                referred_columns,
1689            } => {
1690                f.write_node(&display_constraint_name(name));
1691                f.write_str("FOREIGN KEY (");
1692                f.write_node(&display::comma_separated(columns));
1693                f.write_str(") REFERENCES ");
1694                f.write_node(foreign_table);
1695                f.write_str("(");
1696                f.write_node(&display::comma_separated(referred_columns));
1697                f.write_str(")");
1698            }
1699            TableConstraint::Check { name, expr } => {
1700                f.write_node(&display_constraint_name(name));
1701                f.write_str("CHECK (");
1702                f.write_node(&expr);
1703                f.write_str(")");
1704            }
1705        }
1706    }
1707}
1708impl_display_t!(TableConstraint);
1709
1710/// A key constraint, specified in a `CREATE SOURCE`.
1711#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1712pub enum KeyConstraint {
1713    // PRIMARY KEY (<columns>) NOT ENFORCED
1714    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1715}
1716
1717impl AstDisplay for KeyConstraint {
1718    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1719        match self {
1720            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1721                f.write_str("PRIMARY KEY ");
1722                f.write_str("(");
1723                f.write_node(&display::comma_separated(columns));
1724                f.write_str(") ");
1725                f.write_str("NOT ENFORCED");
1726            }
1727        }
1728    }
1729}
1730impl_display!(KeyConstraint);
1731
1732#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1733pub enum CreateSourceOptionName {
1734    TimestampInterval,
1735    RetainHistory,
1736}
1737
1738impl AstDisplay for CreateSourceOptionName {
1739    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1740        f.write_str(match self {
1741            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1742            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1743        })
1744    }
1745}
1746impl_display!(CreateSourceOptionName);
1747
1748impl WithOptionName for CreateSourceOptionName {
1749    /// # WARNING
1750    ///
1751    /// Whenever implementing this trait consider very carefully whether or not
1752    /// this value could contain sensitive user data. If you're uncertain, err
1753    /// on the conservative side and return `true`.
1754    fn redact_value(&self) -> bool {
1755        match self {
1756            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1757                false
1758            }
1759        }
1760    }
1761}
1762
1763#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1764/// An option in a `CREATE SOURCE...` statement.
1765pub struct CreateSourceOption<T: AstInfo> {
1766    pub name: CreateSourceOptionName,
1767    pub value: Option<WithOptionValue<T>>,
1768}
1769impl_display_for_with_option!(CreateSourceOption);
1770impl_display_t!(CreateSourceOption);
1771
1772/// SQL column definition
1773#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1774pub struct ColumnDef<T: AstInfo> {
1775    pub name: Ident,
1776    pub data_type: T::DataType,
1777    pub collation: Option<UnresolvedItemName>,
1778    pub options: Vec<ColumnOptionDef<T>>,
1779}
1780
1781impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1782    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1783        f.write_node(&self.name);
1784        f.write_str(" ");
1785        f.write_node(&self.data_type);
1786        if let Some(collation) = &self.collation {
1787            f.write_str(" COLLATE ");
1788            f.write_node(collation);
1789        }
1790        for option in &self.options {
1791            f.write_str(" ");
1792            f.write_node(option);
1793        }
1794    }
1795}
1796impl_display_t!(ColumnDef);
1797
1798/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1799///
1800/// Note that implementations are substantially more permissive than the ANSI
1801/// specification on what order column options can be presented in, and whether
1802/// they are allowed to be named. The specification distinguishes between
1803/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1804/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1805/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1806/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1807/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1808/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1809/// NOT NULL constraints (the last of which is in violation of the spec).
1810///
1811/// For maximum flexibility, we don't distinguish between constraint and
1812/// non-constraint options, lumping them all together under the umbrella of
1813/// "column options," and we allow any column option to be named.
1814#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1815pub struct ColumnOptionDef<T: AstInfo> {
1816    pub name: Option<Ident>,
1817    pub option: ColumnOption<T>,
1818}
1819
1820impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1821    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1822        f.write_node(&display_constraint_name(&self.name));
1823        f.write_node(&self.option);
1824    }
1825}
1826impl_display_t!(ColumnOptionDef);
1827
1828/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1829/// TABLE` statement.
1830#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1831pub enum ColumnOption<T: AstInfo> {
1832    /// `NULL`
1833    Null,
1834    /// `NOT NULL`
1835    NotNull,
1836    /// `DEFAULT <restricted-expr>`
1837    Default(Expr<T>),
1838    /// `{ PRIMARY KEY | UNIQUE }`
1839    Unique { is_primary: bool },
1840    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1841    /// <foreign_table> (<referred_columns>)`).
1842    ForeignKey {
1843        foreign_table: UnresolvedItemName,
1844        referred_columns: Vec<Ident>,
1845    },
1846    /// `CHECK (<expr>)`
1847    Check(Expr<T>),
1848    /// `VERSION <action> <version>`
1849    Versioned {
1850        action: ColumnVersioned,
1851        version: Version,
1852    },
1853}
1854
1855impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1856    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1857        use ColumnOption::*;
1858        match self {
1859            Null => f.write_str("NULL"),
1860            NotNull => f.write_str("NOT NULL"),
1861            Default(expr) => {
1862                f.write_str("DEFAULT ");
1863                f.write_node(expr);
1864            }
1865            Unique { is_primary } => {
1866                if *is_primary {
1867                    f.write_str("PRIMARY KEY");
1868                } else {
1869                    f.write_str("UNIQUE");
1870                }
1871            }
1872            ForeignKey {
1873                foreign_table,
1874                referred_columns,
1875            } => {
1876                f.write_str("REFERENCES ");
1877                f.write_node(foreign_table);
1878                f.write_str(" (");
1879                f.write_node(&display::comma_separated(referred_columns));
1880                f.write_str(")");
1881            }
1882            Check(expr) => {
1883                f.write_str("CHECK (");
1884                f.write_node(expr);
1885                f.write_str(")");
1886            }
1887            Versioned { action, version } => {
1888                f.write_str("VERSION ");
1889                f.write_node(action);
1890                f.write_str(" ");
1891                f.write_node(version);
1892            }
1893        }
1894    }
1895}
1896impl_display_t!(ColumnOption);
1897
1898#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1899pub enum ColumnVersioned {
1900    Added,
1901}
1902
1903impl AstDisplay for ColumnVersioned {
1904    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1905    where
1906        W: fmt::Write,
1907    {
1908        match self {
1909            // TODO(alter_table): Support dropped columns.
1910            ColumnVersioned::Added => f.write_str("ADDED"),
1911        }
1912    }
1913}
1914impl_display!(ColumnVersioned);
1915
1916fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1917    struct ConstraintName<'a>(&'a Option<Ident>);
1918    impl<'a> AstDisplay for ConstraintName<'a> {
1919        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1920        where
1921            W: fmt::Write,
1922        {
1923            if let Some(name) = self.0 {
1924                f.write_str("CONSTRAINT ");
1925                f.write_node(name);
1926                f.write_str(" ");
1927            }
1928        }
1929    }
1930    ConstraintName(name)
1931}