Skip to main content

mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// The name of an option carried by a [`MaterializedViewOption`].
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// determines how option names compare.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
/// A single materialized-view option: a name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): the macro is defined outside the visible source; judging by its
// name it generates the display impl for this name/value pair — confirm.
impl_display_for_with_option!(MaterializedViewOption);
74
/// The name of an option carried by a [`ContinualTaskOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT [=] ...` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
/// A single continual-task option: a name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): the macro is defined outside the visible source; judging by its
// name it generates the display impl for this name/value pair — confirm.
impl_display_for_with_option!(ContinualTaskOption);
108
/// A schema supplied inline as a string; displayed as `SCHEMA '<schema>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The raw (unescaped) schema text.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
/// The name of an option carried by an [`AvroSchemaOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
/// A single option attached to an inline Avro schema: a name plus an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): the macro is defined outside the visible source; judging by its
// name it generates the display impl for this name/value pair — confirm.
impl_display_for_with_option!(AvroSchemaOption);
156
/// Where the schema of an Avro-formatted object comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is described by a Confluent Schema Registry connection
    /// (`USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is written inline (`USING SCHEMA '...'`), optionally followed
    /// by a parenthesized option list.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
/// Where the schema of a Protobuf-formatted object comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is described by a Confluent Schema Registry connection
    /// (`USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is written inline: `MESSAGE '<message_name>' USING SCHEMA '...'`.
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
/// The name of an option carried by a [`CsrConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// The `AVRO KEY FULLNAME` option.
    AvroKeyFullname,
    /// The `AVRO VALUE FULLNAME` option.
    AvroValueFullname,
    /// The `NULL DEFAULTS` option.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option.
    AvroDocOn(AvroDocOn<T>),
    /// The `KEY COMPATIBILITY LEVEL` option.
    KeyCompatibilityLevel,
    /// The `VALUE COMPATIBILITY LEVEL` option.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
/// The target of a `DOC ON` option: what is documented and for which schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the documentation applies to.
    pub identifier: DocOnIdentifier<T>,
    /// Whether the option is restricted to the key schema, the value schema,
    /// or neither.
    pub for_schema: DocOnSchema,
}
/// Selects which schema a `DOC ON` option applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a leading `KEY ` keyword.
    KeyOnly,
    /// Displayed with a leading `VALUE ` keyword.
    ValueOnly,
    /// Displayed with no leading keyword.
    All,
}
261
/// The object a `DOC ON` option refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// `DOC ON COLUMN <column name>`.
    Column(ColumnName<T>),
    /// `DOC ON TYPE <item name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
///
/// Pairs an option name with an optional value.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside the visible source; by their
// names they generate the display impls for this type — confirm.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A reference to a connection plus the options applied to it; displayed as
/// `CONNECTION <name> [(<options>)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The name of the referenced connection.
    pub connection: T::ItemName,
    /// Options for the connection; may be empty.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is chosen. The `Default` impl selects `Latest`.
// NOTE(review): the variant docs below are inferred from names and payload
// types only; nothing in the visible source consumes them — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// Presumably: use the latest available schema.
    Latest,
    /// Presumably: use the schema given inline as a string.
    Inline(String),
    /// Presumably: use the schema with the given numeric id.
    ById(i32),
}
339
340impl Default for ReaderSchemaSelectionStrategy {
341    fn default() -> Self {
342        Self::Latest
343    }
344}
345
/// A Confluent Schema Registry reference for Avro data.
///
/// Only `connection` and `seed` are written by this type's `AstDisplay` impl;
/// the two strategies are not part of the displayed SQL.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The registry connection and its options.
    pub connection: CsrConnection<T>,
    /// Reader-schema selection strategy for the key, if any.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader-schema selection strategy for the value, if any.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause carrying explicit schemas.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A Confluent Schema Registry reference for Protobuf data.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The registry connection and its options.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause carrying explicit schemas.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// The `SEED [KEY SCHEMA '...'] VALUE SCHEMA '...'` clause for Avro.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// The key schema text, if one was given.
    pub key_schema: Option<String>,
    /// The value schema text.
    pub value_schema: String,
}
390
391impl AstDisplay for CsrSeedAvro {
392    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393        f.write_str("SEED");
394        if let Some(key_schema) = &self.key_schema {
395            f.write_str(" KEY SCHEMA '");
396            f.write_node(&display::escape_single_quote_string(key_schema));
397            f.write_str("'");
398        }
399        f.write_str(" VALUE SCHEMA '");
400        f.write_node(&display::escape_single_quote_string(&self.value_schema));
401        f.write_str("'");
402    }
403}
404impl_display!(CsrSeedAvro);
405
/// The `SEED [KEY <schema>] VALUE <schema>` clause for Protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// The key schema, if one was given.
    pub key: Option<CsrSeedProtobufSchema>,
    /// The value schema.
    pub value: CsrSeedProtobufSchema,
}
411
412impl AstDisplay for CsrSeedProtobuf {
413    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414        f.write_str("SEED");
415        if let Some(key) = &self.key {
416            f.write_str(" KEY ");
417            f.write_node(key);
418        }
419        f.write_str(" VALUE ");
420        f.write_node(&self.value);
421    }
422}
423impl_display!(CsrSeedProtobuf);
424
/// One schema inside a Protobuf `SEED` clause; displayed as
/// `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    // Hex encoded string.
    pub schema: String,
    // The message name written after the `MESSAGE` keyword.
    pub message_name: String,
}
431impl AstDisplay for CsrSeedProtobufSchema {
432    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433        f.write_str("SCHEMA '");
434        f.write_str(&display::escape_single_quote_string(&self.schema));
435        f.write_str("' MESSAGE '");
436        f.write_str(&self.message_name);
437        f.write_str("'");
438    }
439}
440impl_display!(CsrSeedProtobufSchema);
441
/// How the format of a source or sink is specified: a single format, or
/// separate key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452        match self {
453            FormatSpecifier::Bare(format) => {
454                f.write_str("FORMAT ");
455                f.write_node(format)
456            }
457            FormatSpecifier::KeyValue { key, value } => {
458                f.write_str("KEY FORMAT ");
459                f.write_node(key);
460                f.write_str(" VALUE FORMAT ");
461                f.write_node(value);
462            }
463        }
464    }
465}
466impl_display_t!(FormatSpecifier);
467
/// A data format, as written after the `FORMAT` keyword.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// `BYTES`
    Bytes,
    /// `AVRO <schema>`
    Avro(AvroSchema<T>),
    /// `PROTOBUF <schema>`
    Protobuf(ProtobufSchema<T>),
    /// `REGEX '<pattern>'`
    Regex(String),
    /// `CSV WITH <columns> [DELIMITED BY '<delimiter>']`; the delimiter clause
    /// is only displayed when the delimiter is not a comma.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// `JSON [ARRAY]`
    Json {
        array: bool,
    },
    /// `TEXT`
    Text,
}
483
/// How the columns of a CSV-formatted object are described.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
491
492impl AstDisplay for CsvColumns {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        match self {
495            CsvColumns::Count(n) => {
496                f.write_str(n);
497                f.write_str(" COLUMNS")
498            }
499            CsvColumns::Header { names } => {
500                f.write_str("HEADER");
501                if !names.is_empty() {
502                    f.write_str(" (");
503                    f.write_node(&display::comma_separated(names));
504                    f.write_str(")");
505                }
506            }
507        }
508    }
509}
510
/// A piece of metadata that a source can include as a column.
///
/// Every variant except `Header` is displayed as its keyword followed by an
/// optional ` AS <alias>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// `KEY [AS <alias>]`
    Key {
        alias: Option<Ident>,
    },
    /// `TIMESTAMP [AS <alias>]`
    Timestamp {
        alias: Option<Ident>,
    },
    /// `PARTITION [AS <alias>]`
    Partition {
        alias: Option<Ident>,
    },
    /// `OFFSET [AS <alias>]`
    Offset {
        alias: Option<Ident>,
    },
    /// `HEADERS [AS <alias>]`
    Headers {
        alias: Option<Ident>,
    },
    /// `HEADER '<key>' AS <alias> [BYTES]`; the alias is mandatory here.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
