mz_sql_parser/ast/defs/
ddl.rs

1// Copyright 2018 sqlparser-rs contributors. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from the sqlparser-rs project, available at
5// https://github.com/andygrove/sqlparser-rs. It was incorporated
6// directly into Materialize on December 21, 2019.
7//
8// Licensed under the Apache License, Version 2.0 (the "License");
9// you may not use this file except in compliance with the License.
10// You may obtain a copy of the License in the LICENSE file at the
11// root of this repository, or online at
12//
13//     http://www.apache.org/licenses/LICENSE-2.0
14//
15// Unless required by applicable law or agreed to in writing, software
16// distributed under the License is distributed on an "AS IS" BASIS,
17// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18// See the License for the specific language governing permissions and
19// limitations under the License.
20
21//! AST types specific to CREATE/ALTER variants of [crate::ast::Statement]
22//! (commonly referred to as Data Definition Language, or DDL)
23
24use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28    AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Names of the options carried by [`MaterializedViewOption`].
///
/// The declaration order of the variants feeds the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL [=] <ident>` option.
    AssertNotNull,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
    /// The `REFRESH [=] ...` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43        match self {
44            MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45            MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47            MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48        }
49    }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53    /// # WARNING
54    ///
55    /// Whenever implementing this trait consider very carefully whether or not
56    /// this value could contain sensitive user data. If you're uncertain, err
57    /// on the conservative side and return `true`.
58    fn redact_value(&self) -> bool {
59        match self {
60            MaterializedViewOptionName::AssertNotNull
61            | MaterializedViewOptionName::PartitionBy
62            | MaterializedViewOptionName::RetainHistory
63            | MaterializedViewOptionName::Refresh => false,
64        }
65    }
66}
67
/// A single materialized view option: a [`MaterializedViewOptionName`]
/// together with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value associated with the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this section; presumably generates the display impl.
impl_display_for_with_option!(MaterializedViewOption);
74
/// Names of the options carried by [`ContinualTaskOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT [=] ...` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83        match self {
84            ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85        }
86    }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90    /// # WARNING
91    ///
92    /// Whenever implementing this trait consider very carefully whether or not
93    /// this value could contain sensitive user data. If you're uncertain, err
94    /// on the conservative side and return `true`.
95    fn redact_value(&self) -> bool {
96        match self {
97            ContinualTaskOptionName::Snapshot => false,
98        }
99    }
100}
101
/// A single continual task option: a [`ContinualTaskOptionName`] together
/// with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value associated with the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this section; presumably generates the display impl.
impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema held as text.
///
/// Its `AstDisplay` implementation renders it as `SCHEMA '<text>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The schema text; single quotes are escaped when it is displayed.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116        f.write_str("SCHEMA '");
117        f.write_node(&display::escape_single_quote_string(&self.schema));
118        f.write_str("'");
119    }
120}
121impl_display!(Schema);
122
/// Names of the options carried by [`AvroSchemaOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT [=] <bool>` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131        match self {
132            AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133        }
134    }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138    /// # WARNING
139    ///
140    /// Whenever implementing this trait consider very carefully whether or not
141    /// this value could contain sensitive user data. If you're uncertain, err
142    /// on the conservative side and return `true`.
143    fn redact_value(&self) -> bool {
144        match self {
145            Self::ConfluentWireFormat => false,
146        }
147    }
148}
149
/// A single inline-Avro-schema option: an [`AvroSchemaOptionName`] together
/// with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value associated with the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this section; presumably generates the display impl.
impl_display_for_with_option!(AvroSchemaOption);
156
/// Where the schema for an Avro format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is described by a schema registry connection clause
    /// (rendered by [`CsrConnectionAvro`]).
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is given inline; rendered as `USING SCHEMA '...'`, followed
    /// by a parenthesized option list when `with_options` is non-empty.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170        match self {
171            Self::Csr { csr_connection } => {
172                f.write_node(csr_connection);
173            }
174            Self::InlineSchema {
175                schema,
176                with_options,
177            } => {
178                f.write_str("USING ");
179                schema.fmt(f);
180                if !with_options.is_empty() {
181                    f.write_str(" (");
182                    f.write_node(&display::comma_separated(with_options));
183                    f.write_str(")");
184                }
185            }
186        }
187    }
188}
189impl_display_t!(AvroSchema);
190
/// Where the schema for a Protobuf format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is described by a schema registry connection clause
    /// (rendered by [`CsrConnectionProtobuf`]).
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is given inline; rendered as
    /// `MESSAGE '<message_name>' USING SCHEMA '...'`.
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204        match self {
205            Self::Csr { csr_connection } => {
206                f.write_node(csr_connection);
207            }
208            Self::InlineSchema {
209                message_name,
210                schema,
211            } => {
212                f.write_str("MESSAGE '");
213                f.write_node(&display::escape_single_quote_string(message_name));
214                f.write_str("' USING ");
215                f.write_str(schema);
216            }
217        }
218    }
219}
220impl_display_t!(ProtobufSchema);
221
/// Names of the options carried by [`CsrConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// The `AVRO KEY FULLNAME` option.
    AvroKeyFullname,
    /// The `AVRO VALUE FULLNAME` option.
    AvroValueFullname,
    /// The `NULL DEFAULTS` option.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option; see [`AvroDocOn`].
    AvroDocOn(AvroDocOn<T>),
    /// The `KEY COMPATIBILITY LEVEL` option.
    KeyCompatibilityLevel,
    /// The `VALUE COMPATIBILITY LEVEL` option.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233    /// # WARNING
