mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// The option names that can appear in a [`MaterializedViewOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
/// A single materialized view option: an option name with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MaterializedViewOption);
74
/// The option names that can appear in a [`ContinualTaskOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT [=] ...` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
/// A single continual task option: an option name with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema carried as a string.
///
/// Displayed as `SCHEMA '<schema>'`, with the text passed through
/// `display::escape_single_quote_string`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The schema text.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
/// The option names that can appear in an [`AvroSchemaOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
/// A single Avro schema option: an option name with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(AvroSchemaOption);
156
/// How the schema for the Avro format is specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is specified through a Confluent Schema Registry
    /// connection (`USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is given inline (`USING SCHEMA '...'`), optionally followed
    /// by a parenthesized option list.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
/// How the schema for the Protobuf format is specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is specified through a Confluent Schema Registry
    /// connection (`USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is given inline (`MESSAGE '...' USING SCHEMA '...'`).
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
/// The option names that can appear in a [`CsrConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// The `AVRO KEY FULLNAME` option.
    AvroKeyFullname,
    /// The `AVRO VALUE FULLNAME` option.
    AvroValueFullname,
    /// The `NULL DEFAULTS` option.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} ...` option.
    AvroDocOn(AvroDocOn<T>),
    /// The `KEY COMPATIBILITY LEVEL` option.
    KeyCompatibilityLevel,
    /// The `VALUE COMPATIBILITY LEVEL` option.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
/// The name of a `DOC ON` option: what is being documented and for which
/// schema(s).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc applies to.
    pub identifier: DocOnIdentifier<T>,
    /// Selects the `KEY `/`VALUE ` prefix (or none) when displayed.
    pub for_schema: DocOnSchema,
}
/// Which schema(s) a `DOC ON` option targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a `KEY ` prefix.
    KeyOnly,
    /// Displayed with a `VALUE ` prefix.
    ValueOnly,
    /// Displayed with no prefix.
    All,
}
261
/// The object a `DOC ON` option refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A reference to a connection together with its options, displayed as
/// `CONNECTION <name> [(<options>)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The name of the referenced connection.
    pub connection: T::ItemName,
    /// Options attached to the connection reference; may be empty.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
/// Strategy for selecting a reader schema.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    // NOTE(review): presumably "use the latest registered schema" — confirm.
    #[default]
    Latest,
    /// Carries a string payload.
    // NOTE(review): presumably the schema text itself — confirm.
    Inline(String),
    /// Carries an integer payload.
    // NOTE(review): presumably a schema registry ID — confirm.
    ById(i32),
}
345
/// A schema registry connection used for the Avro format.
///
/// NOTE(review): only `connection` and `seed` are written by this type's
/// `AstDisplay` implementation; `key_strategy` and `value_strategy` are not
/// displayed — confirm that is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The underlying connection reference and its options.
    pub connection: CsrConnection<T>,
    /// Optional reader schema selection strategy for the key.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional reader schema selection strategy for the value.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A schema registry connection used for the Protobuf format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The underlying connection reference and its options.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// The `SEED [KEY SCHEMA '...'] VALUE SCHEMA '...'` clause for Avro.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// The key schema text, if one was given.
    pub key_schema: Option<String>,
    /// The value schema text.
    pub value_schema: String,
}
390
391impl AstDisplay for CsrSeedAvro {
392    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393        f.write_str("SEED");
394        if let Some(key_schema) = &self.key_schema {
395            f.write_str(" KEY SCHEMA '");
396            f.write_node(&display::escape_single_quote_string(key_schema));
397            f.write_str("'");
398        }
399        f.write_str(" VALUE SCHEMA '");
400        f.write_node(&display::escape_single_quote_string(&self.value_schema));
401        f.write_str("'");
402    }
403}
404impl_display!(CsrSeedAvro);
405
/// The `SEED [KEY ...] VALUE ...` clause for Protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// The key schema and message name, if given.
    pub key: Option<CsrSeedProtobufSchema>,
    /// The value schema and message name.
    pub value: CsrSeedProtobufSchema,
}
411
412impl AstDisplay for CsrSeedProtobuf {
413    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414        f.write_str("SEED");
415        if let Some(key) = &self.key {
416            f.write_str(" KEY ");
417            f.write_node(key);
418        }
419        f.write_str(" VALUE ");
420        f.write_node(&self.value);
421    }
422}
423impl_display!(CsrSeedProtobuf);
424
/// A Protobuf schema plus message name, displayed as
/// `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    // Hex encoded string.
    pub schema: String,
    // Name of the message, written after the `MESSAGE` keyword.
    pub message_name: String,
}
431impl AstDisplay for CsrSeedProtobufSchema {
432    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433        f.write_str("SCHEMA '");
434        f.write_str(&display::escape_single_quote_string(&self.schema));
435        f.write_str("' MESSAGE '");
436        f.write_str(&self.message_name);
437        f.write_str("'");
438    }
439}
440impl_display!(CsrSeedProtobufSchema);
441
/// The format clause of a source or sink: either one format, or separate
/// key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452        match self {
453            FormatSpecifier::Bare(format) => {
454                f.write_str("FORMAT ");
455                f.write_node(format)
456            }
457            FormatSpecifier::KeyValue { key, value } => {
458                f.write_str("KEY FORMAT ");
459                f.write_node(key);
460                f.write_str(" VALUE FORMAT ");
461                f.write_node(value);
462            }
463        }
464    }
465}
466impl_display_t!(FormatSpecifier);
467
/// A data format, as written after `FORMAT`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// `BYTES`
    Bytes,
    /// `AVRO <schema>`
    Avro(AvroSchema<T>),
    /// `PROTOBUF <schema>`
    Protobuf(ProtobufSchema<T>),
    /// `REGEX '<regex>'`
    Regex(String),
    /// `CSV WITH <columns> [DELIMITED BY '<delimiter>']`; the delimiter is
    /// only displayed when it is not `,`.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// `JSON`, or `JSON ARRAY` when `array` is true.
    Json {
        array: bool,
    },
    /// `TEXT`
    Text,
}
483
/// How the columns of a CSV format are specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
491
492impl AstDisplay for CsvColumns {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        match self {
495            CsvColumns::Count(n) => {
496                f.write_str(n);
497                f.write_str(" COLUMNS")
498            }
499            CsvColumns::Header { names } => {
500                f.write_str("HEADER");
501                if !names.is_empty() {
502                    f.write_str(" (");
503                    f.write_node(&display::comma_separated(names));
504                    f.write_str(")");
505                }
506            }
507        }
508    }
509}
510
/// A piece of metadata a source can include, each with an alias written as
/// ` AS <alias>` when displayed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// `KEY [AS <alias>]`
    Key {
        alias: Option<Ident>,
    },
    /// `TIMESTAMP [AS <alias>]`
    Timestamp {
        alias: Option<Ident>,
    },
    /// `PARTITION [AS <alias>]`
    Partition {
        alias: Option<Ident>,
    },
    /// `OFFSET [AS <alias>]`
    Offset {
        alias: Option<Ident>,
    },
    /// `HEADERS [AS <alias>]`
    Headers {
        alias: Option<Ident>,
    },
    /// `HEADER '<key>' AS <alias> [BYTES]`; unlike the other variants the
    /// alias is mandatory.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
