mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// The set of option names usable in a `MaterializedViewOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The option displayed as `PARTITION BY`.
    PartitionBy,
    /// The option displayed as `RETAIN HISTORY`.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
/// A single materialized view option: an option name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support comes from the `impl_display_for_with_option!` macro, which
// is defined outside this section of the file.
impl_display_for_with_option!(MaterializedViewOption);
74
/// The set of option names usable in a `ContinualTaskOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT [=] ...` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
/// A single continual task option: an option name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support comes from the `impl_display_for_with_option!` macro, which
// is defined outside this section of the file.
impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema, held as a plain string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The raw schema text. Single quotes are escaped only when the value is
    /// displayed, not in this stored form.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
/// The set of option names usable in an `AvroSchemaOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
/// A single option attached to an inline Avro schema: a name plus an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support comes from the `impl_display_for_with_option!` macro, which
// is defined outside this section of the file.
impl_display_for_with_option!(AvroSchemaOption);
156
/// Where the schema for an Avro format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is described by a schema registry connection
    /// (displayed as `USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is given inline (displayed as `USING SCHEMA '...'`),
    /// optionally followed by a parenthesized option list.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
/// Where the schema for a Protobuf format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is described by a schema registry connection
    /// (displayed as `USING CONFLUENT SCHEMA REGISTRY ...`).
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is given inline together with a message name
    /// (displayed as `MESSAGE '...' USING SCHEMA '...'`).
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
/// The set of option names usable in a `CsrConfigOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// Displayed as `AVRO KEY FULLNAME`.
    AvroKeyFullname,
    /// Displayed as `AVRO VALUE FULLNAME`.
    AvroValueFullname,
    /// Displayed as `NULL DEFAULTS`.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} ...` option; see `AvroDocOn`.
    AvroDocOn(AvroDocOn<T>),
    /// Displayed as `KEY COMPATIBILITY LEVEL`.
    KeyCompatibilityLevel,
    /// Displayed as `VALUE COMPATIBILITY LEVEL`.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
/// The payload of the `CsrConfigOptionName::AvroDocOn` option name: what is
/// being documented and which schema(s) the documentation applies to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc refers to.
    pub identifier: DocOnIdentifier<T>,
    /// Selects the `KEY ` / `VALUE ` prefix (or none) when displayed.
    pub for_schema: DocOnSchema,
}
/// Which schema an `AvroDocOn` option targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a leading `KEY `.
    KeyOnly,
    /// Displayed with a leading `VALUE `.
    ValueOnly,
    /// Displayed with no prefix.
    All,
}
261
/// The object an `AvroDocOn` option refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; displayed as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; displayed as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support comes from these two macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A reference to a connection plus its options, displayed as
/// `CONNECTION <name> [(<options>)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The name of the referenced connection.
    pub connection: T::ItemName,
    /// Options for the connection; may be empty.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is selected.
