Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33    /// The `ASSERT NOT NULL [=] <ident>` option.
34    AssertNotNull,
35    PartitionBy,
36    RetainHistory,
37    /// The `REFRESH [=] ...` option.
38    Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70    pub name: MaterializedViewOptionName,
71    pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Schema {
77    pub schema: String,
78}
79
80impl AstDisplay for Schema {
81    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82        f.write_str("SCHEMA '");
83        f.write_node(&display::escape_single_quote_string(&self.schema));
84        f.write_str("'");
85    }
86}
87impl_display!(Schema);
88
89#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90pub enum AvroSchemaOptionName {
91    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
92    ConfluentWireFormat,
93}
94
95impl AstDisplay for AvroSchemaOptionName {
96    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97        match self {
98            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99        }
100    }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104    /// # WARNING
105    ///
106    /// Whenever implementing this trait consider very carefully whether or not
107    /// this value could contain sensitive user data. If you're uncertain, err
108    /// on the conservative side and return `true`.
109    fn redact_value(&self) -> bool {
110        match self {
111            Self::ConfluentWireFormat => false,
112        }
113    }
114}
115
116#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub struct AvroSchemaOption<T: AstInfo> {
118    pub name: AvroSchemaOptionName,
119    pub value: Option<WithOptionValue<T>>,
120}
121impl_display_for_with_option!(AvroSchemaOption);
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub enum AvroSchema<T: AstInfo> {
125    Csr {
126        csr_connection: CsrConnectionAvro<T>,
127    },
128    InlineSchema {
129        schema: Schema,
130        with_options: Vec<AvroSchemaOption<T>>,
131    },
132}
133
134impl<T: AstInfo> AstDisplay for AvroSchema<T> {
135    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
136        match self {
137            Self::Csr { csr_connection } => {
138                f.write_node(csr_connection);
139            }
140            Self::InlineSchema {
141                schema,
142                with_options,
143            } => {
144                f.write_str("USING ");
145                schema.fmt(f);
146                if !with_options.is_empty() {
147                    f.write_str(" (");
148                    f.write_node(&display::comma_separated(with_options));
149                    f.write_str(")");
150                }
151            }
152        }
153    }
154}
155impl_display_t!(AvroSchema);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum ProtobufSchema<T: AstInfo> {
159    Csr {
160        csr_connection: CsrConnectionProtobuf<T>,
161    },
162    InlineSchema {
163        message_name: String,
164        schema: Schema,
165    },
166}
167
168impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                message_name,
176                schema,
177            } => {
178                f.write_str("MESSAGE '");
179                f.write_node(&display::escape_single_quote_string(message_name));
180                f.write_str("' USING ");
181                f.write_str(schema);
182            }
183        }
184    }
185}
186impl_display_t!(ProtobufSchema);
187
188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
189pub enum CsrConfigOptionName<T: AstInfo> {
190    AvroKeyFullname,
191    AvroValueFullname,
192    NullDefaults,
193    AvroDocOn(AvroDocOn<T>),
194    KeyCompatibilityLevel,
195    ValueCompatibilityLevel,
196}
197
198impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
199    /// # WARNING
200    ///
201    /// Whenever implementing this trait consider very carefully whether or not
202    /// this value could contain sensitive user data. If you're uncertain, err
203    /// on the conservative side and return `true`.
204    fn redact_value(&self) -> bool {
205        match self {
206            Self::AvroKeyFullname
207            | Self::AvroValueFullname
208            | Self::NullDefaults
209            | Self::AvroDocOn(_)
210            | Self::KeyCompatibilityLevel
211            | Self::ValueCompatibilityLevel => false,
212        }
213    }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
217pub struct AvroDocOn<T: AstInfo> {
218    pub identifier: DocOnIdentifier<T>,
219    pub for_schema: DocOnSchema,
220}
221#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
222pub enum DocOnSchema {
223    KeyOnly,
224    ValueOnly,
225    All,
226}
227
228#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
229pub enum DocOnIdentifier<T: AstInfo> {
230    Column(ColumnName<T>),
231    Type(T::ItemName),
232}
233
234impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
235    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
236        match &self.for_schema {
237            DocOnSchema::KeyOnly => f.write_str("KEY "),
238            DocOnSchema::ValueOnly => f.write_str("VALUE "),
239            DocOnSchema::All => {}
240        }
241        match &self.identifier {
242            DocOnIdentifier::Column(name) => {
243                f.write_str("DOC ON COLUMN ");
244                f.write_node(name);
245            }
246            DocOnIdentifier::Type(name) => {
247                f.write_str("DOC ON TYPE ");
248                f.write_node(name);
249            }
250        }
251    }
252}
253impl_display_t!(AvroDocOn);
254
255impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
256    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
257        match self {
258            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
259            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
260            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
261            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
262            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
263            CsrConfigOptionName::ValueCompatibilityLevel => {
264                f.write_str("VALUE COMPATIBILITY LEVEL")
265            }
266        }
267    }
268}
269impl_display_t!(CsrConfigOptionName);
270
271#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
272/// An option in a `{FROM|INTO} CONNECTION ...` statement.
273pub struct CsrConfigOption<T: AstInfo> {
274    pub name: CsrConfigOptionName<T>,
275    pub value: Option<WithOptionValue<T>>,
276}
277impl_display_for_with_option!(CsrConfigOption);
278impl_display_t!(CsrConfigOption);
279
280#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct CsrConnection<T: AstInfo> {
282    pub connection: T::ItemName,
283    pub options: Vec<CsrConfigOption<T>>,
284}
285
286impl<T: AstInfo> AstDisplay for CsrConnection<T> {
287    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
288        f.write_str("CONNECTION ");
289        f.write_node(&self.connection);
290        if !self.options.is_empty() {
291            f.write_str(" (");
292            f.write_node(&display::comma_separated(&self.options));
293            f.write_str(")");
294        }
295    }
296}
297impl_display_t!(CsrConnection);
298
299#[derive(Debug, Clone, PartialEq, Eq, Hash)]
300pub enum ReaderSchemaSelectionStrategy {
301    Latest,
302    Inline(String),
303    ById(i32),
304}
305
306impl Default for ReaderSchemaSelectionStrategy {
307    fn default() -> Self {
308        Self::Latest
309    }
310}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub struct CsrConnectionAvro<T: AstInfo> {
314    pub connection: CsrConnection<T>,
315    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
316    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
317    pub seed: Option<CsrSeedAvro>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
323        f.write_node(&self.connection);
324        if let Some(seed) = &self.seed {
325            f.write_str(" ");
326            f.write_node(seed);
327        }
328    }
329}
330impl_display_t!(CsrConnectionAvro);
331
332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
333pub struct CsrConnectionProtobuf<T: AstInfo> {
334    pub connection: CsrConnection<T>,
335    pub seed: Option<CsrSeedProtobuf>,
336}
337
338impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
339    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
340        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
341        f.write_node(&self.connection);
342
343        if let Some(seed) = &self.seed {
344            f.write_str(" ");
345            f.write_node(seed);
346        }
347    }
348}
349impl_display_t!(CsrConnectionProtobuf);
350
351#[derive(Debug, Clone, PartialEq, Eq, Hash)]
352pub struct CsrSeedAvro {
353    pub key_schema: Option<String>,
354    pub value_schema: String,
355    /// Reference schemas for the key schema, in dependency order.
356    /// Populated during purification by fetching from the schema registry.
357    pub key_reference_schemas: Vec<String>,
358    /// Reference schemas for the value schema, in dependency order.
359    /// Populated during purification by fetching from the schema registry.
360    pub value_reference_schemas: Vec<String>,
361}
362
363impl AstDisplay for CsrSeedAvro {
364    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
365        f.write_str("SEED");
366        if let Some(key_schema) = &self.key_schema {
367            f.write_str(" KEY SCHEMA '");
368            f.write_node(&display::escape_single_quote_string(key_schema));
369            f.write_str("'");
370            if !self.key_reference_schemas.is_empty() {
371                f.write_str(" KEY REFERENCES (");
372                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
373                    if i > 0 {
374                        f.write_str(", ");
375                    }
376                    f.write_str("'");
377                    f.write_node(&display::escape_single_quote_string(schema));
378                    f.write_str("'");
379                }
380                f.write_str(")");
381            }
382        }
383        f.write_str(" VALUE SCHEMA '");
384        f.write_node(&display::escape_single_quote_string(&self.value_schema));
385        f.write_str("'");
386        if !self.value_reference_schemas.is_empty() {
387            f.write_str(" VALUE REFERENCES (");
388            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
389                if i > 0 {
390                    f.write_str(", ");
391                }
392                f.write_str("'");
393                f.write_node(&display::escape_single_quote_string(schema));
394                f.write_str("'");
395            }
396            f.write_str(")");
397        }
398    }
399}
400impl_display!(CsrSeedAvro);
401
402#[derive(Debug, Clone, PartialEq, Eq, Hash)]
403pub struct CsrSeedProtobuf {
404    pub key: Option<CsrSeedProtobufSchema>,
405    pub value: CsrSeedProtobufSchema,
406}
407
408impl AstDisplay for CsrSeedProtobuf {
409    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410        f.write_str("SEED");
411        if let Some(key) = &self.key {
412            f.write_str(" KEY ");
413            f.write_node(key);
414        }
415        f.write_str(" VALUE ");
416        f.write_node(&self.value);
417    }
418}
419impl_display!(CsrSeedProtobuf);
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422pub struct CsrSeedProtobufSchema {
423    // Hex encoded string.
424    pub schema: String,
425    pub message_name: String,
426}
427impl AstDisplay for CsrSeedProtobufSchema {
428    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
429        f.write_str("SCHEMA '");
430        f.write_str(&display::escape_single_quote_string(&self.schema));
431        f.write_str("' MESSAGE '");
432        f.write_str(&self.message_name);
433        f.write_str("'");
434    }
435}
436impl_display!(CsrSeedProtobufSchema);
437
438#[derive(Debug, Clone, PartialEq, Eq, Hash)]
439pub enum FormatSpecifier<T: AstInfo> {
440    /// `CREATE SOURCE/SINK .. FORMAT`
441    Bare(Format<T>),
442    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
443    KeyValue { key: Format<T>, value: Format<T> },
444}
445
446impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
447    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
448        match self {
449            FormatSpecifier::Bare(format) => {
450                f.write_str("FORMAT ");
451                f.write_node(format)
452            }
453            FormatSpecifier::KeyValue { key, value } => {
454                f.write_str("KEY FORMAT ");
455                f.write_node(key);
456                f.write_str(" VALUE FORMAT ");
457                f.write_node(value);
458            }
459        }
460    }
461}
462impl_display_t!(FormatSpecifier);
463
464#[derive(Debug, Clone, PartialEq, Eq, Hash)]
465pub enum Format<T: AstInfo> {
466    Bytes,
467    Avro(AvroSchema<T>),
468    Protobuf(ProtobufSchema<T>),
469    Regex(String),
470    Csv {
471        columns: CsvColumns,
472        delimiter: char,
473    },
474    Json {
475        array: bool,
476    },
477    Text,
478}
479
480#[derive(Debug, Clone, PartialEq, Eq, Hash)]
481pub enum CsvColumns {
482    /// `WITH count COLUMNS`
483    Count(u64),
484    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
485    Header { names: Vec<Ident> },
486}
487
488impl AstDisplay for CsvColumns {
489    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
490        match self {
491            CsvColumns::Count(n) => {
492                f.write_str(n);
493                f.write_str(" COLUMNS")
494            }
495            CsvColumns::Header { names } => {
496                f.write_str("HEADER");
497                if !names.is_empty() {
498                    f.write_str(" (");
499                    f.write_node(&display::comma_separated(names));
500                    f.write_str(")");
501                }
502            }
503        }
504    }
505}
506
507#[derive(Debug, Clone, PartialEq, Eq, Hash)]
508pub enum SourceIncludeMetadata {
509    Key {
510        alias: Option<Ident>,
511    },
512    Timestamp {
513        alias: Option<Ident>,
514    },
515    Partition {
516        alias: Option<Ident>,
517    },
518    Offset {
519        alias: Option<Ident>,
520    },
521    Headers {
522        alias: Option<Ident>,
523    },
524    Header {
525        key: String,
526        alias: Ident,
527        use_bytes: bool,
528    },
529}
530
531impl AstDisplay for SourceIncludeMetadata {
532    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
533        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
534            if let Some(alias) = alias {
535                f.write_str(" AS ");
536                f.write_node(alias);
537            }
538        };
539
540        match self {
541            SourceIncludeMetadata::Key { alias } => {
542                f.write_str("KEY");
543                print_alias(f, alias);
544            }
545            SourceIncludeMetadata::Timestamp { alias } => {
546                f.write_str("TIMESTAMP");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Partition { alias } => {
550                f.write_str("PARTITION");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Offset { alias } => {
554                f.write_str("OFFSET");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Headers { alias } => {
558                f.write_str("HEADERS");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Header {
562                alias,
563                key,
564                use_bytes,
565            } => {
566                f.write_str("HEADER '");
567                f.write_str(&display::escape_single_quote_string(key));
568                f.write_str("'");
569                print_alias(f, &Some(alias.clone()));
570                if *use_bytes {
571                    f.write_str(" BYTES");
572                }
573            }
574        }
575    }
576}
577impl_display!(SourceIncludeMetadata);
578
579#[derive(Debug, Clone, PartialEq, Eq, Hash)]
580pub enum SourceErrorPolicy {
581    Inline {
582        /// The alias to use for the error column. If unspecified will be `error`.
583        alias: Option<Ident>,
584    },
585}
586
587impl AstDisplay for SourceErrorPolicy {
588    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589        match self {
590            Self::Inline { alias } => {
591                f.write_str("INLINE");
592                if let Some(alias) = alias {
593                    f.write_str(" AS ");
594                    f.write_node(alias);
595                }
596            }
597        }
598    }
599}
600impl_display!(SourceErrorPolicy);
601
602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
603pub enum SourceEnvelope {
604    None,
605    Debezium,
606    Upsert {
607        value_decode_err_policy: Vec<SourceErrorPolicy>,
608    },
609    CdcV2,
610}
611
612impl SourceEnvelope {
613    /// `true` iff Materialize is expected to crash or exhibit UB
614    /// when attempting to ingest data starting at an offset other than zero.
615    pub fn requires_all_input(&self) -> bool {
616        match self {
617            SourceEnvelope::None => false,
618            SourceEnvelope::Debezium => false,
619            SourceEnvelope::Upsert { .. } => false,
620            SourceEnvelope::CdcV2 => true,
621        }
622    }
623}
624
625impl AstDisplay for SourceEnvelope {
626    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
627        match self {
628            Self::None => {
629                // this is unreachable as long as the default is None, but include it in case we ever change that
630                f.write_str("NONE");
631            }
632            Self::Debezium => {
633                f.write_str("DEBEZIUM");
634            }
635            Self::Upsert {
636                value_decode_err_policy,
637            } => {
638                if value_decode_err_policy.is_empty() {
639                    f.write_str("UPSERT");
640                } else {
641                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
642                    f.write_node(&display::comma_separated(value_decode_err_policy));
643                    f.write_str("))")
644                }
645            }
646            Self::CdcV2 => {
647                f.write_str("MATERIALIZE");
648            }
649        }
650    }
651}
652impl_display!(SourceEnvelope);
653
654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
655pub enum SinkEnvelope {
656    Debezium,
657    Upsert,
658}
659
660impl AstDisplay for SinkEnvelope {
661    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
662        match self {
663            Self::Upsert => {
664                f.write_str("UPSERT");
665            }
666            Self::Debezium => {
667                f.write_str("DEBEZIUM");
668            }
669        }
670    }
671}
672impl_display!(SinkEnvelope);
673
674#[derive(Debug, Clone, PartialEq, Eq, Hash)]
675pub enum IcebergSinkMode {
676    Upsert,
677    Append,
678}
679
680impl AstDisplay for IcebergSinkMode {
681    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
682        match self {
683            Self::Upsert => {
684                f.write_str("UPSERT");
685            }
686            Self::Append => {
687                f.write_str("APPEND");
688            }
689        }
690    }
691}
692impl_display!(IcebergSinkMode);
693
694#[derive(Debug, Clone, PartialEq, Eq, Hash)]
695pub enum SubscribeOutput<T: AstInfo> {
696    Diffs,
697    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
698    EnvelopeUpsert { key_columns: Vec<Ident> },
699    EnvelopeDebezium { key_columns: Vec<Ident> },
700}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704        match self {
705            Self::Diffs => {}
706            Self::WithinTimestampOrderBy { order_by } => {
707                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708                f.write_node(&display::comma_separated(order_by));
709            }
710            Self::EnvelopeUpsert { key_columns } => {
711                f.write_str(" ENVELOPE UPSERT (KEY (");
712                f.write_node(&display::comma_separated(key_columns));
713                f.write_str("))");
714            }
715            Self::EnvelopeDebezium { key_columns } => {
716                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717                f.write_node(&display::comma_separated(key_columns));
718                f.write_str("))");
719            }
720        }
721    }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727        match self {
728            Self::Bytes => f.write_str("BYTES"),
729            Self::Avro(inner) => {
730                f.write_str("AVRO ");
731                f.write_node(inner);
732            }
733            Self::Protobuf(inner) => {
734                f.write_str("PROTOBUF ");
735                f.write_node(inner);
736            }
737            Self::Regex(regex) => {
738                f.write_str("REGEX '");
739                f.write_node(&display::escape_single_quote_string(regex));
740                f.write_str("'");
741            }
742            Self::Csv { columns, delimiter } => {
743                f.write_str("CSV WITH ");
744                f.write_node(columns);
745
746                if *delimiter != ',' {
747                    f.write_str(" DELIMITED BY '");
748                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749                    f.write_str("'");
750                }
751            }
752            Self::Json { array } => {
753                f.write_str("JSON");
754                if *array {
755                    f.write_str(" ARRAY");
756                }
757            }
758            Self::Text => f.write_str("TEXT"),
759        }
760    }
761}
762impl_display_t!(Format);
763
764// All connection options are bundled together to allow us to parse `ALTER
765// CONNECTION` without specifying the type of connection we're altering. Il faut
766// souffrir pour être belle.
767#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
768pub enum ConnectionOptionName {
769    AccessKeyId,
770    AssumeRoleArn,
771    AssumeRoleSessionName,
772    AvailabilityZones,
773    AwsConnection,
774    AwsPrivatelink,
775    Broker,
776    Brokers,
777    Credential,
778    Database,
779    Endpoint,
780    Host,
781    Password,
782    Port,
783    ProgressTopic,
784    ProgressTopicReplicationFactor,
785    PublicKey1,
786    PublicKey2,
787    Region,
788    SaslMechanisms,
789    SaslPassword,
790    SaslUsername,
791    Scope,
792    SecretAccessKey,
793    SecurityProtocol,
794    ServiceName,
795    SshTunnel,
796    SslCertificate,
797    SslCertificateAuthority,
798    SslKey,
799    SslMode,
800    SessionToken,
801    CatalogType,
802    Url,
803    User,
804    Warehouse,
805}
806
807impl AstDisplay for ConnectionOptionName {
808    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
809        f.write_str(match self {
810            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
811            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
812            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
813            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
814            ConnectionOptionName::Broker => "BROKER",
815            ConnectionOptionName::Brokers => "BROKERS",
816            ConnectionOptionName::Credential => "CREDENTIAL",
817            ConnectionOptionName::Database => "DATABASE",
818            ConnectionOptionName::Endpoint => "ENDPOINT",
819            ConnectionOptionName::Host => "HOST",
820            ConnectionOptionName::Password => "PASSWORD",
821            ConnectionOptionName::Port => "PORT",
822            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
823            ConnectionOptionName::ProgressTopicReplicationFactor => {
824                "PROGRESS TOPIC REPLICATION FACTOR"
825            }
826            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
827            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
828            ConnectionOptionName::Region => "REGION",
829            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
830            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
831            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
832            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
833            ConnectionOptionName::SaslUsername => "SASL USERNAME",
834            ConnectionOptionName::Scope => "SCOPE",
835            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
836            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
837            ConnectionOptionName::ServiceName => "SERVICE NAME",
838            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
839            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
840            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
841            ConnectionOptionName::SslKey => "SSL KEY",
842            ConnectionOptionName::SslMode => "SSL MODE",
843            ConnectionOptionName::SessionToken => "SESSION TOKEN",
844            ConnectionOptionName::CatalogType => "CATALOG TYPE",
845            ConnectionOptionName::Url => "URL",
846            ConnectionOptionName::User => "USER",
847            ConnectionOptionName::Warehouse => "WAREHOUSE",
848        })
849    }
850}
851impl_display!(ConnectionOptionName);
852
853impl WithOptionName for ConnectionOptionName {
854    /// # WARNING
855    ///
856    /// Whenever implementing this trait consider very carefully whether or not
857    /// this value could contain sensitive user data. If you're uncertain, err
858    /// on the conservative side and return `true`.
859    fn redact_value(&self) -> bool {
860        match self {
861            ConnectionOptionName::AccessKeyId
862            | ConnectionOptionName::AvailabilityZones
863            | ConnectionOptionName::AwsConnection
864            | ConnectionOptionName::AwsPrivatelink
865            | ConnectionOptionName::Broker
866            | ConnectionOptionName::Brokers
867            | ConnectionOptionName::Credential
868            | ConnectionOptionName::Database
869            | ConnectionOptionName::Endpoint
870            | ConnectionOptionName::Host
871            | ConnectionOptionName::Password
872            | ConnectionOptionName::Port
873            | ConnectionOptionName::ProgressTopic
874            | ConnectionOptionName::ProgressTopicReplicationFactor
875            | ConnectionOptionName::PublicKey1
876            | ConnectionOptionName::PublicKey2
877            | ConnectionOptionName::Region
878            | ConnectionOptionName::AssumeRoleArn
879            | ConnectionOptionName::AssumeRoleSessionName
880            | ConnectionOptionName::SaslMechanisms
881            | ConnectionOptionName::SaslPassword
882            | ConnectionOptionName::SaslUsername
883            | ConnectionOptionName::Scope
884            | ConnectionOptionName::SecurityProtocol
885            | ConnectionOptionName::SecretAccessKey
886            | ConnectionOptionName::ServiceName
887            | ConnectionOptionName::SshTunnel
888            | ConnectionOptionName::SslCertificate
889            | ConnectionOptionName::SslCertificateAuthority
890            | ConnectionOptionName::SslKey
891            | ConnectionOptionName::SslMode
892            | ConnectionOptionName::SessionToken
893            | ConnectionOptionName::CatalogType
894            | ConnectionOptionName::Url
895            | ConnectionOptionName::User
896            | ConnectionOptionName::Warehouse => false,
897        }
898    }
899}
900
901#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
902/// An option in a `CREATE CONNECTION`.
903pub struct ConnectionOption<T: AstInfo> {
904    pub name: ConnectionOptionName,
905    pub value: Option<WithOptionValue<T>>,
906}
907impl_display_for_with_option!(ConnectionOption);
908impl_display_t!(ConnectionOption);
909
910#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
911pub enum CreateConnectionType {
912    Aws,
913    AwsPrivatelink,
914    Kafka,
915    Csr,
916    Postgres,
917    Ssh,
918    SqlServer,
919    MySql,
920    IcebergCatalog,
921}
922
923impl CreateConnectionType {
924    pub fn as_str(&self) -> &'static str {
925        match self {
926            Self::Kafka => "kafka",
927            Self::Csr => "confluent-schema-registry",
928            Self::Postgres => "postgres",
929            Self::Aws => "aws",
930            Self::AwsPrivatelink => "aws-privatelink",
931            Self::Ssh => "ssh-tunnel",
932            Self::MySql => "mysql",
933            Self::SqlServer => "sql-server",
934            Self::IcebergCatalog => "iceberg-catalog",
935        }
936    }
937}
938
939impl AstDisplay for CreateConnectionType {
940    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
941        match self {
942            Self::Kafka => {
943                f.write_str("KAFKA");
944            }
945            Self::Csr => {
946                f.write_str("CONFLUENT SCHEMA REGISTRY");
947            }
948            Self::Postgres => {
949                f.write_str("POSTGRES");
950            }
951            Self::Aws => {
952                f.write_str("AWS");
953            }
954            Self::AwsPrivatelink => {
955                f.write_str("AWS PRIVATELINK");
956            }
957            Self::Ssh => {
958                f.write_str("SSH TUNNEL");
959            }
960            Self::SqlServer => {
961                f.write_str("SQL SERVER");
962            }
963            Self::MySql => {
964                f.write_str("MYSQL");
965            }
966            Self::IcebergCatalog => {
967                f.write_str("ICEBERG CATALOG");
968            }
969        }
970    }
971}
972impl_display!(CreateConnectionType);
973
974#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
975pub enum CreateConnectionOptionName {
976    Validate,
977}
978
979impl AstDisplay for CreateConnectionOptionName {
980    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
981        f.write_str(match self {
982            CreateConnectionOptionName::Validate => "VALIDATE",
983        })
984    }
985}
986impl_display!(CreateConnectionOptionName);
987
988impl WithOptionName for CreateConnectionOptionName {
989    /// # WARNING
990    ///
991    /// Whenever implementing this trait consider very carefully whether or not
992    /// this value could contain sensitive user data. If you're uncertain, err
993    /// on the conservative side and return `true`.
994    fn redact_value(&self) -> bool {
995        match self {
996            CreateConnectionOptionName::Validate => false,
997        }
998    }
999}
1000
1001#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1002/// An option in a `CREATE CONNECTION...` statement.
1003pub struct CreateConnectionOption<T: AstInfo> {
1004    pub name: CreateConnectionOptionName,
1005    pub value: Option<WithOptionValue<T>>,
1006}
1007impl_display_for_with_option!(CreateConnectionOption);
1008impl_display_t!(CreateConnectionOption);
1009
1010#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1011pub enum KafkaSourceConfigOptionName {
1012    GroupIdPrefix,
1013    Topic,
1014    TopicMetadataRefreshInterval,
1015    StartTimestamp,
1016    StartOffset,
1017}
1018
1019impl AstDisplay for KafkaSourceConfigOptionName {
1020    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1021        f.write_str(match self {
1022            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1023            KafkaSourceConfigOptionName::Topic => "TOPIC",
1024            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1025                "TOPIC METADATA REFRESH INTERVAL"
1026            }
1027            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1028            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1029        })
1030    }
1031}
1032impl_display!(KafkaSourceConfigOptionName);
1033
1034impl WithOptionName for KafkaSourceConfigOptionName {
1035    /// # WARNING
1036    ///
1037    /// Whenever implementing this trait consider very carefully whether or not
1038    /// this value could contain sensitive user data. If you're uncertain, err
1039    /// on the conservative side and return `true`.
1040    fn redact_value(&self) -> bool {
1041        match self {
1042            KafkaSourceConfigOptionName::GroupIdPrefix
1043            | KafkaSourceConfigOptionName::Topic
1044            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1045            | KafkaSourceConfigOptionName::StartOffset
1046            | KafkaSourceConfigOptionName::StartTimestamp => false,
1047        }
1048    }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1052pub struct KafkaSourceConfigOption<T: AstInfo> {
1053    pub name: KafkaSourceConfigOptionName,
1054    pub value: Option<WithOptionValue<T>>,
1055}
1056impl_display_for_with_option!(KafkaSourceConfigOption);
1057impl_display_t!(KafkaSourceConfigOption);
1058
1059#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1060pub enum KafkaSinkConfigOptionName {
1061    CompressionType,
1062    PartitionBy,
1063    ProgressGroupIdPrefix,
1064    Topic,
1065    TransactionalIdPrefix,
1066    LegacyIds,
1067    TopicConfig,
1068    TopicMetadataRefreshInterval,
1069    TopicPartitionCount,
1070    TopicReplicationFactor,
1071}
1072
1073impl AstDisplay for KafkaSinkConfigOptionName {
1074    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1075        f.write_str(match self {
1076            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1077            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1078            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1079            KafkaSinkConfigOptionName::Topic => "TOPIC",
1080            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1081            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1082            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1083            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1084                "TOPIC METADATA REFRESH INTERVAL"
1085            }
1086            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1087            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1088        })
1089    }
1090}
1091impl_display!(KafkaSinkConfigOptionName);
1092
1093impl WithOptionName for KafkaSinkConfigOptionName {
1094    /// # WARNING
1095    ///
1096    /// Whenever implementing this trait consider very carefully whether or not
1097    /// this value could contain sensitive user data. If you're uncertain, err
1098    /// on the conservative side and return `true`.
1099    fn redact_value(&self) -> bool {
1100        match self {
1101            KafkaSinkConfigOptionName::CompressionType
1102            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1103            | KafkaSinkConfigOptionName::Topic
1104            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1105            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1106            | KafkaSinkConfigOptionName::LegacyIds
1107            | KafkaSinkConfigOptionName::TopicConfig
1108            | KafkaSinkConfigOptionName::TopicPartitionCount
1109            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1110            KafkaSinkConfigOptionName::PartitionBy => true,
1111        }
1112    }
1113}
1114
1115#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1116pub struct KafkaSinkConfigOption<T: AstInfo> {
1117    pub name: KafkaSinkConfigOptionName,
1118    pub value: Option<WithOptionValue<T>>,
1119}
1120impl_display_for_with_option!(KafkaSinkConfigOption);
1121impl_display_t!(KafkaSinkConfigOption);
1122
1123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1124pub enum IcebergSinkConfigOptionName {
1125    Namespace,
1126    Table,
1127}
1128
1129impl AstDisplay for IcebergSinkConfigOptionName {
1130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1131        f.write_str(match self {
1132            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1133            IcebergSinkConfigOptionName::Table => "TABLE",
1134        })
1135    }
1136}
1137impl_display!(IcebergSinkConfigOptionName);
1138
1139impl WithOptionName for IcebergSinkConfigOptionName {
1140    /// # WARNING
1141    ///
1142    /// Whenever implementing this trait consider very carefully whether or not
1143    /// this value could contain sensitive user data. If you're uncertain, err
1144    /// on the conservative side and return `true`.
1145    fn redact_value(&self) -> bool {
1146        match self {
1147            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1148        }
1149    }
1150}
1151
1152#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1153pub struct IcebergSinkConfigOption<T: AstInfo> {
1154    pub name: IcebergSinkConfigOptionName,
1155    pub value: Option<WithOptionValue<T>>,
1156}
1157impl_display_for_with_option!(IcebergSinkConfigOption);
1158impl_display_t!(IcebergSinkConfigOption);
1159
1160#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1161pub enum PgConfigOptionName {
1162    /// Hex encoded string of binary serialization of
1163    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`
1164    Details,
1165    /// The name of the publication to sync
1166    Publication,
1167    /// Columns whose types you want to unconditionally format as text
1168    /// NOTE(roshan): This value is kept around to allow round-tripping a
1169    /// `CREATE SOURCE` statement while we still allow creating implicit
1170    /// subsources from `CREATE SOURCE`, but will be removed once
1171    /// fully deprecating that feature and forcing users to use explicit
1172    /// `CREATE TABLE .. FROM SOURCE` statements
1173    TextColumns,
1174    /// Columns you want to exclude
1175    /// NOTE: This value is kept around to allow round-tripping a
1176    /// `CREATE SOURCE` statement while we still allow creating implicit
1177    /// subsources from `CREATE SOURCE`, but will be removed once
1178    /// fully deprecating that feature and forcing users to use explicit
1179    /// `CREATE TABLE .. FROM SOURCE` statements
1180    ExcludeColumns,
1181}
1182
1183impl AstDisplay for PgConfigOptionName {
1184    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1185        f.write_str(match self {
1186            PgConfigOptionName::Details => "DETAILS",
1187            PgConfigOptionName::Publication => "PUBLICATION",
1188            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1189            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1190        })
1191    }
1192}
1193impl_display!(PgConfigOptionName);
1194
1195impl WithOptionName for PgConfigOptionName {
1196    /// # WARNING
1197    ///
1198    /// Whenever implementing this trait consider very carefully whether or not
1199    /// this value could contain sensitive user data. If you're uncertain, err
1200    /// on the conservative side and return `true`.
1201    fn redact_value(&self) -> bool {
1202        match self {
1203            PgConfigOptionName::Details
1204            | PgConfigOptionName::Publication
1205            | PgConfigOptionName::TextColumns
1206            | PgConfigOptionName::ExcludeColumns => false,
1207        }
1208    }
1209}
1210
1211#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1212/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1213pub struct PgConfigOption<T: AstInfo> {
1214    pub name: PgConfigOptionName,
1215    pub value: Option<WithOptionValue<T>>,
1216}
1217impl_display_for_with_option!(PgConfigOption);
1218impl_display_t!(PgConfigOption);
1219
1220#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1221pub enum MySqlConfigOptionName {
1222    /// Hex encoded string of binary serialization of
1223    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`
1224    Details,
1225    /// Columns whose types you want to unconditionally format as text
1226    /// NOTE(roshan): This value is kept around to allow round-tripping a
1227    /// `CREATE SOURCE` statement while we still allow creating implicit
1228    /// subsources from `CREATE SOURCE`, but will be removed once
1229    /// fully deprecating that feature and forcing users to use explicit
1230    /// `CREATE TABLE .. FROM SOURCE` statements
1231    TextColumns,
1232    /// Columns you want to exclude
1233    /// NOTE(roshan): This value is kept around to allow round-tripping a
1234    /// `CREATE SOURCE` statement while we still allow creating implicit
1235    /// subsources from `CREATE SOURCE`, but will be removed once
1236    /// fully deprecating that feature and forcing users to use explicit
1237    /// `CREATE TABLE .. FROM SOURCE` statements
1238    ExcludeColumns,
1239}
1240
1241impl AstDisplay for MySqlConfigOptionName {
1242    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1243        f.write_str(match self {
1244            MySqlConfigOptionName::Details => "DETAILS",
1245            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1246            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1247        })
1248    }
1249}
1250impl_display!(MySqlConfigOptionName);
1251
1252impl WithOptionName for MySqlConfigOptionName {
1253    /// # WARNING
1254    ///
1255    /// Whenever implementing this trait consider very carefully whether or not
1256    /// this value could contain sensitive user data. If you're uncertain, err
1257    /// on the conservative side and return `true`.
1258    fn redact_value(&self) -> bool {
1259        match self {
1260            MySqlConfigOptionName::Details
1261            | MySqlConfigOptionName::TextColumns
1262            | MySqlConfigOptionName::ExcludeColumns => false,
1263        }
1264    }
1265}
1266
1267#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1268/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1269pub struct MySqlConfigOption<T: AstInfo> {
1270    pub name: MySqlConfigOptionName,
1271    pub value: Option<WithOptionValue<T>>,
1272}
1273impl_display_for_with_option!(MySqlConfigOption);
1274impl_display_t!(MySqlConfigOption);
1275
1276#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1277pub enum SqlServerConfigOptionName {
1278    /// Hex encoded string of binary serialization of
1279    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
1280    Details,
1281    /// Columns whose types you want to unconditionally format as text.
1282    ///
1283    /// NOTE(roshan): This value is kept around to allow round-tripping a
1284    /// `CREATE SOURCE` statement while we still allow creating implicit
1285    /// subsources from `CREATE SOURCE`, but will be removed once
1286    /// fully deprecating that feature and forcing users to use explicit
1287    /// `CREATE TABLE .. FROM SOURCE` statements
1288    TextColumns,
1289    /// Columns you want to exclude.
1290    ///
1291    /// NOTE(roshan): This value is kept around to allow round-tripping a
1292    /// `CREATE SOURCE` statement while we still allow creating implicit
1293    /// subsources from `CREATE SOURCE`, but will be removed once
1294    /// fully deprecating that feature and forcing users to use explicit
1295    /// `CREATE TABLE .. FROM SOURCE` statements
1296    ExcludeColumns,
1297}
1298
1299impl AstDisplay for SqlServerConfigOptionName {
1300    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1301        f.write_str(match self {
1302            SqlServerConfigOptionName::Details => "DETAILS",
1303            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1304            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1305        })
1306    }
1307}
1308impl_display!(SqlServerConfigOptionName);
1309
1310impl WithOptionName for SqlServerConfigOptionName {
1311    /// # WARNING
1312    ///
1313    /// Whenever implementing this trait consider very carefully whether or not
1314    /// this value could contain sensitive user data. If you're uncertain, err
1315    /// on the conservative side and return `true`.
1316    fn redact_value(&self) -> bool {
1317        match self {
1318            SqlServerConfigOptionName::Details
1319            | SqlServerConfigOptionName::TextColumns
1320            | SqlServerConfigOptionName::ExcludeColumns => false,
1321        }
1322    }
1323}
1324
1325#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1326/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1327pub struct SqlServerConfigOption<T: AstInfo> {
1328    pub name: SqlServerConfigOptionName,
1329    pub value: Option<WithOptionValue<T>>,
1330}
1331impl_display_for_with_option!(SqlServerConfigOption);
1332impl_display_t!(SqlServerConfigOption);
1333
1334#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1335pub enum CreateSourceConnection<T: AstInfo> {
1336    Kafka {
1337        connection: T::ItemName,
1338        options: Vec<KafkaSourceConfigOption<T>>,
1339    },
1340    Postgres {
1341        connection: T::ItemName,
1342        options: Vec<PgConfigOption<T>>,
1343    },
1344    SqlServer {
1345        connection: T::ItemName,
1346        options: Vec<SqlServerConfigOption<T>>,
1347    },
1348    MySql {
1349        connection: T::ItemName,
1350        options: Vec<MySqlConfigOption<T>>,
1351    },
1352    LoadGenerator {
1353        generator: LoadGenerator,
1354        options: Vec<LoadGeneratorOption<T>>,
1355    },
1356}
1357
1358impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1359    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1360        match self {
1361            CreateSourceConnection::Kafka {
1362                connection,
1363                options,
1364            } => {
1365                f.write_str("KAFKA CONNECTION ");
1366                f.write_node(connection);
1367                if !options.is_empty() {
1368                    f.write_str(" (");
1369                    f.write_node(&display::comma_separated(options));
1370                    f.write_str(")");
1371                }
1372            }
1373            CreateSourceConnection::Postgres {
1374                connection,
1375                options,
1376            } => {
1377                f.write_str("POSTGRES CONNECTION ");
1378                f.write_node(connection);
1379                if !options.is_empty() {
1380                    f.write_str(" (");
1381                    f.write_node(&display::comma_separated(options));
1382                    f.write_str(")");
1383                }
1384            }
1385            CreateSourceConnection::SqlServer {
1386                connection,
1387                options,
1388            } => {
1389                f.write_str("SQL SERVER CONNECTION ");
1390                f.write_node(connection);
1391                if !options.is_empty() {
1392                    f.write_str(" (");
1393                    f.write_node(&display::comma_separated(options));
1394                    f.write_str(")");
1395                }
1396            }
1397            CreateSourceConnection::MySql {
1398                connection,
1399                options,
1400            } => {
1401                f.write_str("MYSQL CONNECTION ");
1402                f.write_node(connection);
1403                if !options.is_empty() {
1404                    f.write_str(" (");
1405                    f.write_node(&display::comma_separated(options));
1406                    f.write_str(")");
1407                }
1408            }
1409            CreateSourceConnection::LoadGenerator { generator, options } => {
1410                f.write_str("LOAD GENERATOR ");
1411                f.write_node(generator);
1412                if !options.is_empty() {
1413                    f.write_str(" (");
1414                    f.write_node(&display::comma_separated(options));
1415                    f.write_str(")");
1416                }
1417            }
1418        }
1419    }
1420}
1421impl_display_t!(CreateSourceConnection);
1422
1423#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1424pub enum LoadGenerator {
1425    Clock,
1426    Counter,
1427    Marketing,
1428    Auction,
1429    Datums,
1430    Tpch,
1431    KeyValue,
1432}
1433
1434impl AstDisplay for LoadGenerator {
1435    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1436        match self {
1437            Self::Counter => f.write_str("COUNTER"),
1438            Self::Clock => f.write_str("CLOCK"),
1439            Self::Marketing => f.write_str("MARKETING"),
1440            Self::Auction => f.write_str("AUCTION"),
1441            Self::Datums => f.write_str("DATUMS"),
1442            Self::Tpch => f.write_str("TPCH"),
1443            Self::KeyValue => f.write_str("KEY VALUE"),
1444        }
1445    }
1446}
1447impl_display!(LoadGenerator);
1448
1449impl LoadGenerator {
1450    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1451    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1452    /// cases where we only have the AST representation. This can be removed once
1453    /// the `ast_rewrite_sources_to_tables` migration is removed.
1454    pub fn schema_name(&self) -> &'static str {
1455        match self {
1456            LoadGenerator::Counter => "counter",
1457            LoadGenerator::Clock => "clock",
1458            LoadGenerator::Marketing => "marketing",
1459            LoadGenerator::Auction => "auction",
1460            LoadGenerator::Datums => "datums",
1461            LoadGenerator::Tpch => "tpch",
1462            LoadGenerator::KeyValue => "key_value",
1463        }
1464    }
1465}
1466
1467#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1468pub enum LoadGeneratorOptionName {
1469    ScaleFactor,
1470    TickInterval,
1471    AsOf,
1472    UpTo,
1473    MaxCardinality,
1474    Keys,
1475    SnapshotRounds,
1476    TransactionalSnapshot,
1477    ValueSize,
1478    Seed,
1479    Partitions,
1480    BatchSize,
1481}
1482
1483impl AstDisplay for LoadGeneratorOptionName {
1484    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1485        f.write_str(match self {
1486            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1487            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1488            LoadGeneratorOptionName::AsOf => "AS OF",
1489            LoadGeneratorOptionName::UpTo => "UP TO",
1490            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1491            LoadGeneratorOptionName::Keys => "KEYS",
1492            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1493            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1494            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1495            LoadGeneratorOptionName::Seed => "SEED",
1496            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1497            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1498        })
1499    }
1500}
1501impl_display!(LoadGeneratorOptionName);
1502
1503impl WithOptionName for LoadGeneratorOptionName {
1504    /// # WARNING
1505    ///
1506    /// Whenever implementing this trait consider very carefully whether or not
1507    /// this value could contain sensitive user data. If you're uncertain, err
1508    /// on the conservative side and return `true`.
1509    fn redact_value(&self) -> bool {
1510        match self {
1511            LoadGeneratorOptionName::ScaleFactor
1512            | LoadGeneratorOptionName::TickInterval
1513            | LoadGeneratorOptionName::AsOf
1514            | LoadGeneratorOptionName::UpTo
1515            | LoadGeneratorOptionName::MaxCardinality
1516            | LoadGeneratorOptionName::Keys
1517            | LoadGeneratorOptionName::SnapshotRounds
1518            | LoadGeneratorOptionName::TransactionalSnapshot
1519            | LoadGeneratorOptionName::ValueSize
1520            | LoadGeneratorOptionName::Partitions
1521            | LoadGeneratorOptionName::BatchSize
1522            | LoadGeneratorOptionName::Seed => false,
1523        }
1524    }
1525}
1526
1527#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1528/// An option in a `CREATE CONNECTION...SSH`.
1529pub struct LoadGeneratorOption<T: AstInfo> {
1530    pub name: LoadGeneratorOptionName,
1531    pub value: Option<WithOptionValue<T>>,
1532}
1533impl_display_for_with_option!(LoadGeneratorOption);
1534impl_display_t!(LoadGeneratorOption);
1535
1536#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1537pub enum CreateSinkConnection<T: AstInfo> {
1538    Kafka {
1539        connection: T::ItemName,
1540        options: Vec<KafkaSinkConfigOption<T>>,
1541        key: Option<SinkKey>,
1542        headers: Option<Ident>,
1543    },
1544    Iceberg {
1545        connection: T::ItemName,
1546        aws_connection: T::ItemName,
1547        key: Option<SinkKey>,
1548        options: Vec<IcebergSinkConfigOption<T>>,
1549    },
1550}
1551
1552impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1553    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1554        match self {
1555            CreateSinkConnection::Kafka {
1556                connection,
1557                options,
1558                key,
1559                headers,
1560            } => {
1561                f.write_str("KAFKA CONNECTION ");
1562                f.write_node(connection);
1563                if !options.is_empty() {
1564                    f.write_str(" (");
1565                    f.write_node(&display::comma_separated(options));
1566                    f.write_str(")");
1567                }
1568                if let Some(key) = key.as_ref() {
1569                    f.write_str(" ");
1570                    f.write_node(key);
1571                }
1572                if let Some(headers) = headers {
1573                    f.write_str(" HEADERS ");
1574                    f.write_node(headers);
1575                }
1576            }
1577            CreateSinkConnection::Iceberg {
1578                connection,
1579                aws_connection,
1580                key,
1581                options,
1582            } => {
1583                f.write_str("ICEBERG CATALOG CONNECTION ");
1584                f.write_node(connection);
1585                if !options.is_empty() {
1586                    f.write_str(" (");
1587                    f.write_node(&display::comma_separated(options));
1588                    f.write_str(")");
1589                }
1590                f.write_str(" USING AWS CONNECTION ");
1591                f.write_node(aws_connection);
1592                if let Some(key) = key.as_ref() {
1593                    f.write_str(" ");
1594                    f.write_node(key);
1595                }
1596            }
1597        }
1598    }
1599}
1600impl_display_t!(CreateSinkConnection);
1601
1602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1603pub struct SinkKey {
1604    pub key_columns: Vec<Ident>,
1605    pub not_enforced: bool,
1606}
1607
1608impl AstDisplay for SinkKey {
1609    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1610        f.write_str("KEY (");
1611        f.write_node(&display::comma_separated(&self.key_columns));
1612        f.write_str(")");
1613        if self.not_enforced {
1614            f.write_str(" NOT ENFORCED");
1615        }
1616    }
1617}
1618
1619/// A table-level constraint, specified in a `CREATE TABLE` or an
1620/// `ALTER TABLE ADD <constraint>` statement.
1621#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1622pub enum TableConstraint<T: AstInfo> {
1623    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1624    Unique {
1625        name: Option<Ident>,
1626        columns: Vec<Ident>,
1627        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1628        is_primary: bool,
1629        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1630        // constraints.
1631        nulls_not_distinct: bool,
1632    },
1633    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1634    /// REFERENCES <foreign_table> (<referred_columns>)`)
1635    ForeignKey {
1636        name: Option<Ident>,
1637        columns: Vec<Ident>,
1638        foreign_table: T::ItemName,
1639        referred_columns: Vec<Ident>,
1640    },
1641    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1642    Check {
1643        name: Option<Ident>,
1644        expr: Box<Expr<T>>,
1645    },
1646}
1647
1648impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1649    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1650        match self {
1651            TableConstraint::Unique {
1652                name,
1653                columns,
1654                is_primary,
1655                nulls_not_distinct,
1656            } => {
1657                f.write_node(&display_constraint_name(name));
1658                if *is_primary {
1659                    f.write_str("PRIMARY KEY ");
1660                } else {
1661                    f.write_str("UNIQUE ");
1662                    if *nulls_not_distinct {
1663                        f.write_str("NULLS NOT DISTINCT ");
1664                    }
1665                }
1666                f.write_str("(");
1667                f.write_node(&display::comma_separated(columns));
1668                f.write_str(")");
1669            }
1670            TableConstraint::ForeignKey {
1671                name,
1672                columns,
1673                foreign_table,
1674                referred_columns,
1675            } => {
1676                f.write_node(&display_constraint_name(name));
1677                f.write_str("FOREIGN KEY (");
1678                f.write_node(&display::comma_separated(columns));
1679                f.write_str(") REFERENCES ");
1680                f.write_node(foreign_table);
1681                f.write_str("(");
1682                f.write_node(&display::comma_separated(referred_columns));
1683                f.write_str(")");
1684            }
1685            TableConstraint::Check { name, expr } => {
1686                f.write_node(&display_constraint_name(name));
1687                f.write_str("CHECK (");
1688                f.write_node(&expr);
1689                f.write_str(")");
1690            }
1691        }
1692    }
1693}
1694impl_display_t!(TableConstraint);
1695
1696/// A key constraint, specified in a `CREATE SOURCE`.
1697#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1698pub enum KeyConstraint {
1699    // PRIMARY KEY (<columns>) NOT ENFORCED
1700    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1701}
1702
1703impl AstDisplay for KeyConstraint {
1704    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1705        match self {
1706            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1707                f.write_str("PRIMARY KEY ");
1708                f.write_str("(");
1709                f.write_node(&display::comma_separated(columns));
1710                f.write_str(") ");
1711                f.write_str("NOT ENFORCED");
1712            }
1713        }
1714    }
1715}
1716impl_display!(KeyConstraint);
1717
1718#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1719pub enum CreateSourceOptionName {
1720    TimestampInterval,
1721    RetainHistory,
1722}
1723
1724impl AstDisplay for CreateSourceOptionName {
1725    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1726        f.write_str(match self {
1727            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1728            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1729        })
1730    }
1731}
1732impl_display!(CreateSourceOptionName);
1733
1734impl WithOptionName for CreateSourceOptionName {
1735    /// # WARNING
1736    ///
1737    /// Whenever implementing this trait consider very carefully whether or not
1738    /// this value could contain sensitive user data. If you're uncertain, err
1739    /// on the conservative side and return `true`.
1740    fn redact_value(&self) -> bool {
1741        match self {
1742            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1743                false
1744            }
1745        }
1746    }
1747}
1748
1749#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1750/// An option in a `CREATE SOURCE...` statement.
1751pub struct CreateSourceOption<T: AstInfo> {
1752    pub name: CreateSourceOptionName,
1753    pub value: Option<WithOptionValue<T>>,
1754}
1755impl_display_for_with_option!(CreateSourceOption);
1756impl_display_t!(CreateSourceOption);
1757
1758/// SQL column definition
1759#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1760pub struct ColumnDef<T: AstInfo> {
1761    pub name: Ident,
1762    pub data_type: T::DataType,
1763    pub collation: Option<UnresolvedItemName>,
1764    pub options: Vec<ColumnOptionDef<T>>,
1765}
1766
1767impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1768    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1769        f.write_node(&self.name);
1770        f.write_str(" ");
1771        f.write_node(&self.data_type);
1772        if let Some(collation) = &self.collation {
1773            f.write_str(" COLLATE ");
1774            f.write_node(collation);
1775        }
1776        for option in &self.options {
1777            f.write_str(" ");
1778            f.write_node(option);
1779        }
1780    }
1781}
1782impl_display_t!(ColumnDef);
1783
1784/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1785///
1786/// Note that implementations are substantially more permissive than the ANSI
1787/// specification on what order column options can be presented in, and whether
1788/// they are allowed to be named. The specification distinguishes between
1789/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1790/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1791/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1792/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1793/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1794/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1795/// NOT NULL constraints (the last of which is in violation of the spec).
1796///
1797/// For maximum flexibility, we don't distinguish between constraint and
1798/// non-constraint options, lumping them all together under the umbrella of
1799/// "column options," and we allow any column option to be named.
1800#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1801pub struct ColumnOptionDef<T: AstInfo> {
1802    pub name: Option<Ident>,
1803    pub option: ColumnOption<T>,
1804}
1805
1806impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1807    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1808        f.write_node(&display_constraint_name(&self.name));
1809        f.write_node(&self.option);
1810    }
1811}
1812impl_display_t!(ColumnOptionDef);
1813
1814/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1815/// TABLE` statement.
1816#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1817pub enum ColumnOption<T: AstInfo> {
1818    /// `NULL`
1819    Null,
1820    /// `NOT NULL`
1821    NotNull,
1822    /// `DEFAULT <restricted-expr>`
1823    Default(Expr<T>),
1824    /// `{ PRIMARY KEY | UNIQUE }`
1825    Unique { is_primary: bool },
1826    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1827    /// <foreign_table> (<referred_columns>)`).
1828    ForeignKey {
1829        foreign_table: UnresolvedItemName,
1830        referred_columns: Vec<Ident>,
1831    },
1832    /// `CHECK (<expr>)`
1833    Check(Expr<T>),
1834    /// `VERSION <action> <version>`
1835    Versioned {
1836        action: ColumnVersioned,
1837        version: Version,
1838    },
1839}
1840
1841impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1842    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1843        use ColumnOption::*;
1844        match self {
1845            Null => f.write_str("NULL"),
1846            NotNull => f.write_str("NOT NULL"),
1847            Default(expr) => {
1848                f.write_str("DEFAULT ");
1849                f.write_node(expr);
1850            }
1851            Unique { is_primary } => {
1852                if *is_primary {
1853                    f.write_str("PRIMARY KEY");
1854                } else {
1855                    f.write_str("UNIQUE");
1856                }
1857            }
1858            ForeignKey {
1859                foreign_table,
1860                referred_columns,
1861            } => {
1862                f.write_str("REFERENCES ");
1863                f.write_node(foreign_table);
1864                f.write_str(" (");
1865                f.write_node(&display::comma_separated(referred_columns));
1866                f.write_str(")");
1867            }
1868            Check(expr) => {
1869                f.write_str("CHECK (");
1870                f.write_node(expr);
1871                f.write_str(")");
1872            }
1873            Versioned { action, version } => {
1874                f.write_str("VERSION ");
1875                f.write_node(action);
1876                f.write_str(" ");
1877                f.write_node(version);
1878            }
1879        }
1880    }
1881}
1882impl_display_t!(ColumnOption);
1883
1884#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1885pub enum ColumnVersioned {
1886    Added,
1887}
1888
1889impl AstDisplay for ColumnVersioned {
1890    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1891    where
1892        W: fmt::Write,
1893    {
1894        match self {
1895            // TODO(alter_table): Support dropped columns.
1896            ColumnVersioned::Added => f.write_str("ADDED"),
1897        }
1898    }
1899}
1900impl_display!(ColumnVersioned);
1901
1902fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1903    struct ConstraintName<'a>(&'a Option<Ident>);
1904    impl<'a> AstDisplay for ConstraintName<'a> {
1905        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1906        where
1907            W: fmt::Write,
1908        {
1909            if let Some(name) = self.0 {
1910                f.write_str("CONSTRAINT ");
1911                f.write_node(name);
1912                f.write_str(" ");
1913            }
1914        }
1915    }
1916    ConstraintName(name)
1917}