234    ///
235    /// Whenever implementing this trait consider very carefully whether or not
236    /// this value could contain sensitive user data. If you're uncertain, err
237    /// on the conservative side and return `true`.
238    fn redact_value(&self) -> bool {
239        match self {
240            Self::AvroKeyFullname
241            | Self::AvroValueFullname
242            | Self::NullDefaults
243            | Self::AvroDocOn(_)
244            | Self::KeyCompatibilityLevel
245            | Self::ValueCompatibilityLevel => false,
246        }
247    }
248}
249
/// The name part of a `[KEY|VALUE] DOC ON {COLUMN|TYPE} <name>` option.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc is attached to.
    pub identifier: DocOnIdentifier<T>,
    /// Selects the optional `KEY`/`VALUE` prefix.
    pub for_schema: DocOnSchema,
}
/// Which prefix, if any, an [`AvroDocOn`] is rendered with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Rendered with a leading `KEY `.
    KeyOnly,
    /// Rendered with a leading `VALUE `.
    ValueOnly,
    /// Rendered with no prefix.
    All,
}
261
/// The target of an [`AvroDocOn`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// Rendered as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// Rendered as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270        match &self.for_schema {
271            DocOnSchema::KeyOnly => f.write_str("KEY "),
272            DocOnSchema::ValueOnly => f.write_str("VALUE "),
273            DocOnSchema::All => {}
274        }
275        match &self.identifier {
276            DocOnIdentifier::Column(name) => {
277                f.write_str("DOC ON COLUMN ");
278                f.write_node(name);
279            }
280            DocOnIdentifier::Type(name) => {
281                f.write_str("DOC ON TYPE ");
282                f.write_node(name);
283            }
284        }
285    }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291        match self {
292            CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293            CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294            CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295            CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296            CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297            CsrConfigOptionName::ValueCompatibilityLevel => {
298                f.write_str("VALUE COMPATIBILITY LEVEL")
299            }
300        }
301    }
302}
303impl_display_t!(CsrConfigOptionName);
304
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value associated with the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this section; presumably they generate the
// `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A `CONNECTION <name> [(<options>)]` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The referenced connection item.
    pub connection: T::ItemName,
    /// Options rendered in parentheses after the name; may be empty.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322        f.write_str("CONNECTION ");
323        f.write_node(&self.connection);
324        if !self.options.is_empty() {
325            f.write_str(" (");
326            f.write_node(&display::comma_separated(&self.options));
327            f.write_str(")");
328        }
329    }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is chosen.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// Use the latest schema; this is the [`Default`].
    Latest,
    /// Use the schema carried inline as text.
    Inline(String),
    /// Use the schema with the given numeric id.
    ById(i32),
}

impl Default for ReaderSchemaSelectionStrategy {
    /// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
    fn default() -> Self {
        ReaderSchemaSelectionStrategy::Latest
    }
}
345
/// A `USING CONFLUENT SCHEMA REGISTRY CONNECTION ...` clause for Avro.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The registry connection clause.
    pub connection: CsrConnection<T>,
    /// Reader schema strategy for the key, if one was chosen.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader schema strategy for the value, if one was chosen.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357        f.write_node(&self.connection);
358        if let Some(seed) = &self.seed {
359            f.write_str(" ");
360            f.write_node(seed);
361        }
362    }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A `USING CONFLUENT SCHEMA REGISTRY CONNECTION ...` clause for Protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The registry connection clause.
    pub connection: CsrConnection<T>,
    /// Optional `SEED ...` clause.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374        f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375        f.write_node(&self.connection);