534
535impl AstDisplay for SourceIncludeMetadata {
536    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538            if let Some(alias) = alias {
539                f.write_str(" AS ");
540                f.write_node(alias);
541            }
542        };
543
544        match self {
545            SourceIncludeMetadata::Key { alias } => {
546                f.write_str("KEY");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Timestamp { alias } => {
550                f.write_str("TIMESTAMP");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Partition { alias } => {
554                f.write_str("PARTITION");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Offset { alias } => {
558                f.write_str("OFFSET");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Headers { alias } => {
562                f.write_str("HEADERS");
563                print_alias(f, alias);
564            }
565            SourceIncludeMetadata::Header {
566                alias,
567                key,
568                use_bytes,
569            } => {
570                f.write_str("HEADER '");
571                f.write_str(&display::escape_single_quote_string(key));
572                f.write_str("'");
573                print_alias(f, &Some(alias.clone()));
574                if *use_bytes {
575                    f.write_str(" BYTES");
576                }
577            }
578        }
579    }
580}
581impl_display!(SourceIncludeMetadata);
582
/// A policy for handling errors in a source, displayed as
/// `INLINE [AS <alias>]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// The `INLINE [AS <alias>]` policy.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
590
591impl AstDisplay for SourceErrorPolicy {
592    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593        match self {
594            Self::Inline { alias } => {
595                f.write_str("INLINE");
596                if let Some(alias) = alias {
597                    f.write_str(" AS ");
598                    f.write_node(alias);
599                }
600            }
601        }
602    }
603}
604impl_display!(SourceErrorPolicy);
605
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Displayed as `NONE`.
    None,
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`, with a `(VALUE DECODING ERRORS = (...))` suffix
    /// when `value_decode_err_policy` is non-empty.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Displayed as `MATERIALIZE`.
    CdcV2,
}
615
616impl SourceEnvelope {
617    /// `true` iff Materialize is expected to crash or exhibit UB
618    /// when attempting to ingest data starting at an offset other than zero.
619    pub fn requires_all_input(&self) -> bool {
620        match self {
621            SourceEnvelope::None => false,
622            SourceEnvelope::Debezium => false,
623            SourceEnvelope::Upsert { .. } => false,
624            SourceEnvelope::CdcV2 => true,
625        }
626    }
627}
628
629impl AstDisplay for SourceEnvelope {
630    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631        match self {
632            Self::None => {
633                // this is unreachable as long as the default is None, but include it in case we ever change that
634                f.write_str("NONE");
635            }
636            Self::Debezium => {
637                f.write_str("DEBEZIUM");
638            }
639            Self::Upsert {
640                value_decode_err_policy,
641            } => {
642                if value_decode_err_policy.is_empty() {
643                    f.write_str("UPSERT");
644                } else {
645                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646                    f.write_node(&display::comma_separated(value_decode_err_policy));
647                    f.write_str("))")
648                }
649            }
650            Self::CdcV2 => {
651                f.write_str("MATERIALIZE");
652            }
653        }
654    }
655}
656impl_display!(SourceEnvelope);
657
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
663
664impl AstDisplay for SinkEnvelope {
665    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666        match self {
667            Self::Upsert => {
668                f.write_str("UPSERT");
669            }
670            Self::Debezium => {
671                f.write_str("DEBEZIUM");
672            }
673        }
674    }
675}
676impl_display!(SinkEnvelope);
677
/// The output shape requested for a subscribe.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Displayed as nothing.
    Diffs,
    /// ` WITHIN TIMESTAMP ORDER BY <exprs>`
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// ` ENVELOPE UPSERT (KEY (<columns>))`
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// ` ENVELOPE DEBEZIUM (KEY (<columns>))`
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688        match self {
689            Self::Diffs => {}
690            Self::WithinTimestampOrderBy { order_by } => {
691                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692                f.write_node(&display::comma_separated(order_by));
693            }
694            Self::EnvelopeUpsert { key_columns } => {
695                f.write_str(" ENVELOPE UPSERT (KEY (");
696                f.write_node(&display::comma_separated(key_columns));
697                f.write_str("))");
698            }
699            Self::EnvelopeDebezium { key_columns } => {
700                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701                f.write_node(&display::comma_separated(key_columns));
702                f.write_str("))");
703            }
704        }
705    }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711        match self {
712            Self::Bytes => f.write_str("BYTES"),
713            Self::Avro(inner) => {
714                f.write_str("AVRO ");
715                f.write_node(inner);
716            }
717            Self::Protobuf(inner) => {
718                f.write_str("PROTOBUF ");
719                f.write_node(inner);
720            }
721            Self::Regex(regex) => {
722                f.write_str("REGEX '");
723                f.write_node(&display::escape_single_quote_string(regex));
724                f.write_str("'");
725            }
726            Self::Csv { columns, delimiter } => {
727                f.write_str("CSV WITH ");
728                f.write_node(columns);
729
730                if *delimiter != ',' {
731                    f.write_str(" DELIMITED BY '");
732                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733                    f.write_str("'");
734                }
735            }
736            Self::Json { array } => {
737                f.write_str("JSON");
738                if *array {
739                    f.write_str(" ARRAY");
740                }
741            }
742            Self::Text => f.write_str("TEXT"),
743        }
744    }
745}
746impl_display_t!(Format);
747
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. One
// must suffer to be beautiful.
/// The name of a connection option, covering every connection type.
///
/// Each variant is displayed as its name split into upper-case words (for
/// example `AccessKeyId` as `ACCESS KEY ID`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
790
791impl AstDisplay for ConnectionOptionName {
792    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793        f.write_str(match self {
794            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798            ConnectionOptionName::Broker => "BROKER",
799            ConnectionOptionName::Brokers => "BROKERS",
800            ConnectionOptionName::Credential => "CREDENTIAL",
801            ConnectionOptionName::Database => "DATABASE",
802            ConnectionOptionName::Endpoint => "ENDPOINT",
803            ConnectionOptionName::Host => "HOST",
804            ConnectionOptionName::Password => "PASSWORD",
805            ConnectionOptionName::Port => "PORT",
806            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807            ConnectionOptionName::ProgressTopicReplicationFactor => {
808                "PROGRESS TOPIC REPLICATION FACTOR"
809            }
810            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812            ConnectionOptionName::Region => "REGION",
813            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817            ConnectionOptionName::SaslUsername => "SASL USERNAME",
818            ConnectionOptionName::Scope => "SCOPE",
819            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821            ConnectionOptionName::ServiceName => "SERVICE NAME",
822            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825            ConnectionOptionName::SslKey => "SSL KEY",
826            ConnectionOptionName::SslMode => "SSL MODE",
827            ConnectionOptionName::SessionToken => "SESSION TOKEN",
828            ConnectionOptionName::CatalogType => "CATALOG TYPE",
829            ConnectionOptionName::Url => "URL",
830            ConnectionOptionName::User => "USER",
831            ConnectionOptionName::Warehouse => "WAREHOUSE",
832        })
833    }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838    /// # WARNING
839    ///
840    /// Whenever implementing this trait consider very carefully whether or not
841    /// this value could contain sensitive user data. If you're uncertain, err
842    /// on the conservative side and return `true`.
843    fn redact_value(&self) -> bool {
844        match self {
845            ConnectionOptionName::AccessKeyId
846            | ConnectionOptionName::AvailabilityZones
847            | ConnectionOptionName::AwsConnection
848            | ConnectionOptionName::AwsPrivatelink
849            | ConnectionOptionName::Broker
850            | ConnectionOptionName::Brokers
851            | ConnectionOptionName::Credential
852            | ConnectionOptionName::Database
853            | ConnectionOptionName::Endpoint
854            | ConnectionOptionName::Host
855            | ConnectionOptionName::Password
856            | ConnectionOptionName::Port
857            | ConnectionOptionName::ProgressTopic
858            | ConnectionOptionName::ProgressTopicReplicationFactor
859            | ConnectionOptionName::PublicKey1
860            | ConnectionOptionName::PublicKey2
861            | ConnectionOptionName::Region
862            | ConnectionOptionName::AssumeRoleArn
863            | ConnectionOptionName::AssumeRoleSessionName
864            | ConnectionOptionName::SaslMechanisms
865            | ConnectionOptionName::SaslPassword
866            | ConnectionOptionName::SaslUsername
867            | ConnectionOptionName::Scope
868            | ConnectionOptionName::SecurityProtocol
869            | ConnectionOptionName::SecretAccessKey
870            | ConnectionOptionName::ServiceName
871            | ConnectionOptionName::SshTunnel
872            | ConnectionOptionName::SslCertificate
873            | ConnectionOptionName::SslCertificateAuthority
874            | ConnectionOptionName::SslKey
875            | ConnectionOptionName::SslMode
876            | ConnectionOptionName::SessionToken
877            | ConnectionOptionName::CatalogType
878            | ConnectionOptionName::Url
879            | ConnectionOptionName::User
880            | ConnectionOptionName::Warehouse => false,
881        }
882    }
883}
884
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE CONNECTION`.
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
893
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Displayed as `AWS`.
    Aws,
    /// Displayed as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Displayed as `KAFKA`.
    Kafka,
    /// Displayed as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Displayed as `POSTGRES`.
    Postgres,
    /// Displayed as `SSH TUNNEL`.
    Ssh,
    /// Displayed as `SQL SERVER`.
    SqlServer,
    /// Displayed as `MYSQL`.
    MySql,
    /// Displayed as `ICEBERG CATALOG`.
    IcebergCatalog,
}
906
907impl AstDisplay for CreateConnectionType {
908    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
909        match self {
910            Self::Kafka => {
911                f.write_str("KAFKA");
912            }
913            Self::Csr => {
914                f.write_str("CONFLUENT SCHEMA REGISTRY");
915            }
916            Self::Postgres => {
917                f.write_str("POSTGRES");
918            }
919            Self::Aws => {
920                f.write_str("AWS");
921            }
922            Self::AwsPrivatelink => {
923                f.write_str("AWS PRIVATELINK");
924            }
925            Self::Ssh => {
926                f.write_str("SSH TUNNEL");
927            }
928            Self::SqlServer => {
929                f.write_str("SQL SERVER");
930            }
931            Self::MySql => {
932                f.write_str("MYSQL");
933            }
934            Self::IcebergCatalog => {
935                f.write_str("ICEBERG CATALOG");
936            }
937        }
938    }
939}
940impl_display!(CreateConnectionType);
941
/// Names of the options carried by a [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// Rendered as `VALIDATE`.
    Validate,
}
946
947impl AstDisplay for CreateConnectionOptionName {
948    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
949        f.write_str(match self {
950            CreateConnectionOptionName::Validate => "VALIDATE",
951        })
952    }
953}
954impl_display!(CreateConnectionOptionName);
955
956impl WithOptionName for CreateConnectionOptionName {
957    /// # WARNING
958    ///
959    /// Whenever implementing this trait consider very carefully whether or not
960    /// this value could contain sensitive user data. If you're uncertain, err
961    /// on the conservative side and return `true`.
962    fn redact_value(&self) -> bool {
963        match self {
964            CreateConnectionOptionName::Validate => false,
965        }
966    }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
970/// An option in a `CREATE CONNECTION...` statement.
971pub struct CreateConnectionOption<T: AstInfo> {
972    pub name: CreateConnectionOptionName,
973    pub value: Option<WithOptionValue<T>>,
974}
975impl_display_for_with_option!(CreateConnectionOption);
976impl_display_t!(CreateConnectionOption);
977
/// Names of the options carried by a [`KafkaSourceConfigOption`].
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// Rendered as `GROUP ID PREFIX`.
    GroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `START TIMESTAMP`.
    StartTimestamp,
    /// Rendered as `START OFFSET`.
    StartOffset,
}
986
987impl AstDisplay for KafkaSourceConfigOptionName {
988    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
989        f.write_str(match self {
990            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
991            KafkaSourceConfigOptionName::Topic => "TOPIC",
992            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
993                "TOPIC METADATA REFRESH INTERVAL"
994            }
995            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
996            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
997        })
998    }
999}
1000impl_display!(KafkaSourceConfigOptionName);
1001
1002impl WithOptionName for KafkaSourceConfigOptionName {
1003    /// # WARNING
1004    ///
1005    /// Whenever implementing this trait consider very carefully whether or not
1006    /// this value could contain sensitive user data. If you're uncertain, err
1007    /// on the conservative side and return `true`.
1008    fn redact_value(&self) -> bool {
1009        match self {
1010            KafkaSourceConfigOptionName::GroupIdPrefix
1011            | KafkaSourceConfigOptionName::Topic
1012            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1013            | KafkaSourceConfigOptionName::StartOffset
1014            | KafkaSourceConfigOptionName::StartTimestamp => false,
1015        }
1016    }
1017}
1018
1019#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1020pub struct KafkaSourceConfigOption<T: AstInfo> {
1021    pub name: KafkaSourceConfigOptionName,
1022    pub value: Option<WithOptionValue<T>>,
1023}
1024impl_display_for_with_option!(KafkaSourceConfigOption);
1025impl_display_t!(KafkaSourceConfigOption);
1026
/// Names of the options carried by a [`KafkaSinkConfigOption`].
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// Rendered as `COMPRESSION TYPE`.
    CompressionType,
    /// Rendered as `PARTITION BY`.
    PartitionBy,
    /// Rendered as `PROGRESS GROUP ID PREFIX`.
    ProgressGroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TRANSACTIONAL ID PREFIX`.
    TransactionalIdPrefix,
    /// Rendered as `LEGACY IDS`.
    LegacyIds,
    /// Rendered as `TOPIC CONFIG`.
    TopicConfig,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `TOPIC PARTITION COUNT`.
    TopicPartitionCount,
    /// Rendered as `TOPIC REPLICATION FACTOR`.
    TopicReplicationFactor,
}
1040
1041impl AstDisplay for KafkaSinkConfigOptionName {
1042    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1043        f.write_str(match self {
1044            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1045            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1046            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1047            KafkaSinkConfigOptionName::Topic => "TOPIC",
1048            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1049            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1050            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1051            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1052                "TOPIC METADATA REFRESH INTERVAL"
1053            }
1054            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1055            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1056        })
1057    }
1058}
1059impl_display!(KafkaSinkConfigOptionName);
1060
1061impl WithOptionName for KafkaSinkConfigOptionName {
1062    /// # WARNING
1063    ///
1064    /// Whenever implementing this trait consider very carefully whether or not
1065    /// this value could contain sensitive user data. If you're uncertain, err
1066    /// on the conservative side and return `true`.
1067    fn redact_value(&self) -> bool {
1068        match self {
1069            KafkaSinkConfigOptionName::CompressionType
1070            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1071            | KafkaSinkConfigOptionName::Topic
1072            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1073            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1074            | KafkaSinkConfigOptionName::LegacyIds
1075            | KafkaSinkConfigOptionName::TopicConfig
1076            | KafkaSinkConfigOptionName::TopicPartitionCount
1077            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1078            KafkaSinkConfigOptionName::PartitionBy => true,
1079        }
1080    }
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1084pub struct KafkaSinkConfigOption<T: AstInfo> {
1085    pub name: KafkaSinkConfigOptionName,
1086    pub value: Option<WithOptionValue<T>>,
1087}
1088impl_display_for_with_option!(KafkaSinkConfigOption);
1089impl_display_t!(KafkaSinkConfigOption);
1090
/// Names of the options carried by a [`PgConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// `DETAILS`: a hex-encoded string holding the binary serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`.
    Details,
    /// `PUBLICATION`: the name of the publication to sync.
    Publication,
    /// `TEXT COLUMNS`: columns whose types should unconditionally be
    /// formatted as text.
    ///
    /// NOTE(roshan): This value is retained so that a `CREATE SOURCE`
    /// statement can still be round-tripped while implicit subsources may be
    /// created from `CREATE SOURCE`. It will be removed once that feature is
    /// fully deprecated and users are forced onto explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
}
1106
1107impl AstDisplay for PgConfigOptionName {
1108    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1109        f.write_str(match self {
1110            PgConfigOptionName::Details => "DETAILS",
1111            PgConfigOptionName::Publication => "PUBLICATION",
1112            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1113        })
1114    }
1115}
1116impl_display!(PgConfigOptionName);
1117
1118impl WithOptionName for PgConfigOptionName {
1119    /// # WARNING
1120    ///
1121    /// Whenever implementing this trait consider very carefully whether or not
1122    /// this value could contain sensitive user data. If you're uncertain, err
1123    /// on the conservative side and return `true`.
1124    fn redact_value(&self) -> bool {
1125        match self {
1126            PgConfigOptionName::Details
1127            | PgConfigOptionName::Publication
1128            | PgConfigOptionName::TextColumns => false,
1129        }
1130    }
1131}
1132
1133#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1134/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1135pub struct PgConfigOption<T: AstInfo> {
1136    pub name: PgConfigOptionName,
1137    pub value: Option<WithOptionValue<T>>,
1138}
1139impl_display_for_with_option!(PgConfigOption);
1140impl_display_t!(PgConfigOption);
1141
/// Names of the options carried by a [`MySqlConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// `DETAILS`: a hex-encoded string holding the binary serialization of
    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`.
    Details,
    /// `TEXT COLUMNS`: columns whose types should unconditionally be
    /// formatted as text.
    ///
    /// NOTE(roshan): This value is retained so that a `CREATE SOURCE`
    /// statement can still be round-tripped while implicit subsources may be
    /// created from `CREATE SOURCE`. It will be removed once that feature is
    /// fully deprecated and users are forced onto explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// `EXCLUDE COLUMNS`: columns that should be excluded.
    ///
    /// NOTE(roshan): This value is retained so that a `CREATE SOURCE`
    /// statement can still be round-tripped while implicit subsources may be
    /// created from `CREATE SOURCE`. It will be removed once that feature is
    /// fully deprecated and users are forced onto explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1162
1163impl AstDisplay for MySqlConfigOptionName {
1164    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1165        f.write_str(match self {
1166            MySqlConfigOptionName::Details => "DETAILS",
1167            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1168            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1169        })
1170    }
1171}
1172impl_display!(MySqlConfigOptionName);
1173
1174impl WithOptionName for MySqlConfigOptionName {
1175    /// # WARNING
1176    ///
1177    /// Whenever implementing this trait consider very carefully whether or not
1178    /// this value could contain sensitive user data. If you're uncertain, err
1179    /// on the conservative side and return `true`.
1180    fn redact_value(&self) -> bool {
1181        match self {
1182            MySqlConfigOptionName::Details
1183            | MySqlConfigOptionName::TextColumns
1184            | MySqlConfigOptionName::ExcludeColumns => false,
1185        }
1186    }
1187}
1188
1189#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1190/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1191pub struct MySqlConfigOption<T: AstInfo> {
1192    pub name: MySqlConfigOptionName,
1193    pub value: Option<WithOptionValue<T>>,
1194}
1195impl_display_for_with_option!(MySqlConfigOption);
1196impl_display_t!(MySqlConfigOption);
1197
/// Names of the options carried by a [`SqlServerConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// `DETAILS`: a hex-encoded string holding the binary serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    Details,
    /// `TEXT COLUMNS`: columns whose types should unconditionally be
    /// formatted as text.
    ///
    /// NOTE(roshan): This value is retained so that a `CREATE SOURCE`
    /// statement can still be round-tripped while implicit subsources may be
    /// created from `CREATE SOURCE`. It will be removed once that feature is
    /// fully deprecated and users are forced onto explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// `EXCLUDE COLUMNS`: columns that should be excluded.
    ///
    /// NOTE(roshan): This value is retained so that a `CREATE SOURCE`
    /// statement can still be round-tripped while implicit subsources may be
    /// created from `CREATE SOURCE`. It will be removed once that feature is
    /// fully deprecated and users are forced onto explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1220