///
/// The default is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a string payload (presumably the schema text itself; confirm
    /// against the parser).
    Inline(String),
    /// Carries an integer payload (presumably a schema id; confirm against
    /// the parser).
    ById(i32),
}
345
/// A schema registry connection used for an Avro format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The underlying connection reference and options.
    pub connection: CsrConnection<T>,
    /// Reader schema selection for the key. Not written by this type's
    /// `AstDisplay` implementation.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader schema selection for the value. Not written by this type's
    /// `AstDisplay` implementation.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A schema registry connection used for a Protobuf format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The underlying connection reference and options.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// The `SEED` clause for an Avro schema registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// Optional key schema; displayed as ` KEY SCHEMA '...'` when present.
    pub key_schema: Option<String>,
    /// Value schema; always displayed as ` VALUE SCHEMA '...'`.
    pub value_schema: String,
}
390
391impl AstDisplay for CsrSeedAvro {
392    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393        f.write_str("SEED");
394        if let Some(key_schema) = &self.key_schema {
395            f.write_str(" KEY SCHEMA '");
396            f.write_node(&display::escape_single_quote_string(key_schema));
397            f.write_str("'");
398        }
399        f.write_str(" VALUE SCHEMA '");
400        f.write_node(&display::escape_single_quote_string(&self.value_schema));
401        f.write_str("'");
402    }
403}
404impl_display!(CsrSeedAvro);
405
/// The `SEED` clause for a Protobuf schema registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// Optional key schema; displayed as ` KEY <schema>` when present.
    pub key: Option<CsrSeedProtobufSchema>,
    /// Value schema; always displayed as ` VALUE <schema>`.
    pub value: CsrSeedProtobufSchema,
}
411
412impl AstDisplay for CsrSeedProtobuf {
413    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414        f.write_str("SEED");
415        if let Some(key) = &self.key {
416            f.write_str(" KEY ");
417            f.write_node(key);
418        }
419        f.write_str(" VALUE ");
420        f.write_node(&self.value);
421    }
422}
423impl_display!(CsrSeedProtobuf);
424
/// One schema entry (key or value) of a Protobuf `SEED` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    /// Hex encoded string.
    pub schema: String,
    /// The message name; displayed after `MESSAGE`.
    pub message_name: String,
}
431impl AstDisplay for CsrSeedProtobufSchema {
432    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433        f.write_str("SCHEMA '");
434        f.write_str(&display::escape_single_quote_string(&self.schema));
435        f.write_str("' MESSAGE '");
436        f.write_str(&self.message_name);
437        f.write_str("'");
438    }
439}
440impl_display!(CsrSeedProtobufSchema);
441
/// The format clause of a source or sink: either one format, or separate key
/// and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452        match self {
453            FormatSpecifier::Bare(format) => {
454                f.write_str("FORMAT ");
455                f.write_node(format)
456            }
457            FormatSpecifier::KeyValue { key, value } => {
458                f.write_str("KEY FORMAT ");
459                f.write_node(key);
460                f.write_str(" VALUE FORMAT ");
461                f.write_node(value);
462            }
463        }
464    }
465}
466impl_display_t!(FormatSpecifier);
467
/// A data format, as written after `FORMAT`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Displayed as `BYTES`.
    Bytes,
    /// Displayed as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Displayed as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Displayed as `REGEX '<pattern>'`.
    Regex(String),
    /// Displayed as `CSV WITH <columns>`; the delimiter is printed
    /// (` DELIMITED BY '<c>'`) only when it is not `,`.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// Displayed as `JSON`, or `JSON ARRAY` when `array` is true.
    Json {
        array: bool,
    },
    /// Displayed as `TEXT`.
    Text,
}
483
/// How the columns of a CSV format are specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
491
492impl AstDisplay for CsvColumns {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        match self {
495            CsvColumns::Count(n) => {
496                f.write_str(n);
497                f.write_str(" COLUMNS")
498            }
499            CsvColumns::Header { names } => {
500                f.write_str("HEADER");
501                if !names.is_empty() {
502                    f.write_str(" (");
503                    f.write_node(&display::comma_separated(names));
504                    f.write_str(")");
505                }
506            }
507        }
508    }
509}
510
/// A metadata item that a source can include, each with an alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Displayed as `KEY [AS <alias>]`.
    Key {
        alias: Option<Ident>,
    },
    /// Displayed as `TIMESTAMP [AS <alias>]`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Displayed as `PARTITION [AS <alias>]`.
    Partition {
        alias: Option<Ident>,
    },
    /// Displayed as `OFFSET [AS <alias>]`.
    Offset {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADERS [AS <alias>]`.
    Headers {
        alias: Option<Ident>,
    },
    /// Displayed as `HEADER '<key>' AS <alias> [BYTES]`; here the alias is
    /// mandatory and `BYTES` is printed when `use_bytes` is true.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
534
535impl AstDisplay for SourceIncludeMetadata {
536    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538            if let Some(alias) = alias {
539                f.write_str(" AS ");
540                f.write_node(alias);
541            }
542        };
543
544        match self {
545            SourceIncludeMetadata::Key { alias } => {
546                f.write_str("KEY");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Timestamp { alias } => {
550                f.write_str("TIMESTAMP");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Partition { alias } => {
554                f.write_str("PARTITION");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Offset { alias } => {
558                f.write_str("OFFSET");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Headers { alias } => {
562                f.write_str("HEADERS");
563                print_alias(f, alias);
564            }
565            SourceIncludeMetadata::Header {
566                alias,
567                key,
568                use_bytes,
569            } => {
570                f.write_str("HEADER '");
571                f.write_str(&display::escape_single_quote_string(key));
572                f.write_str("'");
573                print_alias(f, &Some(alias.clone()));
574                if *use_bytes {
575                    f.write_str(" BYTES");
576                }
577            }
578        }
579    }
580}
581impl_display!(SourceIncludeMetadata);
582
/// An error policy for a source; used by `SourceEnvelope::Upsert` for value
/// decoding errors.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Displayed as `INLINE [AS <alias>]`.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
590
591impl AstDisplay for SourceErrorPolicy {
592    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593        match self {
594            Self::Inline { alias } => {
595                f.write_str("INLINE");
596                if let Some(alias) = alias {
597                    f.write_str(" AS ");
598                    f.write_node(alias);
599                }
600            }
601        }
602    }
603}
604impl_display!(SourceErrorPolicy);
605
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Displayed as `NONE`.
    None,
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`, with a
    /// `(VALUE DECODING ERRORS = (...))` suffix when any policy is listed.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Displayed as `MATERIALIZE`.
    CdcV2,
}
615
616impl SourceEnvelope {
617    /// `true` iff Materialize is expected to crash or exhibit UB
618    /// when attempting to ingest data starting at an offset other than zero.
619    pub fn requires_all_input(&self) -> bool {
620        match self {
621            SourceEnvelope::None => false,
622            SourceEnvelope::Debezium => false,
623            SourceEnvelope::Upsert { .. } => false,
624            SourceEnvelope::CdcV2 => true,
625        }
626    }
627}
628
629impl AstDisplay for SourceEnvelope {
630    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631        match self {
632            Self::None => {
633                // this is unreachable as long as the default is None, but include it in case we ever change that
634                f.write_str("NONE");
635            }
636            Self::Debezium => {
637                f.write_str("DEBEZIUM");
638            }
639            Self::Upsert {
640                value_decode_err_policy,
641            } => {
642                if value_decode_err_policy.is_empty() {
643                    f.write_str("UPSERT");
644                } else {
645                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646                    f.write_node(&display::comma_separated(value_decode_err_policy));
647                    f.write_str("))")
648                }
649            }
650            Self::CdcV2 => {
651                f.write_str("MATERIALIZE");
652            }
653        }
654    }
655}
656impl_display!(SourceEnvelope);
657
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
663
664impl AstDisplay for SinkEnvelope {
665    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666        match self {
667            Self::Upsert => {
668                f.write_str("UPSERT");
669            }
670            Self::Debezium => {
671                f.write_str("DEBEZIUM");
672            }
673        }
674    }
675}
676impl_display!(SinkEnvelope);
677
/// The output shape requested from a subscribe.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Displayed as nothing at all.
    Diffs,
    /// Displayed as ` WITHIN TIMESTAMP ORDER BY <exprs>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Displayed as ` ENVELOPE UPSERT (KEY (<columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Displayed as ` ENVELOPE DEBEZIUM (KEY (<columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688        match self {