376
377        if let Some(seed) = &self.seed {
378            f.write_str(" ");
379            f.write_node(seed);
380        }
381    }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// A `SEED [KEY SCHEMA '...'] VALUE SCHEMA '...'` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// Text of the optional key schema.
    pub key_schema: Option<String>,
    /// Text of the value schema.
    pub value_schema: String,
}
390
391impl AstDisplay for CsrSeedAvro {
392    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393        f.write_str("SEED");
394        if let Some(key_schema) = &self.key_schema {
395            f.write_str(" KEY SCHEMA '");
396            f.write_node(&display::escape_single_quote_string(key_schema));
397            f.write_str("'");
398        }
399        f.write_str(" VALUE SCHEMA '");
400        f.write_node(&display::escape_single_quote_string(&self.value_schema));
401        f.write_str("'");
402    }
403}
404impl_display!(CsrSeedAvro);
405
/// A `SEED [KEY <schema>] VALUE <schema>` clause for Protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// Optional key schema.
    pub key: Option<CsrSeedProtobufSchema>,
    /// Value schema.
    pub value: CsrSeedProtobufSchema,
}
411
412impl AstDisplay for CsrSeedProtobuf {
413    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414        f.write_str("SEED");
415        if let Some(key) = &self.key {
416            f.write_str(" KEY ");
417            f.write_node(key);
418        }
419        f.write_str(" VALUE ");
420        f.write_node(&self.value);
421    }
422}
423impl_display!(CsrSeedProtobuf);
424
/// One schema inside a Protobuf `SEED` clause; rendered as
/// `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    // Hex encoded string.
    pub schema: String,
    /// Name written after `MESSAGE`.
    pub message_name: String,
}
431impl AstDisplay for CsrSeedProtobufSchema {
432    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433        f.write_str("SCHEMA '");
434        f.write_str(&display::escape_single_quote_string(&self.schema));
435        f.write_str("' MESSAGE '");
436        f.write_str(&self.message_name);
437        f.write_str("'");
438    }
439}
440impl_display!(CsrSeedProtobufSchema);
441
/// The format clause of a source or sink: either one format, or separate key
/// and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// `CREATE SOURCE/SINK .. FORMAT`
    Bare(Format<T>),
    /// `CREATE SOURCE/SINK .. KEY FORMAT .. VALUE FORMAT`
    KeyValue { key: Format<T>, value: Format<T> },
}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452        match self {
453            FormatSpecifier::Bare(format) => {
454                f.write_str("FORMAT ");
455                f.write_node(format)
456            }
457            FormatSpecifier::KeyValue { key, value } => {
458                f.write_str("KEY FORMAT ");
459                f.write_node(key);
460                f.write_str(" VALUE FORMAT ");
461                f.write_node(value);
462            }
463        }
464    }
465}
466impl_display_t!(FormatSpecifier);
467
/// A data format, as written after `FORMAT`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Rendered as `BYTES`.
    Bytes,
    /// Rendered as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Rendered as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Rendered as `REGEX '<pattern>'`.
    Regex(String),
    /// Rendered as `CSV WITH <columns>`, plus `DELIMITED BY '<c>'` when the
    /// delimiter is not a comma.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// Rendered as `JSON`, or `JSON ARRAY` when `array` is set.
    Json {
        array: bool,
    },
    /// Rendered as `TEXT`.
    Text,
}
483
/// How the columns of a CSV format are specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// `WITH count COLUMNS`
    Count(u64),
    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
    Header { names: Vec<Ident> },
}
491
492impl AstDisplay for CsvColumns {
493    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494        match self {
495            CsvColumns::Count(n) => {
496                f.write_str(n);
497                f.write_str(" COLUMNS")
498            }
499            CsvColumns::Header { names } => {
500                f.write_str("HEADER");
501                if !names.is_empty() {
502                    f.write_str(" (");
503                    f.write_node(&display::comma_separated(names));
504                    f.write_str(")");
505                }
506            }
507        }
508    }
509}
510
/// A metadata item that can be included in a source, with its alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Rendered as `KEY [AS <alias>]`.
    Key {
        alias: Option<Ident>,
    },
    /// Rendered as `TIMESTAMP [AS <alias>]`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Rendered as `PARTITION [AS <alias>]`.
    Partition {
        alias: Option<Ident>,
    },
    /// Rendered as `OFFSET [AS <alias>]`.
    Offset {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADERS [AS <alias>]`.
    Headers {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADER '<key>' AS <alias> [BYTES]`; unlike the other
    /// variants the alias is mandatory.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
534
535impl AstDisplay for SourceIncludeMetadata {
536    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537        let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538            if let Some(alias) = alias {
539                f.write_str(" AS ");
540                f.write_node(alias);
541            }
542        };
543
544        match self {
545            SourceIncludeMetadata::Key { alias } => {
546                f.write_str("KEY");
547                print_alias(f, alias);
548            }
549            SourceIncludeMetadata::Timestamp { alias } => {
550                f.write_str("TIMESTAMP");
551                print_alias(f, alias);
552            }
553            SourceIncludeMetadata::Partition { alias } => {
554                f.write_str("PARTITION");
555                print_alias(f, alias);
556            }
557            SourceIncludeMetadata::Offset { alias } => {
558                f.write_str("OFFSET");
559                print_alias(f, alias);
560            }
561            SourceIncludeMetadata::Headers { alias } => {
562                f.write_str("HEADERS");
563                print_alias(f, alias);
564            }
565            SourceIncludeMetadata::Header {
566                alias,
567                key,
568                use_bytes,
569            } => {
570                f.write_str("HEADER '");
571                f.write_str(&display::escape_single_quote_string(key));
572                f.write_str("'");
573                print_alias(f, &Some(alias.clone()));
574                if *use_bytes {
575                    f.write_str(" BYTES");
576                }
577            }
578        }
579    }
580}
581impl_display!(SourceIncludeMetadata);
582
/// A policy entry for value decoding errors (used by
/// `SourceEnvelope::Upsert`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Rendered as `INLINE [AS <alias>]`.
    Inline {
        /// The alias to use for the error column. If unspecified will be `error`.
        alias: Option<Ident>,
    },
}
590
591impl AstDisplay for SourceErrorPolicy {
592    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593        match self {
594            Self::Inline { alias } => {
595                f.write_str("INLINE");
596                if let Some(alias) = alias {
597                    f.write_str(" AS ");
598                    f.write_node(alias);
599                }
600            }
601        }
602    }
603}
604impl_display!(SourceErrorPolicy);
605
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Rendered as `NONE`.
    None,
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`, with a `(VALUE DECODING ERRORS = (...))` suffix
    /// when `value_decode_err_policy` is non-empty.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Rendered as `MATERIALIZE`.
    CdcV2,
}
615
616impl SourceEnvelope {
617    /// `true` iff Materialize is expected to crash or exhibit UB
618    /// when attempting to ingest data starting at an offset other than zero.
619    pub fn requires_all_input(&self) -> bool {
620        match self {
621            SourceEnvelope::None => false,
622            SourceEnvelope::Debezium => false,
623            SourceEnvelope::Upsert { .. } => false,
624            SourceEnvelope::CdcV2 => true,
625        }
626    }
627}
628
629impl AstDisplay for SourceEnvelope {
630    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631        match self {
632            Self::None => {
633                // this is unreachable as long as the default is None, but include it in case we ever change that
634                f.write_str("NONE");
635            }
636            Self::Debezium => {
637                f.write_str("DEBEZIUM");
638            }
639            Self::Upsert {
640                value_decode_err_policy,
641            } => {
642                if value_decode_err_policy.is_empty() {
643                    f.write_str("UPSERT");
644                } else {
645                    f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646                    f.write_node(&display::comma_separated(value_decode_err_policy));
647                    f.write_str("))")
648                }
649            }
650            Self::CdcV2 => {
651                f.write_str("MATERIALIZE");
652            }
653        }
654    }
655}
656impl_display!(SourceEnvelope);
657
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`.
    Upsert,
}
663
664impl AstDisplay for SinkEnvelope {
665    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666        match self {
667            Self::Upsert => {
668                f.write_str("UPSERT");
669            }
670            Self::Debezium => {
671                f.write_str("DEBEZIUM");
672            }
673        }
674    }
675}
676impl_display!(SinkEnvelope);
677
/// The output mode of a subscribe.
///
/// Every non-`Diffs` variant renders with a leading space.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Renders as nothing.
    Diffs,
    /// Renders as ` WITHIN TIMESTAMP ORDER BY <exprs>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Renders as ` ENVELOPE UPSERT (KEY (<columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Renders as ` ENVELOPE DEBEZIUM (KEY (<columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688        match self {
