mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33    /// The `ASSERT NOT NULL [=] <ident>` option.
34    AssertNotNull,
35    PartitionBy,
36    RetainHistory,
37    /// The `REFRESH [=] ...` option.
38    Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70    pub name: MaterializedViewOptionName,
71    pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77    /// The `SNAPSHOT [=] ...` option.
78    Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104    pub name: ContinualTaskOptionName,
105    pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111    pub schema: String,
112}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
126    ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152    pub name: AvroSchemaOptionName,
153    pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159    Csr {
160        csr_connection: CsrConnectionAvro<T>,
161    },
162    InlineSchema {
163        schema: Schema,
164        with_options: Vec<AvroSchemaOption<T>>,
165    },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193    Csr {
194        csr_connection: CsrConnectionProtobuf<T>,
195    },
196    InlineSchema {
197        message_name: String,
198        schema: Schema,
199    },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224    AvroKeyFullname,
225    AvroValueFullname,
226    NullDefaults,
227    AvroDocOn(AvroDocOn<T>),
228    KeyCompatibilityLevel,
229    ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252    pub identifier: DocOnIdentifier<T>,
253    pub for_schema: DocOnSchema,
254}
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub enum DocOnSchema {
257    KeyOnly,
258    ValueOnly,
259    All,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264    Column(ColumnName<T>),
265    Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306/// An option in a `{FROM|INTO} CONNECTION ...` statement.
307pub struct CsrConfigOption<T: AstInfo> {
308    pub name: CsrConfigOptionName<T>,
309    pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316    pub connection: T::ItemName,
317    pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum ReaderSchemaSelectionStrategy {
335    Latest,
336    Inline(String),
337    ById(i32),
338}
339
340impl Default for ReaderSchemaSelectionStrategy {
341    fn default() -> Self {
342        Self::Latest
343    }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348    pub connection: CsrConnection<T>,
349    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351    pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368    pub connection: CsrConnection<T>,
369    pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387    pub key_schema: Option<String>,
388    pub value_schema: String,
389}
390
391impl AstDisplay for CsrSeedAvro {
392    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393        f.write_str("SEED");
394        if let Some(key_schema) = &self.key_schema {
395            f.write_str(" KEY SCHEMA '");
396            f.write_node(&display::escape_single_quote_string(key_schema));
397            f.write_str("'");
398        }
399        f.write_str(" VALUE SCHEMA '");
400        f.write_node(&display::escape_single_quote_string(&self.value_schema));
401        f.write_str("'");
402    }
403}
404impl_display!(CsrSeedAvro);
405
406#[derive(Debug, Clone, PartialEq, Eq, Hash)]
407pub struct CsrSeedProtobuf {
408    pub key: Option<CsrSeedProtobufSchema>,
409    pub value: CsrSeedProtobufSchema,
410}
411
412impl AstDisplay for CsrSeedProtobuf {
413    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414        f.write_str("SEED");
415        if let Some(key) = &self.key {
416            f.write_str(" KEY ");
417            f.write_node(key);
418        }
419        f.write_str(" VALUE ");
420        f.write_node(&self.value);
421    }
422}
423impl_display!(CsrSeedProtobuf);
424
425#[derive(Debug, Clone, PartialEq, Eq, Hash)]
426pub struct CsrSeedProtobufSchema {
427    // Hex encoded string.
428    pub schema: String,
429    pub message_name: String,
430}
431impl AstDisplay for CsrSeedProtobufSchema {
432    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433        f.write_str("SCHEMA '");
434        f.write_str(&display::escape_single_quote_string(&self.schema));
435        f.write_str("' MESSAGE '");
436        f.write_str(&self.message_name);
437        f.write_str("'");
438    }
439}
440impl_display!(CsrSeedProtobufSchema);
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash)]
443pub enum FormatSpecifier<T: AstInfo> {
444    /// `CREATE SOURCE/SINK .. FORMAT`
445    Bare(Format<T>),
446    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
447    KeyValue { key: Format<T>, value: Format<T> },
448}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452        match self {
453            FormatSpecifier::Bare(format) => {
454                f.write_str("FORMAT ");
455                f.write_node(format)
456            }
457            FormatSpecifier::KeyValue { key, value } => {
458                f.write_str("KEY FORMAT ");
459                f.write_node(key);
460                f.write_str(" VALUE FORMAT ");
461                f.write_node(value);
462            }
463        }
464    }
465}
466impl_display_t!(FormatSpecifier);
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash)]
469pub enum Format<T: AstInfo> {
470    Bytes,
471    Avro(AvroSchema<T>),
472    Protobuf(ProtobufSchema<T>),
473    Regex(String),
474    Csv {
475        columns: CsvColumns,
476        delimiter: char,
477    },
478    Json {
479        array: bool,
480    },
481    Text,
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CsvColumns {
486    /// `WITH count COLUMNS`
487    Count(u64),
488    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
489    Header { names: Vec<Ident> },
490}
491
492impl AstDisplay for CsvColumns {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        match self {
495            CsvColumns::Count(n) => {
496                f.write_str(n);
497                f.write_str(" COLUMNS")
498            }
499            CsvColumns::Header { names } => {
500                f.write_str("HEADER");
501                if !names.is_empty() {
502                    f.write_str(" (");
503                    f.write_node(&display::comma_separated(names));
504                    f.write_str(")");
505                }
506            }
507        }
508    }
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Hash)]
512pub enum SourceIncludeMetadata {
513    Key {
514        alias: Option<Ident>,
515    },
516    Timestamp {
517        alias: Option<Ident>,
518    },
519    Partition {
520        alias: Option<Ident>,
521    },
522    Offset {
523        alias: Option<Ident>,
524    },
525    Headers {
526        alias: Option<Ident>,
527    },
528    Header {
529        key: String,
530        alias: Ident,
531        use_bytes: bool,
532    },
533}
534
535impl AstDisplay for SourceIncludeMetadata {
536    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538            if let Some(alias) = alias {
539                f.write_str(" AS ");
540                f.write_node(alias);
541            }
542        };
543
544        match self {
545            SourceIncludeMetadata::Key { alias } => {
546                f.write_str("KEY");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Timestamp { alias } => {
550                f.write_str("TIMESTAMP");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Partition { alias } => {
554                f.write_str("PARTITION");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Offset { alias } => {
558                f.write_str("OFFSET");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Headers { alias } => {
562                f.write_str("HEADERS");
563                print_alias(f, alias);
564            }
565            SourceIncludeMetadata::Header {
566                alias,
567                key,
568                use_bytes,
569            } => {
570                f.write_str("HEADER '");
571                f.write_str(&display::escape_single_quote_string(key));
572                f.write_str("'");
573                print_alias(f, &Some(alias.clone()));
574                if *use_bytes {
575                    f.write_str(" BYTES");
576                }
577            }
578        }
579    }
580}
581impl_display!(SourceIncludeMetadata);
582
583#[derive(Debug, Clone, PartialEq, Eq, Hash)]
584pub enum SourceErrorPolicy {
585    Inline {
586        /// The alias to use for the error column. If unspecified will be `error`.
587        alias: Option<Ident>,
588    },
589}
590
591impl AstDisplay for SourceErrorPolicy {
592    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593        match self {
594            Self::Inline { alias } => {
595                f.write_str("INLINE");
596                if let Some(alias) = alias {
597                    f.write_str(" AS ");
598                    f.write_node(alias);
599                }
600            }
601        }
602    }
603}
604impl_display!(SourceErrorPolicy);
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceEnvelope {
608    None,
609    Debezium,
610    Upsert {
611        value_decode_err_policy: Vec<SourceErrorPolicy>,
612    },
613    CdcV2,
614}
615
616impl SourceEnvelope {
617    /// `true` iff Materialize is expected to crash or exhibit UB
618    /// when attempting to ingest data starting at an offset other than zero.
619    pub fn requires_all_input(&self) -> bool {
620        match self {
621            SourceEnvelope::None => false,
622            SourceEnvelope::Debezium => false,
623            SourceEnvelope::Upsert { .. } => false,
624            SourceEnvelope::CdcV2 => true,
625        }
626    }
627}
628
629impl AstDisplay for SourceEnvelope {
630    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631        match self {
632            Self::None => {
633                // this is unreachable as long as the default is None, but include it in case we ever change that
634                f.write_str("NONE");
635            }
636            Self::Debezium => {
637                f.write_str("DEBEZIUM");
638            }
639            Self::Upsert {
640                value_decode_err_policy,
641            } => {
642                if value_decode_err_policy.is_empty() {
643                    f.write_str("UPSERT");
644                } else {
645                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646                    f.write_node(&display::comma_separated(value_decode_err_policy));
647                    f.write_str("))")
648                }
649            }
650            Self::CdcV2 => {
651                f.write_str("MATERIALIZE");
652            }
653        }
654    }
655}
656impl_display!(SourceEnvelope);
657
658#[derive(Debug, Clone, PartialEq, Eq, Hash)]
659pub enum SinkEnvelope {
660    Debezium,
661    Upsert,
662}
663
664impl AstDisplay for SinkEnvelope {
665    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666        match self {
667            Self::Upsert => {
668                f.write_str("UPSERT");
669            }
670            Self::Debezium => {
671                f.write_str("DEBEZIUM");
672            }
673        }
674    }
675}
676impl_display!(SinkEnvelope);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SubscribeOutput<T: AstInfo> {
680    Diffs,
681    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
682    EnvelopeUpsert { key_columns: Vec<Ident> },
683    EnvelopeDebezium { key_columns: Vec<Ident> },
684}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688        match self {
689            Self::Diffs => {}
690            Self::WithinTimestampOrderBy { order_by } => {
691                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692                f.write_node(&display::comma_separated(order_by));
693            }
694            Self::EnvelopeUpsert { key_columns } => {
695                f.write_str(" ENVELOPE UPSERT (KEY (");
696                f.write_node(&display::comma_separated(key_columns));
697                f.write_str("))");
698            }
699            Self::EnvelopeDebezium { key_columns } => {
700                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701                f.write_node(&display::comma_separated(key_columns));
702                f.write_str("))");
703            }
704        }
705    }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711        match self {
712            Self::Bytes => f.write_str("BYTES"),
713            Self::Avro(inner) => {
714                f.write_str("AVRO ");
715                f.write_node(inner);
716            }
717            Self::Protobuf(inner) => {
718                f.write_str("PROTOBUF ");
719                f.write_node(inner);
720            }
721            Self::Regex(regex) => {
722                f.write_str("REGEX '");
723                f.write_node(&display::escape_single_quote_string(regex));
724                f.write_str("'");
725            }
726            Self::Csv { columns, delimiter } => {
727                f.write_str("CSV WITH ");
728                f.write_node(columns);
729
730                if *delimiter != ',' {
731                    f.write_str(" DELIMITED BY '");
732                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733                    f.write_str("'");
734                }
735            }
736            Self::Json { array } => {
737                f.write_str("JSON");
738                if *array {
739                    f.write_str(" ARRAY");
740                }
741            }
742            Self::Text => f.write_str("TEXT"),
743        }
744    }
745}
746impl_display_t!(Format);
747
748// All connection options are bundled together to allow us to parse `ALTER
749// CONNECTION` without specifying the type of connection we're altering. Il faut
750// souffrir pour être belle.
751#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
752pub enum ConnectionOptionName {
753    AccessKeyId,
754    AssumeRoleArn,
755    AssumeRoleSessionName,
756    AvailabilityZones,
757    AwsConnection,
758    AwsPrivatelink,
759    Broker,
760    Brokers,
761    Credential,
762    Database,
763    Endpoint,
764    Host,
765    Password,
766    Port,
767    ProgressTopic,
768    ProgressTopicReplicationFactor,
769    PublicKey1,
770    PublicKey2,
771    Region,
772    SaslMechanisms,
773    SaslPassword,
774    SaslUsername,
775    Scope,
776    SecretAccessKey,
777    SecurityProtocol,
778    ServiceName,
779    SshTunnel,
780    SslCertificate,
781    SslCertificateAuthority,
782    SslKey,
783    SslMode,
784    SessionToken,
785    CatalogType,
786    Url,
787    User,
788    Warehouse,
789}
790
791impl AstDisplay for ConnectionOptionName {
792    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793        f.write_str(match self {
794            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798            ConnectionOptionName::Broker => "BROKER",
799            ConnectionOptionName::Brokers => "BROKERS",
800            ConnectionOptionName::Credential => "CREDENTIAL",
801            ConnectionOptionName::Database => "DATABASE",
802            ConnectionOptionName::Endpoint => "ENDPOINT",
803            ConnectionOptionName::Host => "HOST",
804            ConnectionOptionName::Password => "PASSWORD",
805            ConnectionOptionName::Port => "PORT",
806            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807            ConnectionOptionName::ProgressTopicReplicationFactor => {
808                "PROGRESS TOPIC REPLICATION FACTOR"
809            }
810            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812            ConnectionOptionName::Region => "REGION",
813            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817            ConnectionOptionName::SaslUsername => "SASL USERNAME",
818            ConnectionOptionName::Scope => "SCOPE",
819            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821            ConnectionOptionName::ServiceName => "SERVICE NAME",
822            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825            ConnectionOptionName::SslKey => "SSL KEY",
826            ConnectionOptionName::SslMode => "SSL MODE",
827            ConnectionOptionName::SessionToken => "SESSION TOKEN",
828            ConnectionOptionName::CatalogType => "CATALOG TYPE",
829            ConnectionOptionName::Url => "URL",
830            ConnectionOptionName::User => "USER",
831            ConnectionOptionName::Warehouse => "WAREHOUSE",
832        })
833    }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838    /// # WARNING
839    ///
840    /// Whenever implementing this trait consider very carefully whether or not
841    /// this value could contain sensitive user data. If you're uncertain, err
842    /// on the conservative side and return `true`.
843    fn redact_value(&self) -> bool {
844        match self {
845            ConnectionOptionName::AccessKeyId
846            | ConnectionOptionName::AvailabilityZones
847            | ConnectionOptionName::AwsConnection
848            | ConnectionOptionName::AwsPrivatelink
849            | ConnectionOptionName::Broker
850            | ConnectionOptionName::Brokers
851            | ConnectionOptionName::Credential
852            | ConnectionOptionName::Database
853            | ConnectionOptionName::Endpoint
854            | ConnectionOptionName::Host
855            | ConnectionOptionName::Password
856            | ConnectionOptionName::Port
857            | ConnectionOptionName::ProgressTopic
858            | ConnectionOptionName::ProgressTopicReplicationFactor
859            | ConnectionOptionName::PublicKey1
860            | ConnectionOptionName::PublicKey2
861            | ConnectionOptionName::Region
862            | ConnectionOptionName::AssumeRoleArn
863            | ConnectionOptionName::AssumeRoleSessionName
864            | ConnectionOptionName::SaslMechanisms
865            | ConnectionOptionName::SaslPassword
866            | ConnectionOptionName::SaslUsername
867            | ConnectionOptionName::Scope
868            | ConnectionOptionName::SecurityProtocol
869            | ConnectionOptionName::SecretAccessKey
870            | ConnectionOptionName::ServiceName
871            | ConnectionOptionName::SshTunnel
872            | ConnectionOptionName::SslCertificate
873            | ConnectionOptionName::SslCertificateAuthority
874            | ConnectionOptionName::SslKey
875            | ConnectionOptionName::SslMode
876            | ConnectionOptionName::SessionToken
877            | ConnectionOptionName::CatalogType
878            | ConnectionOptionName::Url
879            | ConnectionOptionName::User
880            | ConnectionOptionName::Warehouse => false,
881        }
882    }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
886/// An option in a `CREATE CONNECTION`.
887pub struct ConnectionOption<T: AstInfo> {
888    pub name: ConnectionOptionName,
889    pub value: Option<WithOptionValue<T>>,
890}
891impl_display_for_with_option!(ConnectionOption);
892impl_display_t!(ConnectionOption);
893
894#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
895pub enum CreateConnectionType {
896    Aws,
897    AwsPrivatelink,
898    Kafka,
899    Csr,
900    Postgres,
901    Ssh,
902    SqlServer,
903    MySql,
904    IcebergCatalog,
905}
906
907impl AstDisplay for CreateConnectionType {
908    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
909        match self {
910            Self::Kafka => {
911                f.write_str("KAFKA");
912            }
913            Self::Csr => {
914                f.write_str("CONFLUENT SCHEMA REGISTRY");
915            }
916            Self::Postgres => {
917                f.write_str("POSTGRES");
918            }
919            Self::Aws => {
920                f.write_str("AWS");
921            }
922            Self::AwsPrivatelink => {
923                f.write_str("AWS PRIVATELINK");
924            }
925            Self::Ssh => {
926                f.write_str("SSH TUNNEL");
927            }
928            Self::SqlServer => {
929                f.write_str("SQL SERVER");
930            }
931            Self::MySql => {
932                f.write_str("MYSQL");
933            }
934            Self::IcebergCatalog => {
935                f.write_str("ICEBERG CATALOG");
936            }
937        }
938    }
939}
940impl_display!(CreateConnectionType);
941
942#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
943pub enum CreateConnectionOptionName {
944    Validate,
945}
946
947impl AstDisplay for CreateConnectionOptionName {
948    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
949        f.write_str(match self {
950            CreateConnectionOptionName::Validate => "VALIDATE",
951        })
952    }
953}
954impl_display!(CreateConnectionOptionName);
955
956impl WithOptionName for CreateConnectionOptionName {
957    /// # WARNING
958    ///
959    /// Whenever implementing this trait consider very carefully whether or not
960    /// this value could contain sensitive user data. If you're uncertain, err
961    /// on the conservative side and return `true`.
962    fn redact_value(&self) -> bool {
963        match self {
964            CreateConnectionOptionName::Validate => false,
965        }
966    }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
970/// An option in a `CREATE CONNECTION...` statement.
971pub struct CreateConnectionOption<T: AstInfo> {
972    pub name: CreateConnectionOptionName,
973    pub value: Option<WithOptionValue<T>>,
974}
975impl_display_for_with_option!(CreateConnectionOption);
976impl_display_t!(CreateConnectionOption);
977
978#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
979pub enum KafkaSourceConfigOptionName {
980    GroupIdPrefix,
981    Topic,
982    TopicMetadataRefreshInterval,
983    StartTimestamp,
984    StartOffset,
985}
986
987impl AstDisplay for KafkaSourceConfigOptionName {
988    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
989        f.write_str(match self {
990            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
991            KafkaSourceConfigOptionName::Topic => "TOPIC",
992            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
993                "TOPIC METADATA REFRESH INTERVAL"
994            }
995            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
996            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
997        })
998    }
999}
1000impl_display!(KafkaSourceConfigOptionName);
1001
1002impl WithOptionName for KafkaSourceConfigOptionName {
1003    /// # WARNING
1004    ///
1005    /// Whenever implementing this trait consider very carefully whether or not
1006    /// this value could contain sensitive user data. If you're uncertain, err
1007    /// on the conservative side and return `true`.
1008    fn redact_value(&self) -> bool {
1009        match self {
1010            KafkaSourceConfigOptionName::GroupIdPrefix
1011            | KafkaSourceConfigOptionName::Topic
1012            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1013            | KafkaSourceConfigOptionName::StartOffset
1014            | KafkaSourceConfigOptionName::StartTimestamp => false,
1015        }
1016    }
1017}
1018
1019#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1020pub struct KafkaSourceConfigOption<T: AstInfo> {
1021    pub name: KafkaSourceConfigOptionName,
1022    pub value: Option<WithOptionValue<T>>,
1023}
1024impl_display_for_with_option!(KafkaSourceConfigOption);
1025impl_display_t!(KafkaSourceConfigOption);
1026
1027#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1028pub enum KafkaSinkConfigOptionName {
1029    CompressionType,
1030    PartitionBy,
1031    ProgressGroupIdPrefix,
1032    Topic,
1033    TransactionalIdPrefix,
1034    LegacyIds,
1035    TopicConfig,
1036    TopicMetadataRefreshInterval,
1037    TopicPartitionCount,
1038    TopicReplicationFactor,
1039}
1040
1041impl AstDisplay for KafkaSinkConfigOptionName {
1042    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1043        f.write_str(match self {
1044            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1045            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1046            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1047            KafkaSinkConfigOptionName::Topic => "TOPIC",
1048            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1049            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1050            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1051            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1052                "TOPIC METADATA REFRESH INTERVAL"
1053            }
1054            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1055            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1056        })
1057    }
1058}
1059impl_display!(KafkaSinkConfigOptionName);
1060
1061impl WithOptionName for KafkaSinkConfigOptionName {
1062    /// # WARNING
1063    ///
1064    /// Whenever implementing this trait consider very carefully whether or not
1065    /// this value could contain sensitive user data. If you're uncertain, err
1066    /// on the conservative side and return `true`.
1067    fn redact_value(&self) -> bool {
1068        match self {
1069            KafkaSinkConfigOptionName::CompressionType
1070            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1071            | KafkaSinkConfigOptionName::Topic
1072            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1073            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1074            | KafkaSinkConfigOptionName::LegacyIds
1075            | KafkaSinkConfigOptionName::TopicConfig
1076            | KafkaSinkConfigOptionName::TopicPartitionCount
1077            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1078            KafkaSinkConfigOptionName::PartitionBy => true,
1079        }
1080    }
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1084pub struct KafkaSinkConfigOption<T: AstInfo> {
1085    pub name: KafkaSinkConfigOptionName,
1086    pub value: Option<WithOptionValue<T>>,
1087}
1088impl_display_for_with_option!(KafkaSinkConfigOption);
1089impl_display_t!(KafkaSinkConfigOption);
1090
1091#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1092pub enum IcebergSinkConfigOptionName {
1093    Namespace,
1094    Table,
1095}
1096
1097impl AstDisplay for IcebergSinkConfigOptionName {
1098    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1099        f.write_str(match self {
1100            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1101            IcebergSinkConfigOptionName::Table => "TABLE",
1102        })
1103    }
1104}
1105impl_display!(IcebergSinkConfigOptionName);
1106
1107impl WithOptionName for IcebergSinkConfigOptionName {
1108    /// # WARNING
1109    ///
1110    /// Whenever implementing this trait consider very carefully whether or not
1111    /// this value could contain sensitive user data. If you're uncertain, err
1112    /// on the conservative side and return `true`.
1113    fn redact_value(&self) -> bool {
1114        match self {
1115            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1116        }
1117    }
1118}
1119
1120#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1121pub struct IcebergSinkConfigOption<T: AstInfo> {
1122    pub name: IcebergSinkConfigOptionName,
1123    pub value: Option<WithOptionValue<T>>,
1124}
1125impl_display_for_with_option!(IcebergSinkConfigOption);
1126impl_display_t!(IcebergSinkConfigOption);
1127
1128#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1129pub enum PgConfigOptionName {
1130    /// Hex encoded string of binary serialization of
1131    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`
1132    Details,
1133    /// The name of the publication to sync
1134    Publication,
1135    /// Columns whose types you want to unconditionally format as text
1136    /// NOTE(roshan): This value is kept around to allow round-tripping a
1137    /// `CREATE SOURCE` statement while we still allow creating implicit
1138    /// subsources from `CREATE SOURCE`, but will be removed once
1139    /// fully deprecating that feature and forcing users to use explicit
1140    /// `CREATE TABLE .. FROM SOURCE` statements
1141    TextColumns,
1142    /// Columns you want to exclude
1143    /// NOTE: This value is kept around to allow round-tripping a
1144    /// `CREATE SOURCE` statement while we still allow creating implicit
1145    /// subsources from `CREATE SOURCE`, but will be removed once
1146    /// fully deprecating that feature and forcing users to use explicit
1147    /// `CREATE TABLE .. FROM SOURCE` statements
1148    ExcludeColumns,
1149}
1150
1151impl AstDisplay for PgConfigOptionName {
1152    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1153        f.write_str(match self {
1154            PgConfigOptionName::Details => "DETAILS",
1155            PgConfigOptionName::Publication => "PUBLICATION",
1156            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1157            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1158        })
1159    }
1160}
1161impl_display!(PgConfigOptionName);
1162
1163impl WithOptionName for PgConfigOptionName {
1164    /// # WARNING
1165    ///
1166    /// Whenever implementing this trait consider very carefully whether or not
1167    /// this value could contain sensitive user data. If you're uncertain, err
1168    /// on the conservative side and return `true`.
1169    fn redact_value(&self) -> bool {
1170        match self {
1171            PgConfigOptionName::Details
1172            | PgConfigOptionName::Publication
1173            | PgConfigOptionName::TextColumns
1174            | PgConfigOptionName::ExcludeColumns => false,
1175        }
1176    }
1177}
1178
1179#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1180/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1181pub struct PgConfigOption<T: AstInfo> {
1182    pub name: PgConfigOptionName,
1183    pub value: Option<WithOptionValue<T>>,
1184}
1185impl_display_for_with_option!(PgConfigOption);
1186impl_display_t!(PgConfigOption);
1187
1188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1189pub enum MySqlConfigOptionName {
1190    /// Hex encoded string of binary serialization of
1191    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`
1192    Details,
1193    /// Columns whose types you want to unconditionally format as text
1194    /// NOTE(roshan): This value is kept around to allow round-tripping a
1195    /// `CREATE SOURCE` statement while we still allow creating implicit
1196    /// subsources from `CREATE SOURCE`, but will be removed once
1197    /// fully deprecating that feature and forcing users to use explicit
1198    /// `CREATE TABLE .. FROM SOURCE` statements
1199    TextColumns,
1200    /// Columns you want to exclude
1201    /// NOTE(roshan): This value is kept around to allow round-tripping a
1202    /// `CREATE SOURCE` statement while we still allow creating implicit
1203    /// subsources from `CREATE SOURCE`, but will be removed once
1204    /// fully deprecating that feature and forcing users to use explicit
1205    /// `CREATE TABLE .. FROM SOURCE` statements
1206    ExcludeColumns,
1207}
1208
1209impl AstDisplay for MySqlConfigOptionName {
1210    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1211        f.write_str(match self {
1212            MySqlConfigOptionName::Details => "DETAILS",
1213            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1214            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1215        })
1216    }
1217}
1218impl_display!(MySqlConfigOptionName);
1219
1220impl WithOptionName for MySqlConfigOptionName {
1221    /// # WARNING
1222    ///
1223    /// Whenever implementing this trait consider very carefully whether or not
1224    /// this value could contain sensitive user data. If you're uncertain, err
1225    /// on the conservative side and return `true`.
1226    fn redact_value(&self) -> bool {
1227        match self {
1228            MySqlConfigOptionName::Details
1229            | MySqlConfigOptionName::TextColumns
1230            | MySqlConfigOptionName::ExcludeColumns => false,
1231        }
1232    }
1233}
1234
1235#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1236/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1237pub struct MySqlConfigOption<T: AstInfo> {
1238    pub name: MySqlConfigOptionName,
1239    pub value: Option<WithOptionValue<T>>,
1240}
1241impl_display_for_with_option!(MySqlConfigOption);
1242impl_display_t!(MySqlConfigOption);
1243
1244#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1245pub enum SqlServerConfigOptionName {
1246    /// Hex encoded string of binary serialization of
1247    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
1248    Details,
1249    /// Columns whose types you want to unconditionally format as text.
1250    ///
1251    /// NOTE(roshan): This value is kept around to allow round-tripping a
1252    /// `CREATE SOURCE` statement while we still allow creating implicit
1253    /// subsources from `CREATE SOURCE`, but will be removed once
1254    /// fully deprecating that feature and forcing users to use explicit
1255    /// `CREATE TABLE .. FROM SOURCE` statements
1256    TextColumns,
1257    /// Columns you want to exclude.
1258    ///
1259    /// NOTE(roshan): This value is kept around to allow round-tripping a
1260    /// `CREATE SOURCE` statement while we still allow creating implicit
1261    /// subsources from `CREATE SOURCE`, but will be removed once
1262    /// fully deprecating that feature and forcing users to use explicit
1263    /// `CREATE TABLE .. FROM SOURCE` statements
1264    ExcludeColumns,
1265}
1266
1267impl AstDisplay for SqlServerConfigOptionName {
1268    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1269        f.write_str(match self {
1270            SqlServerConfigOptionName::Details => "DETAILS",
1271            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1272            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1273        })
1274    }
1275}
1276impl_display!(SqlServerConfigOptionName);
1277
1278impl WithOptionName for SqlServerConfigOptionName {
1279    /// # WARNING
1280    ///
1281    /// Whenever implementing this trait consider very carefully whether or not
1282    /// this value could contain sensitive user data. If you're uncertain, err
1283    /// on the conservative side and return `true`.
1284    fn redact_value(&self) -> bool {
1285        match self {
1286            SqlServerConfigOptionName::Details
1287            | SqlServerConfigOptionName::TextColumns
1288            | SqlServerConfigOptionName::ExcludeColumns => false,
1289        }
1290    }
1291}
1292
1293#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1294/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1295pub struct SqlServerConfigOption<T: AstInfo> {
1296    pub name: SqlServerConfigOptionName,
1297    pub value: Option<WithOptionValue<T>>,
1298}
1299impl_display_for_with_option!(SqlServerConfigOption);
1300impl_display_t!(SqlServerConfigOption);
1301
1302#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1303pub enum CreateSourceConnection<T: AstInfo> {
1304    Kafka {
1305        connection: T::ItemName,
1306        options: Vec<KafkaSourceConfigOption<T>>,
1307    },
1308    Postgres {
1309        connection: T::ItemName,
1310        options: Vec<PgConfigOption<T>>,
1311    },
1312    SqlServer {
1313        connection: T::ItemName,
1314        options: Vec<SqlServerConfigOption<T>>,
1315    },
1316    MySql {
1317        connection: T::ItemName,
1318        options: Vec<MySqlConfigOption<T>>,
1319    },
1320    LoadGenerator {
1321        generator: LoadGenerator,
1322        options: Vec<LoadGeneratorOption<T>>,
1323    },
1324}
1325
1326impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1327    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1328        match self {
1329            CreateSourceConnection::Kafka {
1330                connection,
1331                options,
1332            } => {
1333                f.write_str("KAFKA CONNECTION ");
1334                f.write_node(connection);
1335                if !options.is_empty() {
1336                    f.write_str(" (");
1337                    f.write_node(&display::comma_separated(options));
1338                    f.write_str(")");
1339                }
1340            }
1341            CreateSourceConnection::Postgres {
1342                connection,
1343                options,
1344            } => {
1345                f.write_str("POSTGRES CONNECTION ");
1346                f.write_node(connection);
1347                if !options.is_empty() {
1348                    f.write_str(" (");
1349                    f.write_node(&display::comma_separated(options));
1350                    f.write_str(")");
1351                }
1352            }
1353            CreateSourceConnection::SqlServer {
1354                connection,
1355                options,
1356            } => {
1357                f.write_str("SQL SERVER CONNECTION ");
1358                f.write_node(connection);
1359                if !options.is_empty() {
1360                    f.write_str(" (");
1361                    f.write_node(&display::comma_separated(options));
1362                    f.write_str(")");
1363                }
1364            }
1365            CreateSourceConnection::MySql {
1366                connection,
1367                options,
1368            } => {
1369                f.write_str("MYSQL CONNECTION ");
1370                f.write_node(connection);
1371                if !options.is_empty() {
1372                    f.write_str(" (");
1373                    f.write_node(&display::comma_separated(options));
1374                    f.write_str(")");
1375                }
1376            }
1377            CreateSourceConnection::LoadGenerator { generator, options } => {
1378                f.write_str("LOAD GENERATOR ");
1379                f.write_node(generator);
1380                if !options.is_empty() {
1381                    f.write_str(" (");
1382                    f.write_node(&display::comma_separated(options));
1383                    f.write_str(")");
1384                }
1385            }
1386        }
1387    }
1388}
1389impl_display_t!(CreateSourceConnection);
1390
1391#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1392pub enum LoadGenerator {
1393    Clock,
1394    Counter,
1395    Marketing,
1396    Auction,
1397    Datums,
1398    Tpch,
1399    KeyValue,
1400}
1401
1402impl AstDisplay for LoadGenerator {
1403    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1404        match self {
1405            Self::Counter => f.write_str("COUNTER"),
1406            Self::Clock => f.write_str("CLOCK"),
1407            Self::Marketing => f.write_str("MARKETING"),
1408            Self::Auction => f.write_str("AUCTION"),
1409            Self::Datums => f.write_str("DATUMS"),
1410            Self::Tpch => f.write_str("TPCH"),
1411            Self::KeyValue => f.write_str("KEY VALUE"),
1412        }
1413    }
1414}
1415impl_display!(LoadGenerator);
1416
1417impl LoadGenerator {
1418    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1419    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1420    /// cases where we only have the AST representation. This can be removed once
1421    /// the `ast_rewrite_sources_to_tables` migration is removed.
1422    pub fn schema_name(&self) -> &'static str {
1423        match self {
1424            LoadGenerator::Counter => "counter",
1425            LoadGenerator::Clock => "clock",
1426            LoadGenerator::Marketing => "marketing",
1427            LoadGenerator::Auction => "auction",
1428            LoadGenerator::Datums => "datums",
1429            LoadGenerator::Tpch => "tpch",
1430            LoadGenerator::KeyValue => "key_value",
1431        }
1432    }
1433}
1434
1435#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1436pub enum LoadGeneratorOptionName {
1437    ScaleFactor,
1438    TickInterval,
1439    AsOf,
1440    UpTo,
1441    MaxCardinality,
1442    Keys,
1443    SnapshotRounds,
1444    TransactionalSnapshot,
1445    ValueSize,
1446    Seed,
1447    Partitions,
1448    BatchSize,
1449}
1450
1451impl AstDisplay for LoadGeneratorOptionName {
1452    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1453        f.write_str(match self {
1454            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1455            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1456            LoadGeneratorOptionName::AsOf => "AS OF",
1457            LoadGeneratorOptionName::UpTo => "UP TO",
1458            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1459            LoadGeneratorOptionName::Keys => "KEYS",
1460            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1461            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1462            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1463            LoadGeneratorOptionName::Seed => "SEED",
1464            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1465            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1466        })
1467    }
1468}
1469impl_display!(LoadGeneratorOptionName);
1470
1471impl WithOptionName for LoadGeneratorOptionName {
1472    /// # WARNING
1473    ///
1474    /// Whenever implementing this trait consider very carefully whether or not
1475    /// this value could contain sensitive user data. If you're uncertain, err
1476    /// on the conservative side and return `true`.
1477    fn redact_value(&self) -> bool {
1478        match self {
1479            LoadGeneratorOptionName::ScaleFactor
1480            | LoadGeneratorOptionName::TickInterval
1481            | LoadGeneratorOptionName::AsOf
1482            | LoadGeneratorOptionName::UpTo
1483            | LoadGeneratorOptionName::MaxCardinality
1484            | LoadGeneratorOptionName::Keys
1485            | LoadGeneratorOptionName::SnapshotRounds
1486            | LoadGeneratorOptionName::TransactionalSnapshot
1487            | LoadGeneratorOptionName::ValueSize
1488            | LoadGeneratorOptionName::Partitions
1489            | LoadGeneratorOptionName::BatchSize
1490            | LoadGeneratorOptionName::Seed => false,
1491        }
1492    }
1493}
1494
1495#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1496/// An option in a `CREATE CONNECTION...SSH`.
1497pub struct LoadGeneratorOption<T: AstInfo> {
1498    pub name: LoadGeneratorOptionName,
1499    pub value: Option<WithOptionValue<T>>,
1500}
1501impl_display_for_with_option!(LoadGeneratorOption);
1502impl_display_t!(LoadGeneratorOption);
1503
1504#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1505pub enum CreateSinkConnection<T: AstInfo> {
1506    Kafka {
1507        connection: T::ItemName,
1508        options: Vec<KafkaSinkConfigOption<T>>,
1509        key: Option<SinkKey>,
1510        headers: Option<Ident>,
1511    },
1512    Iceberg {
1513        connection: T::ItemName,
1514        aws_connection: T::ItemName,
1515        key: Option<SinkKey>,
1516        options: Vec<IcebergSinkConfigOption<T>>,
1517    },
1518}
1519
1520impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1521    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1522        match self {
1523            CreateSinkConnection::Kafka {
1524                connection,
1525                options,
1526                key,
1527                headers,
1528            } => {
1529                f.write_str("KAFKA CONNECTION ");
1530                f.write_node(connection);
1531                if !options.is_empty() {
1532                    f.write_str(" (");
1533                    f.write_node(&display::comma_separated(options));
1534                    f.write_str(")");
1535                }
1536                if let Some(key) = key.as_ref() {
1537                    f.write_str(" ");
1538                    f.write_node(key);
1539                }
1540                if let Some(headers) = headers {
1541                    f.write_str(" HEADERS ");
1542                    f.write_node(headers);
1543                }
1544            }
1545            CreateSinkConnection::Iceberg {
1546                connection,
1547                aws_connection,
1548                key,
1549                options,
1550            } => {
1551                f.write_str("ICEBERG CATALOG CONNECTION ");
1552                f.write_node(connection);
1553                if !options.is_empty() {
1554                    f.write_str(" (");
1555                    f.write_node(&display::comma_separated(options));
1556                    f.write_str(")");
1557                }
1558                f.write_str(" USING AWS CONNECTION ");
1559                f.write_node(aws_connection);
1560                if let Some(key) = key.as_ref() {
1561                    f.write_str(" ");
1562                    f.write_node(key);
1563                }
1564            }
1565        }
1566    }
1567}
1568impl_display_t!(CreateSinkConnection);
1569
1570#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1571pub struct SinkKey {
1572    pub key_columns: Vec<Ident>,
1573    pub not_enforced: bool,
1574}
1575
1576impl AstDisplay for SinkKey {
1577    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1578        f.write_str("KEY (");
1579        f.write_node(&display::comma_separated(&self.key_columns));
1580        f.write_str(")");
1581        if self.not_enforced {
1582            f.write_str(" NOT ENFORCED");
1583        }
1584    }
1585}
1586
1587/// A table-level constraint, specified in a `CREATE TABLE` or an
1588/// `ALTER TABLE ADD <constraint>` statement.
1589#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1590pub enum TableConstraint<T: AstInfo> {
1591    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1592    Unique {
1593        name: Option<Ident>,
1594        columns: Vec<Ident>,
1595        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1596        is_primary: bool,
1597        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1598        // constraints.
1599        nulls_not_distinct: bool,
1600    },
1601    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1602    /// REFERENCES <foreign_table> (<referred_columns>)`)
1603    ForeignKey {
1604        name: Option<Ident>,
1605        columns: Vec<Ident>,
1606        foreign_table: T::ItemName,
1607        referred_columns: Vec<Ident>,
1608    },
1609    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1610    Check {
1611        name: Option<Ident>,
1612        expr: Box<Expr<T>>,
1613    },
1614}
1615
1616impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1617    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1618        match self {
1619            TableConstraint::Unique {
1620                name,
1621                columns,
1622                is_primary,
1623                nulls_not_distinct,
1624            } => {
1625                f.write_node(&display_constraint_name(name));
1626                if *is_primary {
1627                    f.write_str("PRIMARY KEY ");
1628                } else {
1629                    f.write_str("UNIQUE ");
1630                    if *nulls_not_distinct {
1631                        f.write_str("NULLS NOT DISTINCT ");
1632                    }
1633                }
1634                f.write_str("(");
1635                f.write_node(&display::comma_separated(columns));
1636                f.write_str(")");
1637            }
1638            TableConstraint::ForeignKey {
1639                name,
1640                columns,
1641                foreign_table,
1642                referred_columns,
1643            } => {
1644                f.write_node(&display_constraint_name(name));
1645                f.write_str("FOREIGN KEY (");
1646                f.write_node(&display::comma_separated(columns));
1647                f.write_str(") REFERENCES ");
1648                f.write_node(foreign_table);
1649                f.write_str("(");
1650                f.write_node(&display::comma_separated(referred_columns));
1651                f.write_str(")");
1652            }
1653            TableConstraint::Check { name, expr } => {
1654                f.write_node(&display_constraint_name(name));
1655                f.write_str("CHECK (");
1656                f.write_node(&expr);
1657                f.write_str(")");
1658            }
1659        }
1660    }
1661}
1662impl_display_t!(TableConstraint);
1663
1664/// A key constraint, specified in a `CREATE SOURCE`.
1665#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1666pub enum KeyConstraint {
1667    // PRIMARY KEY (<columns>) NOT ENFORCED
1668    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1669}
1670
1671impl AstDisplay for KeyConstraint {
1672    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1673        match self {
1674            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1675                f.write_str("PRIMARY KEY ");
1676                f.write_str("(");
1677                f.write_node(&display::comma_separated(columns));
1678                f.write_str(") ");
1679                f.write_str("NOT ENFORCED");
1680            }
1681        }
1682    }
1683}
1684impl_display!(KeyConstraint);
1685
1686#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1687pub enum CreateSourceOptionName {
1688    TimestampInterval,
1689    RetainHistory,
1690}
1691
1692impl AstDisplay for CreateSourceOptionName {
1693    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1694        f.write_str(match self {
1695            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1696            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1697        })
1698    }
1699}
1700impl_display!(CreateSourceOptionName);
1701
1702impl WithOptionName for CreateSourceOptionName {
1703    /// # WARNING
1704    ///
1705    /// Whenever implementing this trait consider very carefully whether or not
1706    /// this value could contain sensitive user data. If you're uncertain, err
1707    /// on the conservative side and return `true`.
1708    fn redact_value(&self) -> bool {
1709        match self {
1710            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1711                false
1712            }
1713        }
1714    }
1715}
1716
1717#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1718/// An option in a `CREATE SOURCE...` statement.
1719pub struct CreateSourceOption<T: AstInfo> {
1720    pub name: CreateSourceOptionName,
1721    pub value: Option<WithOptionValue<T>>,
1722}
1723impl_display_for_with_option!(CreateSourceOption);
1724impl_display_t!(CreateSourceOption);
1725
1726/// SQL column definition
1727#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1728pub struct ColumnDef<T: AstInfo> {
1729    pub name: Ident,
1730    pub data_type: T::DataType,
1731    pub collation: Option<UnresolvedItemName>,
1732    pub options: Vec<ColumnOptionDef<T>>,
1733}
1734
1735impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1736    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1737        f.write_node(&self.name);
1738        f.write_str(" ");
1739        f.write_node(&self.data_type);
1740        if let Some(collation) = &self.collation {
1741            f.write_str(" COLLATE ");
1742            f.write_node(collation);
1743        }
1744        for option in &self.options {
1745            f.write_str(" ");
1746            f.write_node(option);
1747        }
1748    }
1749}
1750impl_display_t!(ColumnDef);
1751
1752/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1753///
1754/// Note that implementations are substantially more permissive than the ANSI
1755/// specification on what order column options can be presented in, and whether
1756/// they are allowed to be named. The specification distinguishes between
1757/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1758/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1759/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1760/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1761/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1762/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1763/// NOT NULL constraints (the last of which is in violation of the spec).
1764///
1765/// For maximum flexibility, we don't distinguish between constraint and
1766/// non-constraint options, lumping them all together under the umbrella of
1767/// "column options," and we allow any column option to be named.
1768#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1769pub struct ColumnOptionDef<T: AstInfo> {
1770    pub name: Option<Ident>,
1771    pub option: ColumnOption<T>,
1772}
1773
1774impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1775    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1776        f.write_node(&display_constraint_name(&self.name));
1777        f.write_node(&self.option);
1778    }
1779}
1780impl_display_t!(ColumnOptionDef);
1781
1782/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1783/// TABLE` statement.
1784#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1785pub enum ColumnOption<T: AstInfo> {
1786    /// `NULL`
1787    Null,
1788    /// `NOT NULL`
1789    NotNull,
1790    /// `DEFAULT <restricted-expr>`
1791    Default(Expr<T>),
1792    /// `{ PRIMARY KEY | UNIQUE }`
1793    Unique { is_primary: bool },
1794    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1795    /// <foreign_table> (<referred_columns>)`).
1796    ForeignKey {
1797        foreign_table: UnresolvedItemName,
1798        referred_columns: Vec<Ident>,
1799    },
1800    /// `CHECK (<expr>)`
1801    Check(Expr<T>),
1802    /// `VERSION <action> <version>`
1803    Versioned {
1804        action: ColumnVersioned,
1805        version: Version,
1806    },
1807}
1808
1809impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1810    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1811        use ColumnOption::*;
1812        match self {
1813            Null => f.write_str("NULL"),
1814            NotNull => f.write_str("NOT NULL"),
1815            Default(expr) => {
1816                f.write_str("DEFAULT ");
1817                f.write_node(expr);
1818            }
1819            Unique { is_primary } => {
1820                if *is_primary {
1821                    f.write_str("PRIMARY KEY");
1822                } else {
1823                    f.write_str("UNIQUE");
1824                }
1825            }
1826            ForeignKey {
1827                foreign_table,
1828                referred_columns,
1829            } => {
1830                f.write_str("REFERENCES ");
1831                f.write_node(foreign_table);
1832                f.write_str(" (");
1833                f.write_node(&display::comma_separated(referred_columns));
1834                f.write_str(")");
1835            }
1836            Check(expr) => {
1837                f.write_str("CHECK (");
1838                f.write_node(expr);
1839                f.write_str(")");
1840            }
1841            Versioned { action, version } => {
1842                f.write_str("VERSION ");
1843                f.write_node(action);
1844                f.write_str(" ");
1845                f.write_node(version);
1846            }
1847        }
1848    }
1849}
1850impl_display_t!(ColumnOption);
1851
1852#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1853pub enum ColumnVersioned {
1854    Added,
1855}
1856
1857impl AstDisplay for ColumnVersioned {
1858    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1859    where
1860        W: fmt::Write,
1861    {
1862        match self {
1863            // TODO(alter_table): Support dropped columns.
1864            ColumnVersioned::Added => f.write_str("ADDED"),
1865        }
1866    }
1867}
1868impl_display!(ColumnVersioned);
1869
1870fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1871    struct ConstraintName<'a>(&'a Option<Ident>);
1872    impl<'a> AstDisplay for ConstraintName<'a> {
1873        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1874        where
1875            W: fmt::Write,
1876        {
1877            if let Some(name) = self.0 {
1878                f.write_str("CONSTRAINT ");
1879                f.write_node(name);
1880                f.write_str(" ");
1881            }
1882        }
1883    }
1884    ConstraintName(name)
1885}