689            Self::Diffs => {}
690            Self::WithinTimestampOrderBy { order_by } => {
691                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692                f.write_node(&display::comma_separated(order_by));
693            }
694            Self::EnvelopeUpsert { key_columns } => {
695                f.write_str(" ENVELOPE UPSERT (KEY (");
696                f.write_node(&display::comma_separated(key_columns));
697                f.write_str("))");
698            }
699            Self::EnvelopeDebezium { key_columns } => {
700                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701                f.write_node(&display::comma_separated(key_columns));
702                f.write_str("))");
703            }
704        }
705    }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711        match self {
712            Self::Bytes => f.write_str("BYTES"),
713            Self::Avro(inner) => {
714                f.write_str("AVRO ");
715                f.write_node(inner);
716            }
717            Self::Protobuf(inner) => {
718                f.write_str("PROTOBUF ");
719                f.write_node(inner);
720            }
721            Self::Regex(regex) => {
722                f.write_str("REGEX '");
723                f.write_node(&display::escape_single_quote_string(regex));
724                f.write_str("'");
725            }
726            Self::Csv { columns, delimiter } => {
727                f.write_str("CSV WITH ");
728                f.write_node(columns);
729
730                if *delimiter != ',' {
731                    f.write_str(" DELIMITED BY '");
732                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733                    f.write_str("'");
734                }
735            }
736            Self::Json { array } => {
737                f.write_str("JSON");
738                if *array {
739                    f.write_str(" ARRAY");
740                }
741            }
742            Self::Text => f.write_str("TEXT"),
743        }
744    }
745}
746impl_display_t!(Format);
747
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. As the
// French saying goes: one must suffer to be beautiful.
/// The name of a connection option. The SQL spelling of each variant is given
/// by this type's `AstDisplay` implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
790
791impl AstDisplay for ConnectionOptionName {
792    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793        f.write_str(match self {
794            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798            ConnectionOptionName::Broker => "BROKER",
799            ConnectionOptionName::Brokers => "BROKERS",
800            ConnectionOptionName::Credential => "CREDENTIAL",
801            ConnectionOptionName::Database => "DATABASE",
802            ConnectionOptionName::Endpoint => "ENDPOINT",
803            ConnectionOptionName::Host => "HOST",
804            ConnectionOptionName::Password => "PASSWORD",
805            ConnectionOptionName::Port => "PORT",
806            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807            ConnectionOptionName::ProgressTopicReplicationFactor => {
808                "PROGRESS TOPIC REPLICATION FACTOR"
809            }
810            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812            ConnectionOptionName::Region => "REGION",
813            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817            ConnectionOptionName::SaslUsername => "SASL USERNAME",
818            ConnectionOptionName::Scope => "SCOPE",
819            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821            ConnectionOptionName::ServiceName => "SERVICE NAME",
822            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825            ConnectionOptionName::SslKey => "SSL KEY",
826            ConnectionOptionName::SslMode => "SSL MODE",
827            ConnectionOptionName::SessionToken => "SESSION TOKEN",
828            ConnectionOptionName::CatalogType => "CATALOG TYPE",
829            ConnectionOptionName::Url => "URL",
830            ConnectionOptionName::User => "USER",
831            ConnectionOptionName::Warehouse => "WAREHOUSE",
832        })
833    }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838    /// # WARNING
839    ///
840    /// Whenever implementing this trait consider very carefully whether or not
841    /// this value could contain sensitive user data. If you're uncertain, err
842    /// on the conservative side and return `true`.
843    fn redact_value(&self) -> bool {
844        match self {
845            ConnectionOptionName::AccessKeyId
846            | ConnectionOptionName::AvailabilityZones
847            | ConnectionOptionName::AwsConnection
848            | ConnectionOptionName::AwsPrivatelink
849            | ConnectionOptionName::Broker
850            | ConnectionOptionName::Brokers
851            | ConnectionOptionName::Credential
852            | ConnectionOptionName::Database
853            | ConnectionOptionName::Endpoint
854            | ConnectionOptionName::Host
855            | ConnectionOptionName::Password
856            | ConnectionOptionName::Port
857            | ConnectionOptionName::ProgressTopic
858            | ConnectionOptionName::ProgressTopicReplicationFactor
859            | ConnectionOptionName::PublicKey1
860            | ConnectionOptionName::PublicKey2
861            | ConnectionOptionName::Region
862            | ConnectionOptionName::AssumeRoleArn
863            | ConnectionOptionName::AssumeRoleSessionName
864            | ConnectionOptionName::SaslMechanisms
865            | ConnectionOptionName::SaslPassword
866            | ConnectionOptionName::SaslUsername
867            | ConnectionOptionName::Scope
868            | ConnectionOptionName::SecurityProtocol
869            | ConnectionOptionName::SecretAccessKey
870            | ConnectionOptionName::ServiceName
871            | ConnectionOptionName::SshTunnel
872            | ConnectionOptionName::SslCertificate
873            | ConnectionOptionName::SslCertificateAuthority
874            | ConnectionOptionName::SslKey
875            | ConnectionOptionName::SslMode
876            | ConnectionOptionName::SessionToken
877            | ConnectionOptionName::CatalogType
878            | ConnectionOptionName::Url
879            | ConnectionOptionName::User
880            | ConnectionOptionName::Warehouse => false,
881        }
882    }
883}
884
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE CONNECTION`.
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support comes from these two macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
893
/// The kind of connection being created. The SQL spelling of each variant is
/// given by this type's `AstDisplay` implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Displayed as `AWS`.
    Aws,
    /// Displayed as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Displayed as `KAFKA`.
    Kafka,
    /// Displayed as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Displayed as `POSTGRES`.
    Postgres,
    /// Displayed as `SSH TUNNEL`.
    Ssh,
    /// Displayed as `SQL SERVER`.
    SqlServer,
    /// Displayed as `MYSQL`.
    MySql,
    /// Displayed as `YUGABYTE`.
    Yugabyte,
    /// Displayed as `ICEBERG CATALOG`.
    IcebergCatalog,
}
907
908impl AstDisplay for CreateConnectionType {
909    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
910        match self {
911            Self::Kafka => {
912                f.write_str("KAFKA");
913            }
914            Self::Csr => {
915                f.write_str("CONFLUENT SCHEMA REGISTRY");
916            }
917            Self::Postgres => {
918                f.write_str("POSTGRES");
919            }
920            Self::Aws => {
921                f.write_str("AWS");
922            }
923            Self::AwsPrivatelink => {
924                f.write_str("AWS PRIVATELINK");
925            }
926            Self::Ssh => {
927                f.write_str("SSH TUNNEL");
928            }
929            Self::SqlServer => {
930                f.write_str("SQL SERVER");
931            }
932            Self::MySql => {
933                f.write_str("MYSQL");
934            }
935            Self::Yugabyte => {
936                f.write_str("YUGABYTE");
937            }
938            Self::IcebergCatalog => {
939                f.write_str("ICEBERG CATALOG");
940            }
941        }
942    }
943}
944impl_display!(CreateConnectionType);
945
/// Names of the options accepted by `CREATE CONNECTION` itself.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// Rendered as `VALIDATE`.
    Validate,
}
950
951impl AstDisplay for CreateConnectionOptionName {
952    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
953        f.write_str(match self {
954            CreateConnectionOptionName::Validate => "VALIDATE",
955        })
956    }
957}
958impl_display!(CreateConnectionOptionName);
959
960impl WithOptionName for CreateConnectionOptionName {
961    /// # WARNING
962    ///
963    /// Whenever implementing this trait consider very carefully whether or not
964    /// this value could contain sensitive user data. If you're uncertain, err
965    /// on the conservative side and return `true`.
966    fn redact_value(&self) -> bool {
967        match self {
968            CreateConnectionOptionName::Validate => false,
969        }
970    }
971}
972
973#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
974/// An option in a `CREATE CONNECTION...` statement.
975pub struct CreateConnectionOption<T: AstInfo> {
976    pub name: CreateConnectionOptionName,
977    pub value: Option<WithOptionValue<T>>,
978}
979impl_display_for_with_option!(CreateConnectionOption);
980impl_display_t!(CreateConnectionOption);
981
/// Names of the options accepted by a Kafka source connection.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// Rendered as `GROUP ID PREFIX`.
    GroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `START TIMESTAMP`.
    StartTimestamp,
    /// Rendered as `START OFFSET`.
    StartOffset,
}
990
991impl AstDisplay for KafkaSourceConfigOptionName {
992    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
993        f.write_str(match self {
994            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
995            KafkaSourceConfigOptionName::Topic => "TOPIC",
996            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
997                "TOPIC METADATA REFRESH INTERVAL"
998            }
999            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1000            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1001        })
1002    }
1003}
1004impl_display!(KafkaSourceConfigOptionName);
1005
1006impl WithOptionName for KafkaSourceConfigOptionName {
1007    /// # WARNING
1008    ///
1009    /// Whenever implementing this trait consider very carefully whether or not
1010    /// this value could contain sensitive user data. If you're uncertain, err
1011    /// on the conservative side and return `true`.
1012    fn redact_value(&self) -> bool {
1013        match self {
1014            KafkaSourceConfigOptionName::GroupIdPrefix
1015            | KafkaSourceConfigOptionName::Topic
1016            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1017            | KafkaSourceConfigOptionName::StartOffset
1018            | KafkaSourceConfigOptionName::StartTimestamp => false,
1019        }
1020    }
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1024pub struct KafkaSourceConfigOption<T: AstInfo> {
1025    pub name: KafkaSourceConfigOptionName,
1026    pub value: Option<WithOptionValue<T>>,
1027}
1028impl_display_for_with_option!(KafkaSourceConfigOption);
1029impl_display_t!(KafkaSourceConfigOption);
1030
/// Names of the options accepted by a Kafka sink connection.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// Rendered as `COMPRESSION TYPE`.
    CompressionType,
    /// Rendered as `PARTITION BY`. The only option here whose value is
    /// redacted.
    PartitionBy,
    /// Rendered as `PROGRESS GROUP ID PREFIX`.
    ProgressGroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TRANSACTIONAL ID PREFIX`.
    TransactionalIdPrefix,
    /// Rendered as `LEGACY IDS`.
    LegacyIds,
    /// Rendered as `TOPIC CONFIG`.
    TopicConfig,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `TOPIC PARTITION COUNT`.
    TopicPartitionCount,
    /// Rendered as `TOPIC REPLICATION FACTOR`.
    TopicReplicationFactor,
}
1044
1045impl AstDisplay for KafkaSinkConfigOptionName {
1046    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1047        f.write_str(match self {
1048            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1049            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1050            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1051            KafkaSinkConfigOptionName::Topic => "TOPIC",
1052            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1053            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1054            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1055            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1056                "TOPIC METADATA REFRESH INTERVAL"
1057            }
1058            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1059            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1060        })
1061    }
1062}
1063impl_display!(KafkaSinkConfigOptionName);
1064
1065impl WithOptionName for KafkaSinkConfigOptionName {
1066    /// # WARNING
1067    ///
1068    /// Whenever implementing this trait consider very carefully whether or not
1069    /// this value could contain sensitive user data. If you're uncertain, err
1070    /// on the conservative side and return `true`.
1071    fn redact_value(&self) -> bool {
1072        match self {
1073            KafkaSinkConfigOptionName::CompressionType
1074            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1075            | KafkaSinkConfigOptionName::Topic
1076            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1077            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1078            | KafkaSinkConfigOptionName::LegacyIds
1079            | KafkaSinkConfigOptionName::TopicConfig
1080            | KafkaSinkConfigOptionName::TopicPartitionCount
1081            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1082            KafkaSinkConfigOptionName::PartitionBy => true,
1083        }
1084    }
1085}
1086
1087#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1088pub struct KafkaSinkConfigOption<T: AstInfo> {
1089    pub name: KafkaSinkConfigOptionName,
1090    pub value: Option<WithOptionValue<T>>,
1091}
1092impl_display_for_with_option!(KafkaSinkConfigOption);
1093impl_display_t!(KafkaSinkConfigOption);
1094
/// Names of the options accepted by a Postgres source connection.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`.
    ///
    /// Rendered as `DETAILS`.
    Details,
    /// The name of the publication to sync.
    ///
    /// Rendered as `PUBLICATION`.
    Publication,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// Rendered as `TEXT COLUMNS`.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
}
1110
1111impl AstDisplay for PgConfigOptionName {
1112    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1113        f.write_str(match self {
1114            PgConfigOptionName::Details => "DETAILS",
1115            PgConfigOptionName::Publication => "PUBLICATION",
1116            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1117        })
1118    }
1119}
1120impl_display!(PgConfigOptionName);
1121
1122impl WithOptionName for PgConfigOptionName {
1123    /// # WARNING
1124    ///
1125    /// Whenever implementing this trait consider very carefully whether or not
1126    /// this value could contain sensitive user data. If you're uncertain, err
1127    /// on the conservative side and return `true`.
1128    fn redact_value(&self) -> bool {
1129        match self {
1130            PgConfigOptionName::Details
1131            | PgConfigOptionName::Publication
1132            | PgConfigOptionName::TextColumns => false,
1133        }
1134    }
1135}
1136
1137#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1138/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1139pub struct PgConfigOption<T: AstInfo> {
1140    pub name: PgConfigOptionName,
1141    pub value: Option<WithOptionValue<T>>,
1142}
1143impl_display_for_with_option!(PgConfigOption);
1144impl_display_t!(PgConfigOption);
1145
/// Names of the options accepted by a MySQL source connection.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`.
    ///
    /// Rendered as `DETAILS`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// Rendered as `TEXT COLUMNS`.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// Rendered as `EXCLUDE COLUMNS`.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1166