689            Self::Diffs => {}
690            Self::WithinTimestampOrderBy { order_by } => {
691                f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692                f.write_node(&display::comma_separated(order_by));
693            }
694            Self::EnvelopeUpsert { key_columns } => {
695                f.write_str(" ENVELOPE UPSERT (KEY (");
696                f.write_node(&display::comma_separated(key_columns));
697                f.write_str("))");
698            }
699            Self::EnvelopeDebezium { key_columns } => {
700                f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701                f.write_node(&display::comma_separated(key_columns));
702                f.write_str("))");
703            }
704        }
705    }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711        match self {
712            Self::Bytes => f.write_str("BYTES"),
713            Self::Avro(inner) => {
714                f.write_str("AVRO ");
715                f.write_node(inner);
716            }
717            Self::Protobuf(inner) => {
718                f.write_str("PROTOBUF ");
719                f.write_node(inner);
720            }
721            Self::Regex(regex) => {
722                f.write_str("REGEX '");
723                f.write_node(&display::escape_single_quote_string(regex));
724                f.write_str("'");
725            }
726            Self::Csv { columns, delimiter } => {
727                f.write_str("CSV WITH ");
728                f.write_node(columns);
729
730                if *delimiter != ',' {
731                    f.write_str(" DELIMITED BY '");
732                    f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733                    f.write_str("'");
734                }
735            }
736            Self::Json { array } => {
737                f.write_str("JSON");
738                if *array {
739                    f.write_str(" ARRAY");
740                }
741            }
742            Self::Text => f.write_str("TEXT"),
743        }
744    }
745}
746impl_display_t!(Format);
747
// All connection options are bundled together to allow us to parse `ALTER
// CONNECTION` without specifying the type of connection we're altering. One
// must suffer to be beautiful.
/// Names of the options carried by [`ConnectionOption`], across every
/// connection type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    Url,
    User,
}
786
787impl AstDisplay for ConnectionOptionName {
788    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
789        f.write_str(match self {
790            ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
791            ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
792            ConnectionOptionName::AwsConnection => "AWS CONNECTION",
793            ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
794            ConnectionOptionName::Broker => "BROKER",
795            ConnectionOptionName::Brokers => "BROKERS",
796            ConnectionOptionName::Database => "DATABASE",
797            ConnectionOptionName::Endpoint => "ENDPOINT",
798            ConnectionOptionName::Host => "HOST",
799            ConnectionOptionName::Password => "PASSWORD",
800            ConnectionOptionName::Port => "PORT",
801            ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
802            ConnectionOptionName::ProgressTopicReplicationFactor => {
803                "PROGRESS TOPIC REPLICATION FACTOR"
804            }
805            ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
806            ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
807            ConnectionOptionName::Region => "REGION",
808            ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
809            ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
810            ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
811            ConnectionOptionName::SaslPassword => "SASL PASSWORD",
812            ConnectionOptionName::SaslUsername => "SASL USERNAME",
813            ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
814            ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
815            ConnectionOptionName::ServiceName => "SERVICE NAME",
816            ConnectionOptionName::SshTunnel => "SSH TUNNEL",
817            ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
818            ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
819            ConnectionOptionName::SslKey => "SSL KEY",
820            ConnectionOptionName::SslMode => "SSL MODE",
821            ConnectionOptionName::SessionToken => "SESSION TOKEN",
822            ConnectionOptionName::Url => "URL",
823            ConnectionOptionName::User => "USER",
824        })
825    }
826}
827impl_display!(ConnectionOptionName);
828
829impl WithOptionName for ConnectionOptionName {
830    /// # WARNING
831    ///
832    /// Whenever implementing this trait consider very carefully whether or not
833    /// this value could contain sensitive user data. If you're uncertain, err
834    /// on the conservative side and return `true`.
835    fn redact_value(&self) -> bool {
836        match self {
837            ConnectionOptionName::AccessKeyId
838            | ConnectionOptionName::AvailabilityZones
839            | ConnectionOptionName::AwsConnection
840            | ConnectionOptionName::AwsPrivatelink
841            | ConnectionOptionName::Broker
842            | ConnectionOptionName::Brokers
843            | ConnectionOptionName::Database
844            | ConnectionOptionName::Endpoint
845            | ConnectionOptionName::Host
846            | ConnectionOptionName::Password
847            | ConnectionOptionName::Port
848            | ConnectionOptionName::ProgressTopic
849            | ConnectionOptionName::ProgressTopicReplicationFactor
850            | ConnectionOptionName::PublicKey1
851            | ConnectionOptionName::PublicKey2
852            | ConnectionOptionName::Region
853            | ConnectionOptionName::AssumeRoleArn
854            | ConnectionOptionName::AssumeRoleSessionName
855            | ConnectionOptionName::SaslMechanisms
856            | ConnectionOptionName::SaslPassword
857            | ConnectionOptionName::SaslUsername
858            | ConnectionOptionName::SecurityProtocol
859            | ConnectionOptionName::SecretAccessKey
860            | ConnectionOptionName::ServiceName
861            | ConnectionOptionName::SshTunnel
862            | ConnectionOptionName::SslCertificate
863            | ConnectionOptionName::SslCertificateAuthority
864            | ConnectionOptionName::SslKey
865            | ConnectionOptionName::SslMode
866            | ConnectionOptionName::SessionToken
867            | ConnectionOptionName::Url
868            | ConnectionOptionName::User => false,
869        }
870    }
871}
872
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE CONNECTION`.
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value associated with the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this section; presumably they generate the
// `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
881
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Rendered as `AWS`.
    Aws,
    /// Rendered as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Rendered as `KAFKA`.
    Kafka,
    /// Rendered as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Rendered as `POSTGRES`.
    Postgres,
    /// Rendered as `SSH TUNNEL`.
    Ssh,
    /// Rendered as `SQL SERVER`.
    SqlServer,
    /// Rendered as `MYSQL`.
    MySql,
    /// Rendered as `YUGABYTE`.
    Yugabyte,
}
894
895impl AstDisplay for CreateConnectionType {
896    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
897        match self {
898            Self::Kafka => {
899                f.write_str("KAFKA");
900            }
901            Self::Csr => {
902                f.write_str("CONFLUENT SCHEMA REGISTRY");
903            }
904            Self::Postgres => {
905                f.write_str("POSTGRES");
906            }
907            Self::Aws => {
908                f.write_str("AWS");
909            }
910            Self::AwsPrivatelink => {
911                f.write_str("AWS PRIVATELINK");
912            }
913            Self::Ssh => {
914                f.write_str("SSH TUNNEL");
915            }
916            Self::SqlServer => {
917                f.write_str("SQL SERVER");
918            }
919            Self::MySql => {
920                f.write_str("MYSQL");
921            }
922            Self::Yugabyte => {
923                f.write_str("YUGABYTE");
924            }
925        }
926    }
927}
928impl_display!(CreateConnectionType);
929
/// The name of an option carried by a [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// The `VALIDATE` option.
    Validate,
}
934
935impl AstDisplay for CreateConnectionOptionName {
936    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
937        f.write_str(match self {
938            CreateConnectionOptionName::Validate => "VALIDATE",
939        })
940    }
941}
942impl_display!(CreateConnectionOptionName);
943
944impl WithOptionName for CreateConnectionOptionName {
945    /// # WARNING
946    ///
947    /// Whenever implementing this trait consider very carefully whether or not
948    /// this value could contain sensitive user data. If you're uncertain, err
949    /// on the conservative side and return `true`.
950    fn redact_value(&self) -> bool {
951        match self {
952            CreateConnectionOptionName::Validate => false,
953        }
954    }
955}
956
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE CONNECTION...` statement.
///
/// Pairs an option name with the value supplied for it, if any.
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CreateConnectionOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
965
/// The name of an option carried by a [`KafkaSourceConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// The `GROUP ID PREFIX` option.
    GroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `START TIMESTAMP` option.
    StartTimestamp,
    /// The `START OFFSET` option.
    StartOffset,
}
974
975impl AstDisplay for KafkaSourceConfigOptionName {
976    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
977        f.write_str(match self {
978            KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
979            KafkaSourceConfigOptionName::Topic => "TOPIC",
980            KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
981                "TOPIC METADATA REFRESH INTERVAL"
982            }
983            KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
984            KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
985        })
986    }
987}
988impl_display!(KafkaSourceConfigOptionName);
989
990impl WithOptionName for KafkaSourceConfigOptionName {
991    /// # WARNING
992    ///
993    /// Whenever implementing this trait consider very carefully whether or not
994    /// this value could contain sensitive user data. If you're uncertain, err
995    /// on the conservative side and return `true`.
996    fn redact_value(&self) -> bool {
997        match self {
998            KafkaSourceConfigOptionName::GroupIdPrefix
999            | KafkaSourceConfigOptionName::Topic
1000            | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1001            | KafkaSourceConfigOptionName::StartOffset
1002            | KafkaSourceConfigOptionName::StartTimestamp => false,
1003        }
1004    }
1005}
1006
/// An option attached to a Kafka source connection (see
/// `CreateSourceConnection::Kafka`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: KafkaSourceConfigOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1014
/// The name of an option carried by a [`KafkaSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// The `COMPRESSION TYPE` option.
    CompressionType,
    /// The `PARTITION BY` option. Its value is redacted (see `redact_value`).
    PartitionBy,
    /// The `PROGRESS GROUP ID PREFIX` option.
    ProgressGroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TRANSACTIONAL ID PREFIX` option.
    TransactionalIdPrefix,
    /// The `LEGACY IDS` option.
    LegacyIds,
    /// The `TOPIC CONFIG` option.
    TopicConfig,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `TOPIC PARTITION COUNT` option.
    TopicPartitionCount,
    /// The `TOPIC REPLICATION FACTOR` option.
    TopicReplicationFactor,
}
1028
1029impl AstDisplay for KafkaSinkConfigOptionName {
1030    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1031        f.write_str(match self {
1032            KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1033            KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1034            KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1035            KafkaSinkConfigOptionName::Topic => "TOPIC",
1036            KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1037            KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1038            KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1039            KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1040                "TOPIC METADATA REFRESH INTERVAL"
1041            }
1042            KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1043            KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1044        })
1045    }
1046}
1047impl_display!(KafkaSinkConfigOptionName);
1048
1049impl WithOptionName for KafkaSinkConfigOptionName {
1050    /// # WARNING
1051    ///
1052    /// Whenever implementing this trait consider very carefully whether or not
1053    /// this value could contain sensitive user data. If you're uncertain, err
1054    /// on the conservative side and return `true`.
1055    fn redact_value(&self) -> bool {
1056        match self {
1057            KafkaSinkConfigOptionName::CompressionType
1058            | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1059            | KafkaSinkConfigOptionName::Topic
1060            | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1061            | KafkaSinkConfigOptionName::TransactionalIdPrefix
1062            | KafkaSinkConfigOptionName::LegacyIds
1063            | KafkaSinkConfigOptionName::TopicConfig
1064            | KafkaSinkConfigOptionName::TopicPartitionCount
1065            | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1066            KafkaSinkConfigOptionName::PartitionBy => true,
1067        }
1068    }
1069}
1070
/// An option attached to a Kafka sink connection (see
/// `CreateSinkConnection::Kafka`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: KafkaSinkConfigOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1078
/// The name of an option carried by a [`PgConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::postgres::PostgresSourcePublicationDetails`.
    Details,
    /// The name of the publication to sync.
    Publication,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
}
1094
1095impl AstDisplay for PgConfigOptionName {
1096    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1097        f.write_str(match self {
1098            PgConfigOptionName::Details => "DETAILS",
1099            PgConfigOptionName::Publication => "PUBLICATION",
1100            PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1101        })
1102    }
1103}
1104impl_display!(PgConfigOptionName);
1105
1106impl WithOptionName for PgConfigOptionName {
1107    /// # WARNING
1108    ///
1109    /// Whenever implementing this trait consider very carefully whether or not
1110    /// this value could contain sensitive user data. If you're uncertain, err
1111    /// on the conservative side and return `true`.
1112    fn redact_value(&self) -> bool {
1113        match self {
1114            PgConfigOptionName::Details
1115            | PgConfigOptionName::Publication
1116            | PgConfigOptionName::TextColumns => false,
1117        }
1118    }
1119}
1120
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
///
/// Carried by the `Postgres` and `Yugabyte` variants of
/// `CreateSourceConnection`.
pub struct PgConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: PgConfigOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1129
/// The name of an option carried by a [`MySqlConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::mysql::MySqlSourceDetails`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1150
1151impl AstDisplay for MySqlConfigOptionName {
1152    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1153        f.write_str(match self {
1154            MySqlConfigOptionName::Details => "DETAILS",
1155            MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1156            MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1157        })
1158    }
1159}
1160impl_display!(MySqlConfigOptionName);
1161
1162impl WithOptionName for MySqlConfigOptionName {
1163    /// # WARNING
1164    ///
1165    /// Whenever implementing this trait consider very carefully whether or not
1166    /// this value could contain sensitive user data. If you're uncertain, err
1167    /// on the conservative side and return `true`.
1168    fn redact_value(&self) -> bool {
1169        match self {
1170            MySqlConfigOptionName::Details
1171            | MySqlConfigOptionName::TextColumns
1172            | MySqlConfigOptionName::ExcludeColumns => false,
1173        }
1174    }
1175}
1176
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
///
/// Carried by `CreateSourceConnection::MySql`.
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: MySqlConfigOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1185
/// The name of an option carried by a [`SqlServerConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Hex encoded string of binary serialization of
    /// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
    Details,
    /// Columns whose types you want to unconditionally format as text.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    TextColumns,
    /// Columns you want to exclude.
    ///
    /// NOTE(roshan): This value is kept around to allow round-tripping a
    /// `CREATE SOURCE` statement while we still allow creating implicit
    /// subsources from `CREATE SOURCE`, but will be removed once
    /// fully deprecating that feature and forcing users to use explicit
    /// `CREATE TABLE .. FROM SOURCE` statements.
    ExcludeColumns,
}
1208
1209impl AstDisplay for SqlServerConfigOptionName {
1210    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1211        f.write_str(match self {
1212            SqlServerConfigOptionName::Details => "DETAILS",
1213            SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1214            SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1215        })
1216    }
1217}
1218impl_display!(SqlServerConfigOptionName);
1219
1220impl WithOptionName for SqlServerConfigOptionName {
1221    /// # WARNING
1222    ///
1223    /// Whenever implementing this trait consider very carefully whether or not
1224    /// this value could contain sensitive user data. If you're uncertain, err
1225    /// on the conservative side and return `true`.
1226    fn redact_value(&self) -> bool {
1227        match self {
1228            SqlServerConfigOptionName::Details
1229            | SqlServerConfigOptionName::TextColumns
1230            | SqlServerConfigOptionName::ExcludeColumns => false,
1231        }
1232    }
1233}
1234
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
///
/// Carried by `CreateSourceConnection::SqlServer`.
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: SqlServerConfigOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1243
/// The upstream a source reads from, together with its options.
///
/// Displayed as a keyword prefix, the connection (or generator), and, when
/// `options` is non-empty, a parenthesized comma-separated option list.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Displayed as `KAFKA CONNECTION <connection> [(<options>)]`.
    Kafka {
        connection: T::ItemName,
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Displayed as `POSTGRES CONNECTION <connection> [(<options>)]`.
    Postgres {
        connection: T::ItemName,
        options: Vec<PgConfigOption<T>>,
    },
    /// Displayed as `YUGABYTE CONNECTION <connection> [(<options>)]`.
    ///
    /// Shares its option type with the `Postgres` variant.
    Yugabyte {
        connection: T::ItemName,
        options: Vec<PgConfigOption<T>>,
    },
    /// Displayed as `SQL SERVER CONNECTION <connection> [(<options>)]`.
    SqlServer {
        connection: T::ItemName,
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Displayed as `MYSQL CONNECTION <connection> [(<options>)]`.
    MySql {
        connection: T::ItemName,
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Displayed as `LOAD GENERATOR <generator> [(<options>)]`.
    LoadGenerator {
        generator: LoadGenerator,
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1271
1272impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1273    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1274        match self {
1275            CreateSourceConnection::Kafka {
1276                connection,
1277                options,
1278            } => {
1279                f.write_str("KAFKA CONNECTION ");
1280                f.write_node(connection);
1281                if !options.is_empty() {
1282                    f.write_str(" (");
1283                    f.write_node(&display::comma_separated(options));
1284                    f.write_str(")");
1285                }
1286            }
1287            CreateSourceConnection::Postgres {
1288                connection,
1289                options,
1290            } => {
1291                f.write_str("POSTGRES CONNECTION ");
1292                f.write_node(connection);
1293                if !options.is_empty() {
1294                    f.write_str(" (");
1295                    f.write_node(&display::comma_separated(options));
1296                    f.write_str(")");
1297                }
1298            }
1299            CreateSourceConnection::Yugabyte {
1300                connection,
1301                options,
1302            } => {
1303                f.write_str("YUGABYTE CONNECTION ");
1304                f.write_node(connection);
1305                if !options.is_empty() {
1306                    f.write_str(" (");
1307                    f.write_node(&display::comma_separated(options));
1308                    f.write_str(")");
1309                }
1310            }
1311            CreateSourceConnection::SqlServer {
1312                connection,
1313                options,
1314            } => {
1315                f.write_str("SQL SERVER CONNECTION ");
1316                f.write_node(connection);
1317                if !options.is_empty() {
1318                    f.write_str(" (");
1319                    f.write_node(&display::comma_separated(options));
1320                    f.write_str(")");
1321                }
1322            }
1323            CreateSourceConnection::MySql {
1324                connection,
1325                options,
1326            } => {
1327                f.write_str("MYSQL CONNECTION ");
1328                f.write_node(connection);
1329                if !options.is_empty() {
1330                    f.write_str(" (");
1331                    f.write_node(&display::comma_separated(options));
1332                    f.write_str(")");
1333                }
1334            }
1335            CreateSourceConnection::LoadGenerator { generator, options } => {
1336                f.write_str("LOAD GENERATOR ");
1337                f.write_node(generator);
1338                if !options.is_empty() {
1339                    f.write_str(" (");
1340                    f.write_node(&display::comma_separated(options));
1341                    f.write_str(")");
1342                }
1343            }
1344        }
1345    }
1346}
1347impl_display_t!(CreateSourceConnection);
1348
/// The kind of load generator named in a `LOAD GENERATOR <generator>` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`.
    Clock,
    /// Displayed as `COUNTER`.
    Counter,
    /// Displayed as `MARKETING`.
    Marketing,
    /// Displayed as `AUCTION`.
    Auction,
    /// Displayed as `DATUMS`.
    Datums,
    /// Displayed as `TPCH`.
    Tpch,
    /// Displayed as `KEY VALUE`.
    KeyValue,
}
1359
1360impl AstDisplay for LoadGenerator {
1361    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1362        match self {
1363            Self::Counter => f.write_str("COUNTER"),
1364            Self::Clock => f.write_str("CLOCK"),
1365            Self::Marketing => f.write_str("MARKETING"),
1366            Self::Auction => f.write_str("AUCTION"),
1367            Self::Datums => f.write_str("DATUMS"),
1368            Self::Tpch => f.write_str("TPCH"),
1369            Self::KeyValue => f.write_str("KEY VALUE"),
1370        }
1371    }
1372}
1373impl_display!(LoadGenerator);
1374
1375impl LoadGenerator {
1376    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
1377    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
1378    /// cases where we only have the AST representation. This can be removed once
1379    /// the `ast_rewrite_sources_to_tables` migration is removed.
1380    pub fn schema_name(&self) -> &'static str {
1381        match self {
1382            LoadGenerator::Counter => "counter",
1383            LoadGenerator::Clock => "clock",
1384            LoadGenerator::Marketing => "marketing",
1385            LoadGenerator::Auction => "auction",
1386            LoadGenerator::Datums => "datums",
1387            LoadGenerator::Tpch => "tpch",
1388            LoadGenerator::KeyValue => "key_value",
1389        }
1390    }
1391}
1392
/// The name of an option carried by a [`LoadGeneratorOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// The `SCALE FACTOR` option.
    ScaleFactor,
    /// The `TICK INTERVAL` option.
    TickInterval,
    /// The `AS OF` option.
    AsOf,
    /// The `UP TO` option.
    UpTo,
    /// The `MAX CARDINALITY` option.
    MaxCardinality,
    /// The `KEYS` option.
    Keys,
    /// The `SNAPSHOT ROUNDS` option.
    SnapshotRounds,
    /// The `TRANSACTIONAL SNAPSHOT` option.
    TransactionalSnapshot,
    /// The `VALUE SIZE` option.
    ValueSize,
    /// The `SEED` option.
    Seed,
    /// The `PARTITIONS` option.
    Partitions,
    /// The `BATCH SIZE` option.
    BatchSize,
}
1408
1409impl AstDisplay for LoadGeneratorOptionName {
1410    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1411        f.write_str(match self {
1412            LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1413            LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1414            LoadGeneratorOptionName::AsOf => "AS OF",
1415            LoadGeneratorOptionName::UpTo => "UP TO",
1416            LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1417            LoadGeneratorOptionName::Keys => "KEYS",
1418            LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1419            LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1420            LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1421            LoadGeneratorOptionName::Seed => "SEED",
1422            LoadGeneratorOptionName::Partitions => "PARTITIONS",
1423            LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1424        })
1425    }
1426}
1427impl_display!(LoadGeneratorOptionName);
1428
1429impl WithOptionName for LoadGeneratorOptionName {
1430    /// # WARNING
1431    ///
1432    /// Whenever implementing this trait consider very carefully whether or not
1433    /// this value could contain sensitive user data. If you're uncertain, err
1434    /// on the conservative side and return `true`.
1435    fn redact_value(&self) -> bool {
1436        match self {
1437            LoadGeneratorOptionName::ScaleFactor
1438            | LoadGeneratorOptionName::TickInterval
1439            | LoadGeneratorOptionName::AsOf
1440            | LoadGeneratorOptionName::UpTo
1441            | LoadGeneratorOptionName::MaxCardinality
1442            | LoadGeneratorOptionName::Keys
1443            | LoadGeneratorOptionName::SnapshotRounds
1444            | LoadGeneratorOptionName::TransactionalSnapshot
1445            | LoadGeneratorOptionName::ValueSize
1446            | LoadGeneratorOptionName::Partitions
1447            | LoadGeneratorOptionName::BatchSize
1448            | LoadGeneratorOptionName::Seed => false,
1449        }
1450    }
1451}
1452
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option of a `LOAD GENERATOR` source (see
/// `CreateSourceConnection::LoadGenerator`).
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option is being set.
    pub name: LoadGeneratorOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1461
/// The downstream a sink writes to, together with its options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Displayed as
    /// `KAFKA CONNECTION <connection> [(<options>)] [KEY (...) [NOT ENFORCED]] [HEADERS <ident>]`.
    Kafka {
        /// The connection written after `KAFKA CONNECTION `.
        connection: T::ItemName,
        /// Options rendered as a parenthesized list; omitted when empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// The optional `KEY (...)` clause.
        key: Option<KafkaSinkKey>,
        /// The optional identifier rendered after ` HEADERS `.
        headers: Option<Ident>,
    },
}
1471
1472impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1473    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1474        match self {
1475            CreateSinkConnection::Kafka {
1476                connection,
1477                options,
1478                key,
1479                headers,
1480            } => {
1481                f.write_str("KAFKA CONNECTION ");
1482                f.write_node(connection);
1483                if !options.is_empty() {
1484                    f.write_str(" (");
1485                    f.write_node(&display::comma_separated(options));
1486                    f.write_str(")");
1487                }
1488                if let Some(key) = key.as_ref() {
1489                    f.write_node(key);
1490                }
1491                if let Some(headers) = headers {
1492                    f.write_str(" HEADERS ");
1493                    f.write_node(headers);
1494                }
1495            }
1496        }
1497    }
1498}
1499impl_display_t!(CreateSinkConnection);
1500
/// The ` KEY (<columns>) [NOT ENFORCED]` clause of a Kafka sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaSinkKey {
    /// The columns listed inside `KEY (...)`.
    pub key_columns: Vec<Ident>,
    /// Whether ` NOT ENFORCED` is appended after the column list.
    pub not_enforced: bool,
}
1506
1507impl AstDisplay for KafkaSinkKey {
1508    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1509        f.write_str(" KEY (");
1510        f.write_node(&display::comma_separated(&self.key_columns));
1511        f.write_str(")");
1512        if self.not_enforced {
1513            f.write_str(" NOT ENFORCED");
1514        }
1515    }
1516}
1517
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE (NULLS NOT DISTINCT)? } (<columns>)`
    Unique {
        /// The optional constraint name.
        name: Option<Ident>,
        /// The constrained columns.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
        /// Whether this constraint is `NULLS NOT DISTINCT`, i.e. NULL values
        /// are not treated as distinct from one another; only available on
        /// `UNIQUE` constraints (it is not displayed for `PRIMARY KEY`).
        nulls_not_distinct: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)`)
    ForeignKey {
        /// The optional constraint name.
        name: Option<Ident>,
        /// The referencing columns of this table.
        columns: Vec<Ident>,
        /// The table being referenced.
        foreign_table: T::ItemName,
        /// The referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// The optional constraint name.
        name: Option<Ident>,
        /// The expression written inside `CHECK (...)`.
        expr: Box<Expr<T>>,
    },
}
1546
1547impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1548    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1549        match self {
1550            TableConstraint::Unique {
1551                name,
1552                columns,
1553                is_primary,
1554                nulls_not_distinct,
1555            } => {
1556                f.write_node(&display_constraint_name(name));
1557                if *is_primary {
1558                    f.write_str("PRIMARY KEY ");
1559                } else {
1560                    f.write_str("UNIQUE ");
1561                    if *nulls_not_distinct {
1562                        f.write_str("NULLS NOT DISTINCT ");
1563                    }
1564                }
1565                f.write_str("(");
1566                f.write_node(&display::comma_separated(columns));
1567                f.write_str(")");
1568            }
1569            TableConstraint::ForeignKey {
1570                name,
1571                columns,
1572                foreign_table,
1573                referred_columns,
1574            } => {
1575                f.write_node(&display_constraint_name(name));
1576                f.write_str("FOREIGN KEY (");
1577                f.write_node(&display::comma_separated(columns));
1578                f.write_str(") REFERENCES ");
1579                f.write_node(foreign_table);
1580                f.write_str("(");
1581                f.write_node(&display::comma_separated(referred_columns));
1582                f.write_str(")");
1583            }
1584            TableConstraint::Check { name, expr } => {
1585                f.write_node(&display_constraint_name(name));
1586                f.write_str("CHECK (");
1587                f.write_node(&expr);
1588                f.write_str(")");
1589            }
1590        }
1591    }
1592}
1593impl_display_t!(TableConstraint);
1594
/// A key constraint, specified in a `CREATE SOURCE`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// `PRIMARY KEY (<columns>) NOT ENFORCED`
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1601
1602impl AstDisplay for KeyConstraint {
1603    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1604        match self {
1605            KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1606                f.write_str("PRIMARY KEY ");
1607                f.write_str("(");
1608                f.write_node(&display::comma_separated(columns));
1609                f.write_str(") ");
1610                f.write_str("NOT ENFORCED");
1611            }
1612        }
1613    }
1614}
1615impl_display!(KeyConstraint);
1616
/// The name of an option carried by a [`CreateSourceOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// The `TIMESTAMP INTERVAL` option.
    TimestampInterval,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
}
1622
1623impl AstDisplay for CreateSourceOptionName {
1624    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1625        f.write_str(match self {
1626            CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1627            CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1628        })
1629    }
1630}
1631impl_display!(CreateSourceOptionName);
1632
1633impl WithOptionName for CreateSourceOptionName {
1634    /// # WARNING
1635    ///
1636    /// Whenever implementing this trait consider very carefully whether or not
1637    /// this value could contain sensitive user data. If you're uncertain, err
1638    /// on the conservative side and return `true`.
1639    fn redact_value(&self) -> bool {
1640        match self {
1641            CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1642                false
1643            }
1644        }
1645    }
1646}
1647
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `CREATE SOURCE...` statement.
///
/// Pairs an option name with the value supplied for it, if any.
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CreateSourceOptionName,
    /// The value given for the option; `None` when no value was supplied.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1656