1221impl AstDisplay for SqlServerConfigOptionName {
1222    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1223        f.write_str(match self {
1224            SqlServerConfigOptionName::Details => "DETAILS",
1225            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1226            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1227        })
1228    }
1229}
1230impl_display!(SqlServerConfigOptionName);
1231
1232impl WithOptionName for SqlServerConfigOptionName {
1233    /// # WARNING
1234    ///
1235    /// Whenever implementing this trait consider very carefully whether or not
1236    /// this value could contain sensitive user data. If you're uncertain, err
1237    /// on the conservative side and return `true`.
1238    fn redact_value(&self) -> bool {
1239        match self {
1240            SqlServerConfigOptionName::Details
1241            | SqlServerConfigOptionName::TextColumns
1242            | SqlServerConfigOptionName::ExcludeColumns => false,
1243        }
1244    }
1245}
1246
1247#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1248/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1249pub struct SqlServerConfigOption<T: AstInfo> {
1250    pub name: SqlServerConfigOptionName,
1251    pub value: Option<WithOptionValue<T>>,
1252}
1253impl_display_for_with_option!(SqlServerConfigOption);
1254impl_display_t!(SqlServerConfigOption);
1255
1256#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1257pub enum CreateSourceConnection<T: AstInfo> {
1258    Kafka {
1259        connection: T::ItemName,
1260        options: Vec<KafkaSourceConfigOption<T>>,
1261    },
1262    Postgres {
1263        connection: T::ItemName,
1264        options: Vec<PgConfigOption<T>>,
1265    },
1266    SqlServer {
1267        connection: T::ItemName,
1268        options: Vec<SqlServerConfigOption<T>>,
1269    },
1270    MySql {
1271        connection: T::ItemName,
1272        options: Vec<MySqlConfigOption<T>>,
1273    },
1274    LoadGenerator {
1275        generator: LoadGenerator,
1276        options: Vec<LoadGeneratorOption<T>>,
1277    },
1278}
1279
1280impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1281    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1282        match self {
1283            CreateSourceConnection::Kafka {
1284                connection,
1285                options,
1286            } => {
1287                f.write_str("KAFKA CONNECTION ");
1288                f.write_node(connection);
1289                if !options.is_empty() {
1290                    f.write_str(" (");
1291                    f.write_node(&display::comma_separated(options));
1292                    f.write_str(")");
1293                }
1294            }
1295            CreateSourceConnection::Postgres {
1296                connection,
1297                options,
1298            } => {
1299                f.write_str("POSTGRES CONNECTION ");
1300                f.write_node(connection);
1301                if !options.is_empty() {
1302                    f.write_str(" (");
1303                    f.write_node(&display::comma_separated(options));
1304                    f.write_str(")");
1305                }
1306            }
1307            CreateSourceConnection::SqlServer {
1308                connection,
1309                options,
1310            } => {
1311                f.write_str("SQL SERVER CONNECTION ");
1312                f.write_node(connection);
1313                if !options.is_empty() {
1314                    f.write_str(" (");
1315                    f.write_node(&display::comma_separated(options));
1316                    f.write_str(")");
1317                }
1318            }
1319            CreateSourceConnection::MySql {
1320                connection,
1321                options,
1322            } => {
1323                f.write_str("MYSQL CONNECTION ");
1324                f.write_node(connection);
1325                if !options.is_empty() {
1326                    f.write_str(" (");
1327                    f.write_node(&display::comma_separated(options));
1328                    f.write_str(")");
1329                }
1330            }
1331            CreateSourceConnection::LoadGenerator { generator, options } => {
1332                f.write_str("LOAD GENERATOR ");
1333                f.write_node(generator);
1334                if !options.is_empty() {
1335                    f.write_str(" (");
1336                    f.write_node(&display::comma_separated(options));
1337                    f.write_str(")");
1338                }
1339            }
1340        }
1341    }
1342}
1343impl_display_t!(CreateSourceConnection);
1344
/// The load generators a source can name after `LOAD GENERATOR`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`.
    Clock,
    /// Rendered as `COUNTER`.
    Counter,
    /// Rendered as `MARKETING`.
    Marketing,
    /// Rendered as `AUCTION`.
    Auction,
    /// Rendered as `DATUMS`.
    Datums,
    /// Rendered as `TPCH`.
    Tpch,
    /// Rendered as `KEY VALUE`.
    KeyValue,
}
1355
1356impl AstDisplay for LoadGenerator {
1357    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1358        match self {
1359            Self::Counter => f.write_str("COUNTER"),
1360            Self::Clock => f.write_str("CLOCK"),
1361            Self::Marketing => f.write_str("MARKETING"),
1362            Self::Auction => f.write_str("AUCTION"),
1363            Self::Datums => f.write_str("DATUMS"),
1364            Self::Tpch => f.write_str("TPCH"),
1365            Self::KeyValue => f.write_str("KEY VALUE"),
1366        }
1367    }
1368}
1369impl_display!(LoadGenerator);
1370
1371impl LoadGenerator {
1372    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1373    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1374    /// cases where we only have the AST representation. This can be removed once
1375    /// the `ast_rewrite_sources_to_tables` migration is removed.
1376    pub fn schema_name(&self) -> &'static str {
1377        match self {
1378            LoadGenerator::Counter => "counter",
1379            LoadGenerator::Clock => "clock",
1380            LoadGenerator::Marketing => "marketing",
1381            LoadGenerator::Auction => "auction",
1382            LoadGenerator::Datums => "datums",
1383            LoadGenerator::Tpch => "tpch",
1384            LoadGenerator::KeyValue => "key_value",
1385        }
1386    }
1387}
1388
/// Names of the options carried by a [`LoadGeneratorOption`].
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1404
1405impl AstDisplay for LoadGeneratorOptionName {
1406    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1407        f.write_str(match self {
1408            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1409            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1410            LoadGeneratorOptionName::AsOf => "AS OF",
1411            LoadGeneratorOptionName::UpTo => "UP TO",
1412            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1413            LoadGeneratorOptionName::Keys => "KEYS",
1414            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1415            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1416            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1417            LoadGeneratorOptionName::Seed => "SEED",
1418            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1419            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1420        })
1421    }
1422}
1423impl_display!(LoadGeneratorOptionName);
1424
1425impl WithOptionName for LoadGeneratorOptionName {
1426    /// # WARNING
1427    ///
1428    /// Whenever implementing this trait consider very carefully whether or not
1429    /// this value could contain sensitive user data. If you're uncertain, err
1430    /// on the conservative side and return `true`.
1431    fn redact_value(&self) -> bool {
1432        match self {
1433            LoadGeneratorOptionName::ScaleFactor
1434            | LoadGeneratorOptionName::TickInterval
1435            | LoadGeneratorOptionName::AsOf
1436            | LoadGeneratorOptionName::UpTo
1437            | LoadGeneratorOptionName::MaxCardinality
1438            | LoadGeneratorOptionName::Keys
1439            | LoadGeneratorOptionName::SnapshotRounds
1440            | LoadGeneratorOptionName::TransactionalSnapshot
1441            | LoadGeneratorOptionName::ValueSize
1442            | LoadGeneratorOptionName::Partitions
1443            | LoadGeneratorOptionName::BatchSize
1444            | LoadGeneratorOptionName::Seed => false,
1445        }
1446    }
1447}
1448
1449#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1450/// An option in a `CREATE CONNECTION...SSH`.
1451pub struct LoadGeneratorOption<T: AstInfo> {
1452    pub name: LoadGeneratorOptionName,
1453    pub value: Option<WithOptionValue<T>>,
1454}
1455impl_display_for_with_option!(LoadGeneratorOption);
1456impl_display_t!(LoadGeneratorOption);
1457
1458#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1459pub enum CreateSinkConnection<T: AstInfo> {
1460    Kafka {
1461        connection: T::ItemName,
1462        options: Vec<KafkaSinkConfigOption<T>>,
1463        key: Option<KafkaSinkKey>,
1464        headers: Option<Ident>,
1465    },
1466}
1467
1468impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1469    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1470        match self {
1471            CreateSinkConnection::Kafka {
1472                connection,
1473                options,
1474                key,
1475                headers,
1476            } => {
1477                f.write_str("KAFKA CONNECTION ");
1478                f.write_node(connection);
1479                if !options.is_empty() {
1480                    f.write_str(" (");
1481                    f.write_node(&display::comma_separated(options));
1482                    f.write_str(")");
1483                }
1484                if let Some(key) = key.as_ref() {
1485                    f.write_node(key);
1486                }
1487                if let Some(headers) = headers {
1488                    f.write_str(" HEADERS ");
1489                    f.write_node(headers);
1490                }
1491            }
1492        }
1493    }
1494}
1495impl_display_t!(CreateSinkConnection);
1496
1497#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1498pub struct KafkaSinkKey {
1499    pub key_columns: Vec<Ident>,
1500    pub not_enforced: bool,
1501}
1502
1503impl AstDisplay for KafkaSinkKey {
1504    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1505        f.write_str(" KEY (");
1506        f.write_node(&display::comma_separated(&self.key_columns));
1507        f.write_str(")");
1508        if self.not_enforced {
1509            f.write_str(" NOT ENFORCED");
1510        }
1511    }
1512}
1513
1514/// A table-level constraint, specified in a `CREATE TABLE` or an
1515/// `ALTER TABLE ADD <constraint>` statement.
1516#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1517pub enum TableConstraint<T: AstInfo> {
1518    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1519    Unique {
1520        name: Option<Ident>,
1521        columns: Vec<Ident>,
1522        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1523        is_primary: bool,
1524        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1525        // constraints.
1526        nulls_not_distinct: bool,
1527    },
1528    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1529    /// REFERENCES <foreign_table> (<referred_columns>)`)
1530    ForeignKey {
1531        name: Option<Ident>,
1532        columns: Vec<Ident>,
1533        foreign_table: T::ItemName,
1534        referred_columns: Vec<Ident>,
1535    },
1536    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1537    Check {
1538        name: Option<Ident>,
1539        expr: Box<Expr<T>>,
1540    },
1541}
1542
1543impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1544    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1545        match self {
1546            TableConstraint::Unique {
1547                name,
1548                columns,
1549                is_primary,
1550                nulls_not_distinct,
1551            } => {
1552                f.write_node(&display_constraint_name(name));
1553                if *is_primary {
1554                    f.write_str("PRIMARY KEY ");
1555                } else {
1556                    f.write_str("UNIQUE ");
1557                    if *nulls_not_distinct {
1558                        f.write_str("NULLS NOT DISTINCT ");
1559                    }
1560                }
1561                f.write_str("(");
1562                f.write_node(&display::comma_separated(columns));
1563                f.write_str(")");
1564            }
1565            TableConstraint::ForeignKey {
1566                name,
1567                columns,
1568                foreign_table,
1569                referred_columns,
1570            } => {
1571                f.write_node(&display_constraint_name(name));
1572                f.write_str("FOREIGN KEY (");
1573                f.write_node(&display::comma_separated(columns));
1574                f.write_str(") REFERENCES ");
1575                f.write_node(foreign_table);
1576                f.write_str("(");
1577                f.write_node(&display::comma_separated(referred_columns));
1578                f.write_str(")");
1579            }
1580            TableConstraint::Check { name, expr } => {
1581                f.write_node(&display_constraint_name(name));
1582                f.write_str("CHECK (");
1583                f.write_node(&expr);
1584                f.write_str(")");
1585            }
1586        }
1587    }
1588}
1589impl_display_t!(TableConstraint);
1590
1591/// A key constraint, specified in a `CREATE SOURCE`.
1592#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1593pub enum KeyConstraint {
1594    // PRIMARY KEY (<columns>) NOT ENFORCED
1595    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1596}
1597
1598impl AstDisplay for KeyConstraint {
1599    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1600        match self {
1601            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1602                f.write_str("PRIMARY KEY ");
1603                f.write_str("(");
1604                f.write_node(&display::comma_separated(columns));
1605                f.write_str(") ");
1606                f.write_str("NOT ENFORCED");
1607            }
1608        }
1609    }
1610}
1611impl_display!(KeyConstraint);
1612
/// Names of the options carried by a [`CreateSourceOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1618
1619impl AstDisplay for CreateSourceOptionName {
1620    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1621        f.write_str(match self {
1622            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1623            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1624        })
1625    }
1626}
1627impl_display!(CreateSourceOptionName);
1628
1629impl WithOptionName for CreateSourceOptionName {
1630    /// # WARNING
1631    ///
1632    /// Whenever implementing this trait consider very carefully whether or not
1633    /// this value could contain sensitive user data. If you're uncertain, err
1634    /// on the conservative side and return `true`.
1635    fn redact_value(&self) -> bool {
1636        match self {
1637            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1638                false
1639            }
1640        }
1641    }
1642}
1643
1644#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1645/// An option in a `CREATE SOURCE...` statement.
1646pub struct CreateSourceOption<T: AstInfo> {
1647    pub name: CreateSourceOptionName,
1648    pub value: Option<WithOptionValue<T>>,
1649}
1650impl_display_for_with_option!(CreateSourceOption);
1651impl_display_t!(CreateSourceOption);
1652
1653/// SQL column definition
1654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1655pub struct ColumnDef<T: AstInfo> {
1656    pub name: Ident,
1657    pub data_type: T::DataType,
1658    pub collation: Option<UnresolvedItemName>,
1659    pub options: Vec<ColumnOptionDef<T>>,
1660}
1661
1662impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1663    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1664        f.write_node(&self.name);
1665        f.write_str(" ");
1666        f.write_node(&self.data_type);
1667        if let Some(collation) = &self.collation {
1668            f.write_str(" COLLATE ");
1669            f.write_node(collation);
1670        }
1671        for option in &self.options {
1672            f.write_str(" ");
1673            f.write_node(option);
1674        }
1675    }
1676}
1677impl_display_t!(ColumnDef);
1678
1679/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1680///
1681/// Note that implementations are substantially more permissive than the ANSI
1682/// specification on what order column options can be presented in, and whether
1683/// they are allowed to be named. The specification distinguishes between
1684/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1685/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1686/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1687/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1688/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1689/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1690/// NOT NULL constraints (the last of which is in violation of the spec).
1691///
1692/// For maximum flexibility, we don't distinguish between constraint and
1693/// non-constraint options, lumping them all together under the umbrella of
1694/// "column options," and we allow any column option to be named.
1695#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1696pub struct ColumnOptionDef<T: AstInfo> {
1697    pub name: Option<Ident>,
1698    pub option: ColumnOption<T>,
1699}
1700
1701impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1702    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1703        f.write_node(&display_constraint_name(&self.name));
1704        f.write_node(&self.option);
1705    }
1706}
1707impl_display_t!(ColumnOptionDef);
1708
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    Default(Expr<T>),
    /// `{ PRIMARY KEY | UNIQUE }`; `is_primary` selects `PRIMARY KEY` over
    /// `UNIQUE`.
    Unique { is_primary: bool },
    /// A referential integrity constraint (`REFERENCES <foreign_table>
    /// (<referred_columns>)`).
    ForeignKey {
        /// Name of the referenced table.
        foreign_table: UnresolvedItemName,
        /// Referenced columns, displayed as a parenthesized, comma-separated
        /// list.
        referred_columns: Vec<Ident>,
    },
    /// `CHECK (<expr>)`
    Check(Expr<T>),
    /// `VERSION <action> <version>`
    Versioned {
        /// The versioning action, e.g. `ADDED`.
        action: ColumnVersioned,
        /// The version the action applies to.
        version: Version,
    },
}
1735
1736impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1737    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1738        use ColumnOption::*;
1739        match self {
1740            Null => f.write_str("NULL"),
1741            NotNull => f.write_str("NOT NULL"),
1742            Default(expr) => {
1743                f.write_str("DEFAULT ");
1744                f.write_node(expr);
1745            }
1746            Unique { is_primary } => {
1747                if *is_primary {
1748                    f.write_str("PRIMARY KEY");
1749                } else {
1750                    f.write_str("UNIQUE");
1751                }
1752            }
1753            ForeignKey {
1754                foreign_table,
1755                referred_columns,
1756            } => {
1757                f.write_str("REFERENCES ");
1758                f.write_node(foreign_table);
1759                f.write_str(" (");
1760                f.write_node(&display::comma_separated(referred_columns));
1761                f.write_str(")");
1762            }
1763            Check(expr) => {
1764                f.write_str("CHECK (");
1765                f.write_node(expr);
1766                f.write_str(")");
1767            }
1768            Versioned { action, version } => {
1769                f.write_str("VERSION ");
1770                f.write_node(action);
1771                f.write_str(" ");
1772                f.write_node(version);
1773            }
1774        }
1775    }
1776}
1777impl_display_t!(ColumnOption);
1778
/// The action named in a `VERSION <action> <version>` column option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1783
1784impl AstDisplay for ColumnVersioned {
1785    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1786    where
1787        W: fmt::Write,
1788    {
1789        match self {
1790            // TODO(alter_table): Support dropped columns.
1791            ColumnVersioned::Added => f.write_str("ADDED"),
1792        }
1793    }
1794}
1795impl_display!(ColumnVersioned);
1796
1797fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1798    struct ConstraintName<'a>(&'a Option<Ident>);
1799    impl<'a> AstDisplay for ConstraintName<'a> {
1800        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1801        where
1802            W: fmt::Write,
1803        {
1804            if let Some(name) = self.0 {
1805                f.write_str("CONSTRAINT ");
1806                f.write_node(name);
1807                f.write_str(" ");
1808            }
1809        }
1810    }
1811    ConstraintName(name)
1812}