534
535impl AstDisplay for SourceIncludeMetadata {
536    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538            if let Some(alias) = alias {
539                f.write_str(" AS ");
540                f.write_node(alias);
541            }
542        };
543
544        match self {
545            SourceIncludeMetadata::Key { alias } => {
546                f.write_str("KEY");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Timestamp { alias } => {
550                f.write_str("TIMESTAMP");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Partition { alias } => {
554                f.write_str("PARTITION");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Offset { alias } => {
558                f.write_str("OFFSET");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Headers { alias } => {
562                f.write_str("HEADERS");
563                print_alias(f, alias);
564            }
565            SourceIncludeMetadata::Header {
566                alias,
567                key,
568                use_bytes,
569            } => {
570                f.write_str("HEADER '");
571                f.write_str(&display::escape_single_quote_string(key));
572                f.write_str("'");
573                print_alias(f, &Some(alias.clone()));
574                if *use_bytes {
575                    f.write_str(" BYTES");
576                }
577            }
578        }
579    }
580}
581impl_display!(SourceIncludeMetadata);
582
/// A policy for handling source errors; displayed as `INLINE [AS <alias>]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Displayed as the `INLINE` keyword with an optional alias.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
590
591impl AstDisplay for SourceErrorPolicy {
592    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593        match self {
594            Self::Inline { alias } => {
595                f.write_str("INLINE");
596                if let Some(alias) = alias {
597                    f.write_str(" AS ");
598                    f.write_node(alias);
599                }
600            }
601        }
602    }
603}
604impl_display!(SourceErrorPolicy);
605
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Displayed as `NONE`.
    None,
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`, or as
    /// `UPSERT (VALUE DECODING ERRORS = (<policies>))` when the policy list is
    /// not empty.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Displayed as `MATERIALIZE`.
    CdcV2,
}
615
616impl SourceEnvelope {
617    /// `true` iff Materialize is expected to crash or exhibit UB
618    /// when attempting to ingest data starting at an offset other than zero.
619    pub fn requires_all_input(&self) -> bool {
620        match self {
621            SourceEnvelope::None => false,
622            SourceEnvelope::Debezium => false,
623            SourceEnvelope::Upsert { .. } => false,
624            SourceEnvelope::CdcV2 => true,
625        }
626    }
627}
628
629impl AstDisplay for SourceEnvelope {
630    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631        match self {
632            Self::None => {
633                // this is unreachable as long as the default is None, but include it in case we ever change that
634                f.write_str("NONE");
635            }
636            Self::Debezium => {
637                f.write_str("DEBEZIUM");
638            }
639            Self::Upsert {
640                value_decode_err_policy,
641            } => {
642                if value_decode_err_policy.is_empty() {
643                    f.write_str("UPSERT");
644                } else {
645                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646                    f.write_node(&display::comma_separated(value_decode_err_policy));
647                    f.write_str("))")
648                }
649            }
650            Self::CdcV2 => {
651                f.write_str("MATERIALIZE");
652            }
653        }
654    }
655}
656impl_display!(SourceEnvelope);
657
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
663
664impl AstDisplay for SinkEnvelope {
665    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666        match self {
667            Self::Upsert => {
668                f.write_str("UPSERT");
669            }
670            Self::Debezium => {
671                f.write_str("DEBEZIUM");
672            }
673        }
674    }
675}
676impl_display!(SinkEnvelope);
677
/// The mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Displayed as `UPSERT`.
    Upsert,
}
682
683impl AstDisplay for IcebergSinkMode {
684    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
685        match self {
686            Self::Upsert => {
687                f.write_str("UPSERT");
688            }
689        }
690    }
691}
692impl_display!(IcebergSinkMode);
693
/// The output shape requested from a subscribe.
///
/// Each non-`Diffs` variant is displayed with a leading space.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Displayed as nothing at all.
    Diffs,
    /// ` WITHIN TIMESTAMP ORDER BY <order_by, ...>`
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// ` ENVELOPE UPSERT (KEY (<key_columns, ...>))`
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// ` ENVELOPE DEBEZIUM (KEY (<key_columns, ...>))`
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704        match self {
705            Self::Diffs => {}
706            Self::WithinTimestampOrderBy { order_by } => {
707                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708                f.write_node(&display::comma_separated(order_by));
709            }
710            Self::EnvelopeUpsert { key_columns } => {
711                f.write_str(" ENVELOPE UPSERT (KEY (");
712                f.write_node(&display::comma_separated(key_columns));
713                f.write_str("))");
714            }
715            Self::EnvelopeDebezium { key_columns } => {
716                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717                f.write_node(&display::comma_separated(key_columns));
718                f.write_str("))");
719            }
720        }
721    }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727        match self {
728            Self::Bytes => f.write_str("BYTES"),
729            Self::Avro(inner) => {
730                f.write_str("AVRO ");
731                f.write_node(inner);
732            }
733            Self::Protobuf(inner) => {
734                f.write_str("PROTOBUF ");
735                f.write_node(inner);
736            }
737            Self::Regex(regex) => {
738                f.write_str("REGEX '");
739                f.write_node(&display::escape_single_quote_string(regex));
740                f.write_str("'");
741            }
742            Self::Csv { columns, delimiter } => {
743                f.write_str("CSV WITH ");
744                f.write_node(columns);
745
746                if *delimiter != ',' {
747                    f.write_str(" DELIMITED BY '");
748                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749                    f.write_str("'");
750                }
751            }
752            Self::Json { array } => {
753                f.write_str("JSON");
754                if *array {
755                    f.write_str(" ARRAY");
756                }
757            }
758            Self::Text => f.write_str("TEXT"),
759        }
760    }
761}
762impl_display_t!(Format);
763
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. One
// must suffer to be beautiful.
/// The name of an option carried by a [`ConnectionOption`].
///
/// Each variant is displayed as the space-separated upper-case keywords of its
/// name (for example `AccessKeyId` is written as `ACCESS KEY ID`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
806
807impl AstDisplay for ConnectionOptionName {
808    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
809        f.write_str(match self {
810            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
811            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
812            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
813            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
814            ConnectionOptionName::Broker => "BROKER",
815            ConnectionOptionName::Brokers => "BROKERS",
816            ConnectionOptionName::Credential => "CREDENTIAL",
817            ConnectionOptionName::Database => "DATABASE",
818            ConnectionOptionName::Endpoint => "ENDPOINT",
819            ConnectionOptionName::Host => "HOST",
820            ConnectionOptionName::Password => "PASSWORD",
821            ConnectionOptionName::Port => "PORT",
822            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
823            ConnectionOptionName::ProgressTopicReplicationFactor => {
824                "PROGRESS TOPIC REPLICATION FACTOR"
825            }
826            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
827            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
828            ConnectionOptionName::Region => "REGION",
829            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
830            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
831            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
832            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
833            ConnectionOptionName::SaslUsername => "SASL USERNAME",
834            ConnectionOptionName::Scope => "SCOPE",
835            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
836            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
837            ConnectionOptionName::ServiceName => "SERVICE NAME",
838            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
839            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
840            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
841            ConnectionOptionName::SslKey => "SSL KEY",
842            ConnectionOptionName::SslMode => "SSL MODE",
843            ConnectionOptionName::SessionToken => "SESSION TOKEN",
844            ConnectionOptionName::CatalogType => "CATALOG TYPE",
845            ConnectionOptionName::Url => "URL",
846            ConnectionOptionName::User => "USER",
847            ConnectionOptionName::Warehouse => "WAREHOUSE",
848        })
849    }
850}
851impl_display!(ConnectionOptionName);
852
853impl WithOptionName for ConnectionOptionName {
854    /// # WARNING
855    ///
856    /// Whenever implementing this trait consider very carefully whether or not
857    /// this value could contain sensitive user data. If you're uncertain, err
858    /// on the conservative side and return `true`.
859    fn redact_value(&self) -> bool {
860        match self {
861            ConnectionOptionName::AccessKeyId
862            | ConnectionOptionName::AvailabilityZones
863            | ConnectionOptionName::AwsConnection
864            | ConnectionOptionName::AwsPrivatelink
865            | ConnectionOptionName::Broker
866            | ConnectionOptionName::Brokers
867            | ConnectionOptionName::Credential
868            | ConnectionOptionName::Database
869            | ConnectionOptionName::Endpoint
870            | ConnectionOptionName::Host
871            | ConnectionOptionName::Password
872            | ConnectionOptionName::Port
873            | ConnectionOptionName::ProgressTopic
874            | ConnectionOptionName::ProgressTopicReplicationFactor
875            | ConnectionOptionName::PublicKey1
876            | ConnectionOptionName::PublicKey2
877            | ConnectionOptionName::Region
878            | ConnectionOptionName::AssumeRoleArn
879            | ConnectionOptionName::AssumeRoleSessionName
880            | ConnectionOptionName::SaslMechanisms
881            | ConnectionOptionName::SaslPassword
882            | ConnectionOptionName::SaslUsername
883            | ConnectionOptionName::Scope
884            | ConnectionOptionName::SecurityProtocol
885            | ConnectionOptionName::SecretAccessKey
886            | ConnectionOptionName::ServiceName
887            | ConnectionOptionName::SshTunnel
888            | ConnectionOptionName::SslCertificate
889            | ConnectionOptionName::SslCertificateAuthority
890            | ConnectionOptionName::SslKey
891            | ConnectionOptionName::SslMode
892            | ConnectionOptionName::SessionToken
893            | ConnectionOptionName::CatalogType
894            | ConnectionOptionName::Url
895            | ConnectionOptionName::User
896            | ConnectionOptionName::Warehouse => false,
897        }
898    }
899}
900
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE CONNECTION`.
///
/// Pairs an option name with an optional value.
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside the visible source; by their
// names they generate the display impls for this type — confirm.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
909
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Displayed as `AWS`.
    Aws,
    /// Displayed as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Displayed as `KAFKA`.
    Kafka,
    /// Displayed as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Displayed as `POSTGRES`.
    Postgres,
    /// Displayed as `SSH TUNNEL`.
    Ssh,
    /// Displayed as `SQL SERVER`.
    SqlServer,
    /// Displayed as `MYSQL`.
    MySql,
    /// Displayed as `ICEBERG CATALOG`.
    IcebergCatalog,
}
922
923impl AstDisplay for CreateConnectionType {
924    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
925        match self {
926            Self::Kafka => {
927                f.write_str("KAFKA");
928            }
929            Self::Csr => {
930                f.write_str("CONFLUENT SCHEMA REGISTRY");
931            }
932            Self::Postgres => {
933                f.write_str("POSTGRES");
934            }
935            Self::Aws => {
936                f.write_str("AWS");
937            }
938            Self::AwsPrivatelink => {
939                f.write_str("AWS PRIVATELINK");
940            }
941            Self::Ssh => {
942                f.write_str("SSH TUNNEL");
943            }
944            Self::SqlServer => {
945                f.write_str("SQL SERVER");
946            }
947            Self::MySql => {
948                f.write_str("MYSQL");
949            }
950            Self::IcebergCatalog => {
951                f.write_str("ICEBERG CATALOG");
952            }
953        }
954    }
955}
956impl_display!(CreateConnectionType);
957
/// The name of a [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// The `VALIDATE` option.
    Validate,
}
962
963impl AstDisplay for CreateConnectionOptionName {
964    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
965        f.write_str(match self {
966            CreateConnectionOptionName::Validate => "VALIDATE",
967        })
968    }
969}
970impl_display!(CreateConnectionOptionName);
971
972impl WithOptionName for CreateConnectionOptionName {
973    /// # WARNING
974    ///
975    /// Whenever implementing this trait consider very carefully whether or not
976    /// this value could contain sensitive user data. If you're uncertain, err
977    /// on the conservative side and return `true`.
978    fn redact_value(&self) -> bool {
979        match self {
980            CreateConnectionOptionName::Validate => false,
981        }
982    }
983}
984
985#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
986/// An option in a `CREATE CONNECTION...` statement.
987pub struct CreateConnectionOption<T: AstInfo> {
988    pub name: CreateConnectionOptionName,
989    pub value: Option<WithOptionValue<T>>,
990}
991impl_display_for_with_option!(CreateConnectionOption);
992impl_display_t!(CreateConnectionOption);
993
/// The name of a [`KafkaSourceConfigOption`].
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// The `GROUP ID PREFIX` option.
    GroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `START TIMESTAMP` option.
    StartTimestamp,
    /// The `START OFFSET` option.
    StartOffset,
}
1002
1003impl AstDisplay for KafkaSourceConfigOptionName {
1004    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1005        f.write_str(match self {
1006            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1007            KafkaSourceConfigOptionName::Topic => "TOPIC",
1008            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1009                "TOPIC METADATA REFRESH INTERVAL"
1010            }
1011            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1012            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1013        })
1014    }
1015}
1016impl_display!(KafkaSourceConfigOptionName);
1017
1018impl WithOptionName for KafkaSourceConfigOptionName {
1019    /// # WARNING
1020    ///
1021    /// Whenever implementing this trait consider very carefully whether or not
1022    /// this value could contain sensitive user data. If you're uncertain, err
1023    /// on the conservative side and return `true`.
1024    fn redact_value(&self) -> bool {
1025        match self {
1026            KafkaSourceConfigOptionName::GroupIdPrefix
1027            | KafkaSourceConfigOptionName::Topic
1028            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1029            | KafkaSourceConfigOptionName::StartOffset
1030            | KafkaSourceConfigOptionName::StartTimestamp => false,
1031        }
1032    }
1033}
1034
1035#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1036pub struct KafkaSourceConfigOption<T: AstInfo> {
1037    pub name: KafkaSourceConfigOptionName,
1038    pub value: Option<WithOptionValue<T>>,
1039}
1040impl_display_for_with_option!(KafkaSourceConfigOption);
1041impl_display_t!(KafkaSourceConfigOption);
1042
/// The name of a [`KafkaSinkConfigOption`].
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// The `COMPRESSION TYPE` option.
    CompressionType,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `PROGRESS GROUP ID PREFIX` option.
    ProgressGroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TRANSACTIONAL ID PREFIX` option.
    TransactionalIdPrefix,
    /// The `LEGACY IDS` option.
    LegacyIds,
    /// The `TOPIC CONFIG` option.
    TopicConfig,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `TOPIC PARTITION COUNT` option.
    TopicPartitionCount,
    /// The `TOPIC REPLICATION FACTOR` option.
    TopicReplicationFactor,
}
1056
1057impl AstDisplay for KafkaSinkConfigOptionName {
1058    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1059        f.write_str(match self {
1060            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1061            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1062            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1063            KafkaSinkConfigOptionName::Topic => "TOPIC",
1064            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1065            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1066            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1067            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1068                "TOPIC METADATA REFRESH INTERVAL"
1069            }
1070            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1071            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1072        })
1073    }
1074}
1075impl_display!(KafkaSinkConfigOptionName);
1076
1077impl WithOptionName for KafkaSinkConfigOptionName {
1078    /// # WARNING
1079    ///
1080    /// Whenever implementing this trait consider very carefully whether or not
1081    /// this value could contain sensitive user data. If you're uncertain, err
1082    /// on the conservative side and return `true`.
1083    fn redact_value(&self) -> bool {
1084        match self {
1085            KafkaSinkConfigOptionName::CompressionType
1086            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1087            | KafkaSinkConfigOptionName::Topic
1088            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1089            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1090            | KafkaSinkConfigOptionName::LegacyIds
1091            | KafkaSinkConfigOptionName::TopicConfig
1092            | KafkaSinkConfigOptionName::TopicPartitionCount
1093            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1094            KafkaSinkConfigOptionName::PartitionBy => true,
1095        }
1096    }
1097}
1098
1099#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1100pub struct KafkaSinkConfigOption<T: AstInfo> {
1101    pub name: KafkaSinkConfigOptionName,
1102    pub value: Option<WithOptionValue<T>>,
1103}
1104impl_display_for_with_option!(KafkaSinkConfigOption);
1105impl_display_t!(KafkaSinkConfigOption);
1106
/// The name of an [`IcebergSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// The `NAMESPACE` option.
    Namespace,
    /// The `TABLE` option.
    Table,
}
1112
1113impl AstDisplay for IcebergSinkConfigOptionName {
1114    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1115        f.write_str(match self {
1116            IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1117            IcebergSinkConfigOptionName::Table => "TABLE",
1118        })
1119    }
1120}
1121impl_display!(IcebergSinkConfigOptionName);
1122
1123impl WithOptionName for IcebergSinkConfigOptionName {
1124    /// # WARNING
1125    ///
1126    /// Whenever implementing this trait consider very carefully whether or not
1127    /// this value could contain sensitive user data. If you're uncertain, err
1128    /// on the conservative side and return `true`.
1129    fn redact_value(&self) -> bool {
1130        match self {
1131            IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1132        }
1133    }
1134}
1135
1136#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1137pub struct IcebergSinkConfigOption<T: AstInfo> {
1138    pub name: IcebergSinkConfigOptionName,
1139    pub value: Option<WithOptionValue<T>>,
1140}
1141impl_display_for_with_option!(IcebergSinkConfigOption);
1142impl_display_t!(IcebergSinkConfigOption);
1143
/// The name of a [`PgConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// The `DETAILS` option: hex encoded string of binary serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`
    Details,
    /// The `PUBLICATION` option: the name of the publication to sync
    Publication,
    /// The `TEXT COLUMNS` option: columns whose types you want to
    /// unconditionally format as text
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns you want to exclude
    ///
    /// NOTE: This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    ExcludeColumns,
}
1166
1167impl AstDisplay for PgConfigOptionName {
1168    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1169        f.write_str(match self {
1170            PgConfigOptionName::Details => "DETAILS",
1171            PgConfigOptionName::Publication => "PUBLICATION",
1172            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1173            PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1174        })
1175    }
1176}
1177impl_display!(PgConfigOptionName);
1178
1179impl WithOptionName for PgConfigOptionName {
1180    /// # WARNING
1181    ///
1182    /// Whenever implementing this trait consider very carefully whether or not
1183    /// this value could contain sensitive user data. If you're uncertain, err
1184    /// on the conservative side and return `true`.
1185    fn redact_value(&self) -> bool {
1186        match self {
1187            PgConfigOptionName::Details
1188            | PgConfigOptionName::Publication
1189            | PgConfigOptionName::TextColumns
1190            | PgConfigOptionName::ExcludeColumns => false,
1191        }
1192    }
1193}
1194
1195#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1196/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1197pub struct PgConfigOption<T: AstInfo> {
1198    pub name: PgConfigOptionName,
1199    pub value: Option<WithOptionValue<T>>,
1200}
1201impl_display_for_with_option!(PgConfigOption);
1202impl_display_t!(PgConfigOption);
1203
/// The name of a [`MySqlConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// The `DETAILS` option: hex encoded string of binary serialization of
    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`
    Details,
    /// The `TEXT COLUMNS` option: columns whose types you want to
    /// unconditionally format as text
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns you want to exclude
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    ExcludeColumns,
}
1224
1225impl AstDisplay for MySqlConfigOptionName {
1226    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1227        f.write_str(match self {
1228            MySqlConfigOptionName::Details => "DETAILS",
1229            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1230            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1231        })
1232    }
1233}
1234impl_display!(MySqlConfigOptionName);
1235
1236impl WithOptionName for MySqlConfigOptionName {
1237    /// # WARNING
1238    ///
1239    /// Whenever implementing this trait consider very carefully whether or not
1240    /// this value could contain sensitive user data. If you're uncertain, err
1241    /// on the conservative side and return `true`.
1242    fn redact_value(&self) -> bool {
1243        match self {
1244            MySqlConfigOptionName::Details
1245            | MySqlConfigOptionName::TextColumns
1246            | MySqlConfigOptionName::ExcludeColumns => false,
1247        }
1248    }
1249}
1250
1251#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1252/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1253pub struct MySqlConfigOption<T: AstInfo> {
1254    pub name: MySqlConfigOptionName,
1255    pub value: Option<WithOptionValue<T>>,
1256}
1257impl_display_for_with_option!(MySqlConfigOption);
1258impl_display_t!(MySqlConfigOption);
1259
/// The name of a [`SqlServerConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// The `DETAILS` option: hex encoded string of binary serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    Details,
    /// The `TEXT COLUMNS` option: columns whose types you want to
    /// unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    TextColumns,
    /// The `EXCLUDE COLUMNS` option: columns you want to exclude.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements
    ExcludeColumns,
}
1282
1283impl AstDisplay for SqlServerConfigOptionName {
1284    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1285        f.write_str(match self {
1286            SqlServerConfigOptionName::Details => "DETAILS",
1287            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1288            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1289        })
1290    }
1291}
1292impl_display!(SqlServerConfigOptionName);
1293
1294impl WithOptionName for SqlServerConfigOptionName {
1295    /// # WARNING
1296    ///
1297    /// Whenever implementing this trait consider very carefully whether or not
1298    /// this value could contain sensitive user data. If you're uncertain, err
1299    /// on the conservative side and return `true`.
1300    fn redact_value(&self) -> bool {
1301        match self {
1302            SqlServerConfigOptionName::Details
1303            | SqlServerConfigOptionName::TextColumns
1304            | SqlServerConfigOptionName::ExcludeColumns => false,
1305        }
1306    }
1307}
1308
1309#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1310/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1311pub struct SqlServerConfigOption<T: AstInfo> {
1312    pub name: SqlServerConfigOptionName,
1313    pub value: Option<WithOptionValue<T>>,
1314}
1315impl_display_for_with_option!(SqlServerConfigOption);
1316impl_display_t!(SqlServerConfigOption);
1317
1318#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1319pub enum CreateSourceConnection<T: AstInfo> {
1320    Kafka {
1321        connection: T::ItemName,
1322        options: Vec<KafkaSourceConfigOption<T>>,
1323    },
1324    Postgres {
1325        connection: T::ItemName,
1326        options: Vec<PgConfigOption<T>>,
1327    },
1328    SqlServer {
1329        connection: T::ItemName,
1330        options: Vec<SqlServerConfigOption<T>>,
1331    },
1332    MySql {
1333        connection: T::ItemName,
1334        options: Vec<MySqlConfigOption<T>>,
1335    },
1336    LoadGenerator {
1337        generator: LoadGenerator,
1338        options: Vec<LoadGeneratorOption<T>>,
1339    },
1340}
1341
1342impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1343    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1344        match self {
1345            CreateSourceConnection::Kafka {
1346                connection,
1347                options,
1348            } => {
1349                f.write_str("KAFKA CONNECTION ");
1350                f.write_node(connection);
1351                if !options.is_empty() {
1352                    f.write_str(" (");
1353                    f.write_node(&display::comma_separated(options));
1354                    f.write_str(")");
1355                }
1356            }
1357            CreateSourceConnection::Postgres {
1358                connection,
1359                options,
1360            } => {
1361                f.write_str("POSTGRES CONNECTION ");
1362                f.write_node(connection);
1363                if !options.is_empty() {
1364                    f.write_str(" (");
1365                    f.write_node(&display::comma_separated(options));
1366                    f.write_str(")");
1367                }
1368            }
1369            CreateSourceConnection::SqlServer {
1370                connection,
1371                options,
1372            } => {
1373                f.write_str("SQL SERVER CONNECTION ");
1374                f.write_node(connection);
1375                if !options.is_empty() {
1376                    f.write_str(" (");
1377                    f.write_node(&display::comma_separated(options));
1378                    f.write_str(")");
1379                }
1380            }
1381            CreateSourceConnection::MySql {
1382                connection,
1383                options,
1384            } => {
1385                f.write_str("MYSQL CONNECTION ");
1386                f.write_node(connection);
1387                if !options.is_empty() {
1388                    f.write_str(" (");
1389                    f.write_node(&display::comma_separated(options));
1390                    f.write_str(")");
1391                }
1392            }
1393            CreateSourceConnection::LoadGenerator { generator, options } => {
1394                f.write_str("LOAD GENERATOR ");
1395                f.write_node(generator);
1396                if !options.is_empty() {
1397                    f.write_str(" (");
1398                    f.write_node(&display::comma_separated(options));
1399                    f.write_str(")");
1400                }
1401            }
1402        }
1403    }
1404}
1405impl_display_t!(CreateSourceConnection);
1406
/// The load generators that can appear after `LOAD GENERATOR`; each variant
/// is displayed as the keyword(s) noted on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`.
    Clock,
    /// Displayed as `COUNTER`.
    Counter,
    /// Displayed as `MARKETING`.
    Marketing,
    /// Displayed as `AUCTION`.
    Auction,
    /// Displayed as `DATUMS`.
    Datums,
    /// Displayed as `TPCH`.
    Tpch,
    /// Displayed as `KEY VALUE`.
    KeyValue,
}
1417
1418impl AstDisplay for LoadGenerator {
1419    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1420        match self {
1421            Self::Counter => f.write_str("COUNTER"),
1422            Self::Clock => f.write_str("CLOCK"),
1423            Self::Marketing => f.write_str("MARKETING"),
1424            Self::Auction => f.write_str("AUCTION"),
1425            Self::Datums => f.write_str("DATUMS"),
1426            Self::Tpch => f.write_str("TPCH"),
1427            Self::KeyValue => f.write_str("KEY VALUE"),
1428        }
1429    }
1430}
1431impl_display!(LoadGenerator);
1432
1433impl LoadGenerator {
1434    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1435    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1436    /// cases where we only have the AST representation. This can be removed once
1437    /// the `ast_rewrite_sources_to_tables` migration is removed.
1438    pub fn schema_name(&self) -> &'static str {
1439        match self {
1440            LoadGenerator::Counter => "counter",
1441            LoadGenerator::Clock => "clock",
1442            LoadGenerator::Marketing => "marketing",
1443            LoadGenerator::Auction => "auction",
1444            LoadGenerator::Datums => "datums",
1445            LoadGenerator::Tpch => "tpch",
1446            LoadGenerator::KeyValue => "key_value",
1447        }
1448    }
1449}
1450
/// The name of a [`LoadGeneratorOption`].
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// The `SCALE FACTOR` option.
    ScaleFactor,
    /// The `TICK INTERVAL` option.
    TickInterval,
    /// The `AS OF` option.
    AsOf,
    /// The `UP TO` option.
    UpTo,
    /// The `MAX CARDINALITY` option.
    MaxCardinality,
    /// The `KEYS` option.
    Keys,
    /// The `SNAPSHOT ROUNDS` option.
    SnapshotRounds,
    /// The `TRANSACTIONAL SNAPSHOT` option.
    TransactionalSnapshot,
    /// The `VALUE SIZE` option.
    ValueSize,
    /// The `SEED` option.
    Seed,
    /// The `PARTITIONS` option.
    Partitions,
    /// The `BATCH SIZE` option.
    BatchSize,
}
1466
1467impl AstDisplay for LoadGeneratorOptionName {
1468    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1469        f.write_str(match self {
1470            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1471            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1472            LoadGeneratorOptionName::AsOf => "AS OF",
1473            LoadGeneratorOptionName::UpTo => "UP TO",
1474            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1475            LoadGeneratorOptionName::Keys => "KEYS",
1476            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1477            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1478            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1479            LoadGeneratorOptionName::Seed => "SEED",
1480            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1481            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1482        })
1483    }
1484}
1485impl_display!(LoadGeneratorOptionName);
1486
1487impl WithOptionName for LoadGeneratorOptionName {
1488    /// # WARNING
1489    ///
1490    /// Whenever implementing this trait consider very carefully whether or not
1491    /// this value could contain sensitive user data. If you're uncertain, err
1492    /// on the conservative side and return `true`.
1493    fn redact_value(&self) -> bool {
1494        match self {
1495            LoadGeneratorOptionName::ScaleFactor
1496            | LoadGeneratorOptionName::TickInterval
1497            | LoadGeneratorOptionName::AsOf
1498            | LoadGeneratorOptionName::UpTo
1499            | LoadGeneratorOptionName::MaxCardinality
1500            | LoadGeneratorOptionName::Keys
1501            | LoadGeneratorOptionName::SnapshotRounds
1502            | LoadGeneratorOptionName::TransactionalSnapshot
1503            | LoadGeneratorOptionName::ValueSize
1504            | LoadGeneratorOptionName::Partitions
1505            | LoadGeneratorOptionName::BatchSize
1506            | LoadGeneratorOptionName::Seed => false,
1507        }
1508    }
1509}
1510
1511#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1512/// An option in a `CREATE CONNECTION...SSH`.
1513pub struct LoadGeneratorOption<T: AstInfo> {
1514    pub name: LoadGeneratorOptionName,
1515    pub value: Option<WithOptionValue<T>>,
1516}
1517impl_display_for_with_option!(LoadGeneratorOption);
1518impl_display_t!(LoadGeneratorOption);
1519
1520#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1521pub enum CreateSinkConnection<T: AstInfo> {
1522    Kafka {
1523        connection: T::ItemName,
1524        options: Vec<KafkaSinkConfigOption<T>>,
1525        key: Option<SinkKey>,
1526        headers: Option<Ident>,
1527    },
1528    Iceberg {
1529        connection: T::ItemName,
1530        aws_connection: T::ItemName,
1531        key: Option<SinkKey>,
1532        options: Vec<IcebergSinkConfigOption<T>>,
1533    },
1534}
1535
1536impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1537    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1538        match self {
1539            CreateSinkConnection::Kafka {
1540                connection,
1541                options,
1542                key,
1543                headers,
1544            } => {
1545                f.write_str("KAFKA CONNECTION ");
1546                f.write_node(connection);
1547                if !options.is_empty() {
1548                    f.write_str(" (");
1549                    f.write_node(&display::comma_separated(options));
1550                    f.write_str(")");
1551                }
1552                if let Some(key) = key.as_ref() {
1553                    f.write_str(" ");
1554                    f.write_node(key);
1555                }
1556                if let Some(headers) = headers {
1557                    f.write_str(" HEADERS ");
1558                    f.write_node(headers);
1559                }
1560            }
1561            CreateSinkConnection::Iceberg {
1562                connection,
1563                aws_connection,
1564                key,
1565                options,
1566            } => {
1567                f.write_str("ICEBERG CATALOG CONNECTION ");
1568                f.write_node(connection);
1569                if !options.is_empty() {
1570                    f.write_str(" (");
1571                    f.write_node(&display::comma_separated(options));
1572                    f.write_str(")");
1573                }
1574                f.write_str(" USING AWS CONNECTION ");
1575                f.write_node(aws_connection);
1576                if let Some(key) = key.as_ref() {
1577                    f.write_str(" ");
1578                    f.write_node(key);
1579                }
1580            }
1581        }
1582    }
1583}
1584impl_display_t!(CreateSinkConnection);
1585
1586#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1587pub struct SinkKey {
1588    pub key_columns: Vec<Ident>,
1589    pub not_enforced: bool,
1590}
1591
1592impl AstDisplay for SinkKey {
1593    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1594        f.write_str("KEY (");
1595        f.write_node(&display::comma_separated(&self.key_columns));
1596        f.write_str(")");
1597        if self.not_enforced {
1598            f.write_str(" NOT ENFORCED");
1599        }
1600    }
1601}
1602
1603/// A table-level constraint, specified in a `CREATE TABLE` or an
1604/// `ALTER TABLE ADD <constraint>` statement.
1605#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1606pub enum TableConstraint<T: AstInfo> {
1607    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1608    Unique {
1609        name: Option<Ident>,
1610        columns: Vec<Ident>,
1611        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1612        is_primary: bool,
1613        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1614        // constraints.
1615        nulls_not_distinct: bool,
1616    },
1617    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1618    /// REFERENCES <foreign_table> (<referred_columns>)`)
1619    ForeignKey {
1620        name: Option<Ident>,
1621        columns: Vec<Ident>,
1622        foreign_table: T::ItemName,
1623        referred_columns: Vec<Ident>,
1624    },
1625    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1626    Check {
1627        name: Option<Ident>,
1628        expr: Box<Expr<T>>,
1629    },
1630}
1631
1632impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1633    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1634        match self {
1635            TableConstraint::Unique {
1636                name,
1637                columns,
1638                is_primary,
1639                nulls_not_distinct,
1640            } => {
1641                f.write_node(&display_constraint_name(name));
1642                if *is_primary {
1643                    f.write_str("PRIMARY KEY ");
1644                } else {
1645                    f.write_str("UNIQUE ");
1646                    if *nulls_not_distinct {
1647                        f.write_str("NULLS NOT DISTINCT ");
1648                    }
1649                }
1650                f.write_str("(");
1651                f.write_node(&display::comma_separated(columns));
1652                f.write_str(")");
1653            }
1654            TableConstraint::ForeignKey {
1655                name,
1656                columns,
1657                foreign_table,
1658                referred_columns,
1659            } => {
1660                f.write_node(&display_constraint_name(name));
1661                f.write_str("FOREIGN KEY (");
1662                f.write_node(&display::comma_separated(columns));
1663                f.write_str(") REFERENCES ");
1664                f.write_node(foreign_table);
1665                f.write_str("(");
1666                f.write_node(&display::comma_separated(referred_columns));
1667                f.write_str(")");
1668            }
1669            TableConstraint::Check { name, expr } => {
1670                f.write_node(&display_constraint_name(name));
1671                f.write_str("CHECK (");
1672                f.write_node(&expr);
1673                f.write_str(")");
1674            }
1675        }
1676    }
1677}
1678impl_display_t!(TableConstraint);
1679
1680/// A key constraint, specified in a `CREATE SOURCE`.
1681#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1682pub enum KeyConstraint {
1683    // PRIMARY KEY (<columns>) NOT ENFORCED
1684    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1685}
1686
1687impl AstDisplay for KeyConstraint {
1688    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1689        match self {
1690            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1691                f.write_str("PRIMARY KEY ");
1692                f.write_str("(");
1693                f.write_node(&display::comma_separated(columns));
1694                f.write_str(") ");
1695                f.write_str("NOT ENFORCED");
1696            }
1697        }
1698    }
1699}
1700impl_display!(KeyConstraint);
1701
/// The name of a [`CreateSourceOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// The `TIMESTAMP INTERVAL` option.
    TimestampInterval,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
}
1707
1708impl AstDisplay for CreateSourceOptionName {
1709    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1710        f.write_str(match self {
1711            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1712            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1713        })
1714    }
1715}
1716impl_display!(CreateSourceOptionName);
1717
1718impl WithOptionName for CreateSourceOptionName {
1719    /// # WARNING
1720    ///
1721    /// Whenever implementing this trait consider very carefully whether or not
1722    /// this value could contain sensitive user data. If you're uncertain, err
1723    /// on the conservative side and return `true`.
1724    fn redact_value(&self) -> bool {
1725        match self {
1726            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1727                false
1728            }
1729        }
1730    }
1731}
1732
/// An option in a `CREATE SOURCE...` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateSourceOptionName,
    /// The value attached to the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1741