1167impl AstDisplay for MySqlConfigOptionName {
1168    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1169        f.write_str(match self {
1170            MySqlConfigOptionName::Details => "DETAILS",
1171            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1172            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1173        })
1174    }
1175}
1176impl_display!(MySqlConfigOptionName);
1177
1178impl WithOptionName for MySqlConfigOptionName {
1179    /// # WARNING
1180    ///
1181    /// Whenever implementing this trait consider very carefully whether or not
1182    /// this value could contain sensitive user data. If you're uncertain, err
1183    /// on the conservative side and return `true`.
1184    fn redact_value(&self) -> bool {
1185        match self {
1186            MySqlConfigOptionName::Details
1187            | MySqlConfigOptionName::TextColumns
1188            | MySqlConfigOptionName::ExcludeColumns => false,
1189        }
1190    }
1191}
1192
1193#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1194/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1195pub struct MySqlConfigOption<T: AstInfo> {
1196    pub name: MySqlConfigOptionName,
1197    pub value: Option<WithOptionValue<T>>,
1198}
1199impl_display_for_with_option!(MySqlConfigOption);
1200impl_display_t!(MySqlConfigOption);
1201
/// Names of the options accepted by a SQL Server source connection.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    ///
    /// Rendered as `DETAILS`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// Rendered as `TEXT COLUMNS`.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// Rendered as `EXCLUDE COLUMNS`.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1224