/// SQL column definition
///
/// Displayed as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// The optional collation, displayed after ` COLLATE `.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, displayed in order, each preceded by a space.
    pub options: Vec<ColumnOptionDef<T>>,
}
1665
1666impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1667    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1668        f.write_node(&self.name);
1669        f.write_str(" ");
1670        f.write_node(&self.data_type);
1671        if let Some(collation) = &self.collation {
1672            f.write_str(" COLLATE ");
1673            f.write_node(collation);
1674        }
1675        for option in &self.options {
1676            f.write_str(" ");
1677            f.write_node(option);
1678        }
1679    }
1680}
1681impl_display_t!(ColumnDef);
1682
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// The optional constraint name; displayed before the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption<T>,
}
1704
1705impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1706    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1707        f.write_node(&display_constraint_name(&self.name));
1708        f.write_node(&self.option);
1709    }
1710}
1711impl_display_t!(ColumnOptionDef);
1712
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    Default(Expr<T>),
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique { is_primary: bool },
    /// A referential integrity constraint, displayed as
    /// `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The table being referenced.
        foreign_table: UnresolvedItemName,
        /// The referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// `CHECK (<expr>)`
    Check(Expr<T>),
    /// `VERSION <action> <version>`
    Versioned {
        /// The versioning action, displayed right after `VERSION `.
        action: ColumnVersioned,
        /// The version, displayed after the action.
        version: Version,
    },
}
1739
1740impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1741    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1742        use ColumnOption::*;
1743        match self {
1744            Null => f.write_str("NULL"),
1745            NotNull => f.write_str("NOT NULL"),
1746            Default(expr) => {
1747                f.write_str("DEFAULT ");
1748                f.write_node(expr);
1749            }
1750            Unique { is_primary } => {
1751                if *is_primary {
1752                    f.write_str("PRIMARY KEY");
1753                } else {
1754                    f.write_str("UNIQUE");
1755                }
1756            }
1757            ForeignKey {
1758                foreign_table,
1759                referred_columns,
1760            } => {
1761                f.write_str("REFERENCES ");
1762                f.write_node(foreign_table);
1763                f.write_str(" (");
1764                f.write_node(&display::comma_separated(referred_columns));
1765                f.write_str(")");
1766            }
1767            Check(expr) => {
1768                f.write_str("CHECK (");
1769                f.write_node(expr);
1770                f.write_str(")");
1771            }
1772            Versioned { action, version } => {
1773                f.write_str("VERSION ");
1774                f.write_node(action);
1775                f.write_str(" ");
1776                f.write_node(version);
1777            }
1778        }
1779    }
1780}
1781impl_display_t!(ColumnOption);
1782
1783#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1784pub enum ColumnVersioned {
1785    Added,
1786}
1787
1788impl AstDisplay for ColumnVersioned {
1789    fn fmt<W>(&self, f: &mut AstFormatter<W>)
1790    where
1791        W: fmt::Write,
1792    {
1793        match self {
1794            // TODO(alter_table): Support dropped columns.
1795            ColumnVersioned::Added => f.write_str("ADDED"),
1796        }
1797    }
1798}
1799impl_display!(ColumnVersioned);
1800
1801fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1802    struct ConstraintName<'a>(&'a Option<Ident>);
1803    impl<'a> AstDisplay for ConstraintName<'a> {
1804        fn fmt<W>(&self, f: &mut AstFormatter<W>)
1805        where
1806            W: fmt::Write,
1807        {
1808            if let Some(name) = self.0 {
1809                f.write_str("CONSTRAINT ");
1810                f.write_node(name);
1811                f.write_str(" ");
1812            }
1813        }
1814    }
1815    ConstraintName(name)
1816}