/// SQL column definition
///
/// Rendered as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// The collation named in a `COLLATE` clause, if any.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, rendered after the type (and collation) in the order
    /// stored here.
    pub options: Vec<ColumnOptionDef<T>>,
}
1750
1751impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1752    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1753        f.write_node(&self.name);
1754        f.write_str(" ");
1755        f.write_node(&self.data_type);
1756        if let Some(collation) = &self.collation {
1757            f.write_str(" COLLATE ");
1758            f.write_node(collation);
1759        }
1760        for option in &self.options {
1761            f.write_str(" ");
1762            f.write_node(option);
1763        }
1764    }
1765}
1766impl_display_t!(ColumnDef);
1767
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// The optional constraint name; when present it is rendered as
    /// `CONSTRAINT <name> ` ahead of the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption<T>,
}
1789
1790impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1791    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1792        f.write_node(&display_constraint_name(&self.name));
1793        f.write_node(&self.option);
1794    }
1795}
1796impl_display_t!(ColumnOptionDef);
1797
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    Default(Expr<T>),
    /// `{ PRIMARY KEY | UNIQUE }`
    ///
    /// `is_primary` selects `PRIMARY KEY`; otherwise the option is `UNIQUE`.
    Unique { is_primary: bool },
    /// A referential integrity constraint
    /// (`REFERENCES <foreign_table> (<referred_columns>)`).
    ForeignKey {
        /// The table being referenced.
        foreign_table: UnresolvedItemName,
        /// The referenced columns, listed inside the parentheses.
        referred_columns: Vec<Ident>,
    },
    /// `CHECK (<expr>)`
    Check(Expr<T>),
    /// `VERSION <action> <version>`
    Versioned {
        /// What happened to the column at `version`.
        action: ColumnVersioned,
        /// The version the action applies to.
        version: Version,
    },
}
1824
1825impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1826    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1827        use ColumnOption::*;
1828        match self {
1829            Null => f.write_str("NULL"),
1830            NotNull => f.write_str("NOT NULL"),
1831            Default(expr) => {
1832                f.write_str("DEFAULT ");
1833                f.write_node(expr);
1834            }
1835            Unique { is_primary } => {
1836                if *is_primary {
1837                    f.write_str("PRIMARY KEY");
1838                } else {
1839                    f.write_str("UNIQUE");
1840                }
1841            }
1842            ForeignKey {
1843                foreign_table,
1844                referred_columns,
1845            } => {
1846                f.write_str("REFERENCES ");
1847                f.write_node(foreign_table);
1848                f.write_str(" (");
1849                f.write_node(&display::comma_separated(referred_columns));
1850                f.write_str(")");
1851            }
1852            Check(expr) => {
1853                f.write_str("CHECK (");
1854                f.write_node(expr);
1855                f.write_str(")");
1856            }
1857            Versioned { action, version } => {
1858                f.write_str("VERSION ");
1859                f.write_node(action);
1860                f.write_str(" ");
1861                f.write_node(version);
1862            }
1863        }
1864    }
1865}
1866impl_display_t!(ColumnOption);
1867
/// The `<action>` part of a `VERSION <action> <version>` column option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Rendered as `ADDED`.
    Added,
}
1872
1873impl AstDisplay for ColumnVersioned {
1874    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1875    where
1876        W: fmt::Write,
1877    {
1878        match self {
1879            // TODO(alter_table): Support dropped columns.
1880            ColumnVersioned::Added => f.write_str("ADDED"),
1881        }
1882    }
1883}
1884impl_display!(ColumnVersioned);
1885
1886fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1887    struct ConstraintName<'a>(&'a Option<Ident>);
1888    impl<'a> AstDisplay for ConstraintName<'a> {
1889        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1890        where
1891            W: fmt::Write,
1892        {
1893            if let Some(name) = self.0 {
1894                f.write_str("CONSTRAINT ");
1895                f.write_node(name);
1896                f.write_str(" ");
1897            }
1898        }
1899    }
1900    ConstraintName(name)
1901}