1225impl AstDisplay for SqlServerConfigOptionName {
1226    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1227        f.write_str(match self {
1228            SqlServerConfigOptionName::Details => "DETAILS",
1229            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1230            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1231        })
1232    }
1233}
1234impl_display!(SqlServerConfigOptionName);
1235
1236impl WithOptionName for SqlServerConfigOptionName {
1237    /// # WARNING
1238    ///
1239    /// Whenever implementing this trait consider very carefully whether or not
1240    /// this value could contain sensitive user data. If you're uncertain, err
1241    /// on the conservative side and return `true`.
1242    fn redact_value(&self) -> bool {
1243        match self {
1244            SqlServerConfigOptionName::Details
1245            | SqlServerConfigOptionName::TextColumns
1246            | SqlServerConfigOptionName::ExcludeColumns => false,
1247        }
1248    }
1249}
1250
1251#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1252/// An option in a `{FROM|INTO} CONNECTION ...` statement.
1253pub struct SqlServerConfigOption<T: AstInfo> {
1254    pub name: SqlServerConfigOptionName,
1255    pub value: Option<WithOptionValue<T>>,
1256}
1257impl_display_for_with_option!(SqlServerConfigOption);
1258impl_display_t!(SqlServerConfigOption);
1259
1260#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1261pub enum CreateSourceConnection<T: AstInfo> {
1262    Kafka {
1263        connection: T::ItemName,
1264        options: Vec<KafkaSourceConfigOption<T>>,
1265    },
1266    Postgres {
1267        connection: T::ItemName,
1268        options: Vec<PgConfigOption<T>>,
1269    },
1270    Yugabyte {
1271        connection: T::ItemName,
1272        options: Vec<PgConfigOption<T>>,
1273    },
1274    SqlServer {
1275        connection: T::ItemName,
1276        options: Vec<SqlServerConfigOption<T>>,
1277    },
1278    MySql {
1279        connection: T::ItemName,
1280        options: Vec<MySqlConfigOption<T>>,
1281    },
1282    LoadGenerator {
1283        generator: LoadGenerator,
1284        options: Vec<LoadGeneratorOption<T>>,
1285    },
1286}
1287
1288impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1289    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1290        match self {
1291            CreateSourceConnection::Kafka {
1292                connection,
1293                options,
1294            } => {
1295                f.write_str("KAFKA CONNECTION ");
1296                f.write_node(connection);
1297                if !options.is_empty() {
1298                    f.write_str(" (");
1299                    f.write_node(&display::comma_separated(options));
1300                    f.write_str(")");
1301                }
1302            }
1303            CreateSourceConnection::Postgres {
1304                connection,
1305                options,
1306            } => {
1307                f.write_str("POSTGRES CONNECTION ");
1308                f.write_node(connection);
1309                if !options.is_empty() {
1310                    f.write_str(" (");
1311                    f.write_node(&display::comma_separated(options));
1312                    f.write_str(")");
1313                }
1314            }
1315            CreateSourceConnection::Yugabyte {
1316                connection,
1317                options,
1318            } => {
1319                f.write_str("YUGABYTE CONNECTION ");
1320                f.write_node(connection);
1321                if !options.is_empty() {
1322                    f.write_str(" (");
1323                    f.write_node(&display::comma_separated(options));
1324                    f.write_str(")");
1325                }
1326            }
1327            CreateSourceConnection::SqlServer {
1328                connection,
1329                options,
1330            } => {
1331                f.write_str("SQL SERVER CONNECTION ");
1332                f.write_node(connection);
1333                if !options.is_empty() {
1334                    f.write_str(" (");
1335                    f.write_node(&display::comma_separated(options));
1336                    f.write_str(")");
1337                }
1338            }
1339            CreateSourceConnection::MySql {
1340                connection,
1341                options,
1342            } => {
1343                f.write_str("MYSQL CONNECTION ");
1344                f.write_node(connection);
1345                if !options.is_empty() {
1346                    f.write_str(" (");
1347                    f.write_node(&display::comma_separated(options));
1348                    f.write_str(")");
1349                }
1350            }
1351            CreateSourceConnection::LoadGenerator { generator, options } => {
1352                f.write_str("LOAD GENERATOR ");
1353                f.write_node(generator);
1354                if !options.is_empty() {
1355                    f.write_str(" (");
1356                    f.write_node(&display::comma_separated(options));
1357                    f.write_str(")");
1358                }
1359            }
1360        }
1361    }
1362}
1363impl_display_t!(CreateSourceConnection);
1364
/// The built-in load generators a source can be created from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`.
    Clock,
    /// Rendered as `COUNTER`.
    Counter,
    /// Rendered as `MARKETING`.
    Marketing,
    /// Rendered as `AUCTION`.
    Auction,
    /// Rendered as `DATUMS`.
    Datums,
    /// Rendered as `TPCH`.
    Tpch,
    /// Rendered as `KEY VALUE`.
    KeyValue,
}
1375
1376impl AstDisplay for LoadGenerator {
1377    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1378        match self {
1379            Self::Counter => f.write_str("COUNTER"),
1380            Self::Clock => f.write_str("CLOCK"),
1381            Self::Marketing => f.write_str("MARKETING"),
1382            Self::Auction => f.write_str("AUCTION"),
1383            Self::Datums => f.write_str("DATUMS"),
1384            Self::Tpch => f.write_str("TPCH"),
1385            Self::KeyValue => f.write_str("KEY VALUE"),
1386        }
1387    }
1388}
1389impl_display!(LoadGenerator);
1390
1391impl LoadGenerator {
1392    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1393    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1394    /// cases where we only have the AST representation. This can be removed once
1395    /// the `ast_rewrite_sources_to_tables` migration is removed.
1396    pub fn schema_name(&self) -> &'static str {
1397        match self {
1398            LoadGenerator::Counter => "counter",
1399            LoadGenerator::Clock => "clock",
1400            LoadGenerator::Marketing => "marketing",
1401            LoadGenerator::Auction => "auction",
1402            LoadGenerator::Datums => "datums",
1403            LoadGenerator::Tpch => "tpch",
1404            LoadGenerator::KeyValue => "key_value",
1405        }
1406    }
1407}
1408
/// Names of the options accepted by a load generator source.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1424
1425impl AstDisplay for LoadGeneratorOptionName {
1426    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1427        f.write_str(match self {
1428            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1429            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1430            LoadGeneratorOptionName::AsOf => "AS OF",
1431            LoadGeneratorOptionName::UpTo => "UP TO",
1432            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1433            LoadGeneratorOptionName::Keys => "KEYS",
1434            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1435            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1436            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1437            LoadGeneratorOptionName::Seed => "SEED",
1438            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1439            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1440        })
1441    }
1442}
1443impl_display!(LoadGeneratorOptionName);
1444
1445impl WithOptionName for LoadGeneratorOptionName {
1446    /// # WARNING
1447    ///
1448    /// Whenever implementing this trait consider very carefully whether or not
1449    /// this value could contain sensitive user data. If you're uncertain, err
1450    /// on the conservative side and return `true`.
1451    fn redact_value(&self) -> bool {
1452        match self {
1453            LoadGeneratorOptionName::ScaleFactor
1454            | LoadGeneratorOptionName::TickInterval
1455            | LoadGeneratorOptionName::AsOf
1456            | LoadGeneratorOptionName::UpTo
1457            | LoadGeneratorOptionName::MaxCardinality
1458            | LoadGeneratorOptionName::Keys
1459            | LoadGeneratorOptionName::SnapshotRounds
1460            | LoadGeneratorOptionName::TransactionalSnapshot
1461            | LoadGeneratorOptionName::ValueSize
1462            | LoadGeneratorOptionName::Partitions
1463            | LoadGeneratorOptionName::BatchSize
1464            | LoadGeneratorOptionName::Seed => false,
1465        }
1466    }
1467}
1468
1469#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1470/// An option in a `CREATE CONNECTION...SSH`.
1471pub struct LoadGeneratorOption<T: AstInfo> {
1472    pub name: LoadGeneratorOptionName,
1473    pub value: Option<WithOptionValue<T>>,
1474}
1475impl_display_for_with_option!(LoadGeneratorOption);
1476impl_display_t!(LoadGeneratorOption);
1477
1478#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1479pub enum CreateSinkConnection<T: AstInfo> {
1480    Kafka {
1481        connection: T::ItemName,
1482        options: Vec<KafkaSinkConfigOption<T>>,
1483        key: Option<KafkaSinkKey>,
1484        headers: Option<Ident>,
1485    },
1486}
1487
1488impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1489    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1490        match self {
1491            CreateSinkConnection::Kafka {
1492                connection,
1493                options,
1494                key,
1495                headers,
1496            } => {
1497                f.write_str("KAFKA CONNECTION ");
1498                f.write_node(connection);
1499                if !options.is_empty() {
1500                    f.write_str(" (");
1501                    f.write_node(&display::comma_separated(options));
1502                    f.write_str(")");
1503                }
1504                if let Some(key) = key.as_ref() {
1505                    f.write_node(key);
1506                }
1507                if let Some(headers) = headers {
1508                    f.write_str(" HEADERS ");
1509                    f.write_node(headers);
1510                }
1511            }
1512        }
1513    }
1514}
1515impl_display_t!(CreateSinkConnection);
1516
1517#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1518pub struct KafkaSinkKey {
1519    pub key_columns: Vec<Ident>,
1520    pub not_enforced: bool,
1521}
1522
1523impl AstDisplay for KafkaSinkKey {
1524    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1525        f.write_str(" KEY (");
1526        f.write_node(&display::comma_separated(&self.key_columns));
1527        f.write_str(")");
1528        if self.not_enforced {
1529            f.write_str(" NOT ENFORCED");
1530        }
1531    }
1532}
1533
1534/// A table-level constraint, specified in a `CREATE TABLE` or an
1535/// `ALTER TABLE ADD <constraint>` statement.
1536#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1537pub enum TableConstraint<T: AstInfo> {
1538    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
1539    Unique {
1540        name: Option<Ident>,
1541        columns: Vec<Ident>,
1542        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
1543        is_primary: bool,
1544        // Where this constraint treats each NULL value as distinct; only available on `UNIQUE`
1545        // constraints.
1546        nulls_not_distinct: bool,
1547    },
1548    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
1549    /// REFERENCES <foreign_table> (<referred_columns>)`)
1550    ForeignKey {
1551        name: Option<Ident>,
1552        columns: Vec<Ident>,
1553        foreign_table: T::ItemName,
1554        referred_columns: Vec<Ident>,
1555    },
1556    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1557    Check {
1558        name: Option<Ident>,
1559        expr: Box<Expr<T>>,
1560    },
1561}
1562
1563impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1564    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1565        match self {
1566            TableConstraint::Unique {
1567                name,
1568                columns,
1569                is_primary,
1570                nulls_not_distinct,
1571            } => {
1572                f.write_node(&display_constraint_name(name));
1573                if *is_primary {
1574                    f.write_str("PRIMARY KEY ");
1575                } else {
1576                    f.write_str("UNIQUE ");
1577                    if *nulls_not_distinct {
1578                        f.write_str("NULLS NOT DISTINCT ");
1579                    }
1580                }
1581                f.write_str("(");
1582                f.write_node(&display::comma_separated(columns));
1583                f.write_str(")");
1584            }
1585            TableConstraint::ForeignKey {
1586                name,
1587                columns,
1588                foreign_table,
1589                referred_columns,
1590            } => {
1591                f.write_node(&display_constraint_name(name));
1592                f.write_str("FOREIGN KEY (");
1593                f.write_node(&display::comma_separated(columns));
1594                f.write_str(") REFERENCES ");
1595                f.write_node(foreign_table);
1596                f.write_str("(");
1597                f.write_node(&display::comma_separated(referred_columns));
1598                f.write_str(")");
1599            }
1600            TableConstraint::Check { name, expr } => {
1601                f.write_node(&display_constraint_name(name));
1602                f.write_str("CHECK (");
1603                f.write_node(&expr);
1604                f.write_str(")");
1605            }
1606        }
1607    }
1608}
1609impl_display_t!(TableConstraint);
1610
1611/// A key constraint, specified in a `CREATE SOURCE`.
1612#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1613pub enum KeyConstraint {
1614    // PRIMARY KEY (<columns>) NOT ENFORCED
1615    PrimaryKeyNotEnforced { columns: Vec<Ident> },
1616}
1617
1618impl AstDisplay for KeyConstraint {
1619    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1620        match self {
1621            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1622                f.write_str("PRIMARY KEY ");
1623                f.write_str("(");
1624                f.write_node(&display::comma_separated(columns));
1625                f.write_str(") ");
1626                f.write_str("NOT ENFORCED");
1627            }
1628        }
1629    }
1630}
1631impl_display!(KeyConstraint);
1632
/// Names of the options accepted by `CREATE SOURCE` itself.
///
/// The variant order is significant: it determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1638
1639impl AstDisplay for CreateSourceOptionName {
1640    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1641        f.write_str(match self {
1642            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1643            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1644        })
1645    }
1646}
1647impl_display!(CreateSourceOptionName);
1648
1649impl WithOptionName for CreateSourceOptionName {
1650    /// # WARNING
1651    ///
1652    /// Whenever implementing this trait consider very carefully whether or not
1653    /// this value could contain sensitive user data. If you're uncertain, err
1654    /// on the conservative side and return `true`.
1655    fn redact_value(&self) -> bool {
1656        match self {
1657            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1658                false
1659            }
1660        }
1661    }
1662}
1663
1664#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1665/// An option in a `CREATE SOURCE...` statement.
1666pub struct CreateSourceOption<T: AstInfo> {
1667    pub name: CreateSourceOptionName,
1668    pub value: Option<WithOptionValue<T>>,
1669}
1670impl_display_for_with_option!(CreateSourceOption);
1671impl_display_t!(CreateSourceOption);
1672
1673/// SQL column definition
1674#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1675pub struct ColumnDef<T: AstInfo> {
1676    pub name: Ident,
1677    pub data_type: T::DataType,
1678    pub collation: Option<UnresolvedItemName>,
1679    pub options: Vec<ColumnOptionDef<T>>,
1680}
1681
1682impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1683    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1684        f.write_node(&self.name);
1685        f.write_str(" ");
1686        f.write_node(&self.data_type);
1687        if let Some(collation) = &self.collation {
1688            f.write_str(" COLLATE ");
1689            f.write_node(collation);
1690        }
1691        for option in &self.options {
1692            f.write_str(" ");
1693            f.write_node(option);
1694        }
1695    }
1696}
1697impl_display_t!(ColumnDef);
1698
1699/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1700///
1701/// Note that implementations are substantially more permissive than the ANSI
1702/// specification on what order column options can be presented in, and whether
1703/// they are allowed to be named. The specification distinguishes between
1704/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1705/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1706/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1707/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1708/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1709/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1710/// NOT NULL constraints (the last of which is in violation of the spec).
1711///
1712/// For maximum flexibility, we don't distinguish between constraint and
1713/// non-constraint options, lumping them all together under the umbrella of
1714/// "column options," and we allow any column option to be named.
1715#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1716pub struct ColumnOptionDef<T: AstInfo> {
1717    pub name: Option<Ident>,
1718    pub option: ColumnOption<T>,
1719}
1720
1721impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1722    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1723        f.write_node(&display_constraint_name(&self.name));
1724        f.write_node(&self.option);
1725    }
1726}
1727impl_display_t!(ColumnOptionDef);
1728
1729/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1730/// TABLE` statement.
1731#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1732pub enum ColumnOption<T: AstInfo> {
1733    /// `NULL`
1734    Null,
1735    /// `NOT NULL`
1736    NotNull,
1737    /// `DEFAULT <restricted-expr>`
1738    Default(Expr<T>),
1739    /// `{ PRIMARY KEY | UNIQUE }`
1740    Unique { is_primary: bool },
1741    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1742    /// <foreign_table> (<referred_columns>)`).
1743    ForeignKey {
1744        foreign_table: UnresolvedItemName,
1745        referred_columns: Vec<Ident>,
1746    },
1747    /// `CHECK (<expr>)`
1748    Check(Expr<T>),
1749    /// `VERSION <action> <version>`
1750    Versioned {
1751        action: ColumnVersioned,
1752        version: Version,
1753    },
1754}
1755
1756impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1757    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1758        use ColumnOption::*;
1759        match self {
1760            Null => f.write_str("NULL"),
1761            NotNull => f.write_str("NOT NULL"),
1762            Default(expr) => {
1763                f.write_str("DEFAULT ");
1764                f.write_node(expr);
1765            }
1766            Unique { is_primary } => {
1767                if *is_primary {
1768                    f.write_str("PRIMARY KEY");
1769                } else {
1770                    f.write_str("UNIQUE");
1771                }
1772            }
1773            ForeignKey {
1774                foreign_table,
1775                referred_columns,
1776            } => {
1777                f.write_str("REFERENCES ");
1778                f.write_node(foreign_table);
1779                f.write_str(" (");
1780                f.write_node(&display::comma_separated(referred_columns));
1781                f.write_str(")");
1782            }
1783            Check(expr) => {
1784                f.write_str("CHECK (");
1785                f.write_node(expr);
1786                f.write_str(")");
1787            }
1788            Versioned { action, version } => {
1789                f.write_str("VERSION ");
1790                f.write_node(action);
1791                f.write_str(" ");
1792                f.write_node(version);
1793            }
1794        }
1795    }
1796}
1797impl_display_t!(ColumnOption);
1798
1799#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1800pub enum ColumnVersioned {
1801    Added,
1802}
1803
1804impl AstDisplay for ColumnVersioned {
1805    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1806    where
1807        W: fmt::Write,
1808    {
1809        match self {
1810            // TODO(alter_table): Support dropped columns.
1811            ColumnVersioned::Added => f.write_str("ADDED"),
1812        }
1813    }
1814}
1815impl_display!(ColumnVersioned);
1816
1817fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1818    struct ConstraintName<'a>(&'a Option<Ident>);
1819    impl<'a> AstDisplay for ConstraintName<'a> {
1820        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1821        where
1822            W: fmt::Write,
1823        {
1824            if let Some(name) = self.0 {
1825                f.write_str("CONSTRAINT ");
1826                f.write_node(name);
1827                f.write_str(" ");
1828            }
1829        }
1830    }
1831    ConstraintName(name)
1832}