1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Names of the options that can be attached to a materialized view.
///
/// Each variant is rendered as a SQL keyword phrase by this type's
/// `AstDisplay` implementation.
// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
// defines their relative ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// Rendered as `ASSERT NOT NULL`.
    AssertNotNull,
    /// Rendered as `PARTITION BY`.
    PartitionBy,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
    /// Rendered as `REFRESH`.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
/// A single materialized view option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option is being set.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by `impl_display_for_with_option!`, which is
// defined outside this section of the file.
impl_display_for_with_option!(MaterializedViewOption);
74
/// An inline schema definition, stored as its raw text.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The schema text. It is passed through
    /// `display::escape_single_quote_string` when displayed.
    pub schema: String,
}
79
80impl AstDisplay for Schema {
81 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82 f.write_str("SCHEMA '");
83 f.write_node(&display::escape_single_quote_string(&self.schema));
84 f.write_str("'");
85 }
86}
87impl_display!(Schema);
88
/// Names of the options carried by an [`AvroSchemaOption`], which accompany
/// an inline Avro schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// Rendered as `CONFLUENT WIRE FORMAT`.
    ConfluentWireFormat,
}
94
95impl AstDisplay for AvroSchemaOptionName {
96 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97 match self {
98 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99 }
100 }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104 fn redact_value(&self) -> bool {
110 match self {
111 Self::ConfluentWireFormat => false,
112 }
113 }
114}
115
/// Names of the options carried by a [`GlueAvroOption`], used with the
/// AWS Glue Schema Registry variant of [`AvroSchema`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GlueAvroOptionName {
    /// Rendered as `SCHEMA NAME`.
    SchemaName,
}
126
127impl AstDisplay for GlueAvroOptionName {
128 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
129 match self {
130 GlueAvroOptionName::SchemaName => f.write_str("SCHEMA NAME"),
131 }
132 }
133}
134
135impl WithOptionName for GlueAvroOptionName {
136 fn redact_value(&self) -> bool {
142 match self {
143 Self::SchemaName => false,
146 }
147 }
148}
149
/// A single Glue Avro option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GlueAvroOption<T: AstInfo> {
    /// Which option is being set.
    pub name: GlueAvroOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by `impl_display_for_with_option!`, which is
// defined outside this section of the file.
impl_display_for_with_option!(GlueAvroOption);
156
/// A single inline-Avro-schema option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option is being set.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by `impl_display_for_with_option!`, which is
// defined outside this section of the file.
impl_display_for_with_option!(AvroSchemaOption);
163
/// The source of an Avro schema: a Confluent Schema Registry connection, an
/// inline schema, or an AWS Glue Schema Registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// Schema described by a Confluent Schema Registry connection clause.
    Csr {
        csr_connection: CsrConnectionAvro<T>,
    },
    /// Schema given inline, rendered as `USING SCHEMA '...'` followed by an
    /// optional parenthesized option list.
    InlineSchema {
        schema: Schema,
        with_options: Vec<AvroSchemaOption<T>>,
    },
    /// Schema described by an AWS Glue Schema Registry connection, rendered
    /// as `USING AWS GLUE SCHEMA REGISTRY CONNECTION <connection>` followed
    /// by optional options and an optional seed.
    Glue {
        connection: T::ItemName,
        with_options: Vec<GlueAvroOption<T>>,
        seed: Option<GlueAvroSeed>,
    },
}
186
187impl<T: AstInfo> AstDisplay for AvroSchema<T> {
188 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
189 match self {
190 Self::Csr { csr_connection } => {
191 f.write_node(csr_connection);
192 }
193 Self::InlineSchema {
194 schema,
195 with_options,
196 } => {
197 f.write_str("USING ");
198 schema.fmt(f);
199 if !with_options.is_empty() {
200 f.write_str(" (");
201 f.write_node(&display::comma_separated(with_options));
202 f.write_str(")");
203 }
204 }
205 Self::Glue {
206 connection,
207 with_options,
208 seed,
209 } => {
210 f.write_str("USING AWS GLUE SCHEMA REGISTRY CONNECTION ");
211 f.write_node(connection);
212 if !with_options.is_empty() {
213 f.write_str(" (");
214 f.write_node(&display::comma_separated(with_options));
215 f.write_str(")");
216 }
217 if let Some(seed) = seed {
218 f.write_str(" ");
219 f.write_node(seed);
220 }
221 }
222 }
223 }
224}
225impl_display_t!(AvroSchema);
226
/// The source of a Protobuf schema: a Confluent Schema Registry connection or
/// an inline schema with a message name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// Schema described by a Confluent Schema Registry connection clause.
    Csr {
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// Schema given inline, rendered as `MESSAGE '<name>' USING SCHEMA '...'`.
    InlineSchema {
        message_name: String,
        schema: Schema,
    },
}
237
238impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
239 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
240 match self {
241 Self::Csr { csr_connection } => {
242 f.write_node(csr_connection);
243 }
244 Self::InlineSchema {
245 message_name,
246 schema,
247 } => {
248 f.write_str("MESSAGE '");
249 f.write_node(&display::escape_single_quote_string(message_name));
250 f.write_str("' USING ");
251 f.write_str(schema);
252 }
253 }
254 }
255}
256impl_display_t!(ProtobufSchema);
257
/// Names of the options carried by a [`CsrConfigOption`], which appear in the
/// option list of a [`CsrConnection`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// Rendered as `AVRO KEY FULLNAME`.
    AvroKeyFullname,
    /// Rendered as `AVRO VALUE FULLNAME`.
    AvroValueFullname,
    /// Rendered as `NULL DEFAULTS`.
    NullDefaults,
    /// Rendered by delegating to the contained [`AvroDocOn`] node.
    AvroDocOn(AvroDocOn<T>),
    /// Rendered as `KEY COMPATIBILITY LEVEL`.
    KeyCompatibilityLevel,
    /// Rendered as `VALUE COMPATIBILITY LEVEL`.
    ValueCompatibilityLevel,
}
267
268impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
269 fn redact_value(&self) -> bool {
275 match self {
276 Self::AvroKeyFullname
277 | Self::AvroValueFullname
278 | Self::NullDefaults
279 | Self::AvroDocOn(_)
280 | Self::KeyCompatibilityLevel
281 | Self::ValueCompatibilityLevel => false,
282 }
283 }
284}
285
/// A `DOC ON ...` option name: the identifier being documented plus the
/// key/value scope the documentation applies to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the documentation is attached to.
    pub identifier: DocOnIdentifier<T>,
    /// Which schema scope (key, value, or both) the documentation targets.
    pub for_schema: DocOnSchema,
}
/// The schema scope of an [`AvroDocOn`] option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Rendered with a leading `KEY ` prefix.
    KeyOnly,
    /// Rendered with a leading `VALUE ` prefix.
    ValueOnly,
    /// Rendered with no prefix.
    All,
}
297
/// The object an [`AvroDocOn`] option documents.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; rendered as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; rendered as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
303
304impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
305 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
306 match &self.for_schema {
307 DocOnSchema::KeyOnly => f.write_str("KEY "),
308 DocOnSchema::ValueOnly => f.write_str("VALUE "),
309 DocOnSchema::All => {}
310 }
311 match &self.identifier {
312 DocOnIdentifier::Column(name) => {
313 f.write_str("DOC ON COLUMN ");
314 f.write_node(name);
315 }
316 DocOnIdentifier::Type(name) => {
317 f.write_str("DOC ON TYPE ");
318 f.write_node(name);
319 }
320 }
321 }
322}
323impl_display_t!(AvroDocOn);
324
325impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
326 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
327 match self {
328 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
329 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
330 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
331 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
332 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
333 CsrConfigOptionName::ValueCompatibilityLevel => {
334 f.write_str("VALUE COMPATIBILITY LEVEL")
335 }
336 }
337 }
338}
339impl_display_t!(CsrConfigOptionName);
340
/// A single schema-registry configuration option: an option name paired with
/// an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by these macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
349
/// A reference to a schema registry connection together with its options.
/// Rendered as `CONNECTION <connection>` plus an optional option list.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The referenced connection item.
    pub connection: T::ItemName,
    /// Options attached to the connection; omitted from output when empty.
    pub options: Vec<CsrConfigOption<T>>,
}
355
356impl<T: AstInfo> AstDisplay for CsrConnection<T> {
357 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
358 f.write_str("CONNECTION ");
359 f.write_node(&self.connection);
360 if !self.options.is_empty() {
361 f.write_str(" (");
362 f.write_node(&display::comma_separated(&self.options));
363 f.write_str(")");
364 }
365 }
366}
367impl_display_t!(CsrConnection);
368
/// How a reader schema is chosen: the latest one, an inline schema text, or
/// a schema looked up by numeric id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy (see the `Default` impl).
    Latest,
    /// An inline schema, carried as text.
    Inline(String),
    /// A schema identified by a numeric id.
    ById(i32),
}

impl Default for ReaderSchemaSelectionStrategy {
    /// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
    fn default() -> Self {
        ReaderSchemaSelectionStrategy::Latest
    }
}
381
/// A Confluent Schema Registry connection used for Avro, with optional
/// reader-schema strategies and an optional seed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The underlying registry connection.
    pub connection: CsrConnection<T>,
    /// Reader schema strategy for the key, if any.
    // NOTE(review): the `AstDisplay` impl for this type does not print this
    // field — confirm that is intentional.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader schema strategy for the value, if any.
    // NOTE(review): the `AstDisplay` impl for this type does not print this
    // field — confirm that is intentional.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional seed schemas; printed after the connection when present.
    pub seed: Option<CsrSeedAvro>,
}
389
390impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
391 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
392 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
393 f.write_node(&self.connection);
394 if let Some(seed) = &self.seed {
395 f.write_str(" ");
396 f.write_node(seed);
397 }
398 }
399}
400impl_display_t!(CsrConnectionAvro);
401
/// A Confluent Schema Registry connection used for Protobuf, with an optional
/// seed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The underlying registry connection.
    pub connection: CsrConnection<T>,
    /// Optional seed schemas; printed after the connection when present.
    pub seed: Option<CsrSeedProtobuf>,
}
407
408impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
409 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
411 f.write_node(&self.connection);
412
413 if let Some(seed) = &self.seed {
414 f.write_str(" ");
415 f.write_node(seed);
416 }
417 }
418}
419impl_display_t!(CsrConnectionProtobuf);
420
/// Seed schemas for an Avro schema registry connection, rendered as a
/// `SEED ...` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// Optional key schema text.
    pub key_schema: Option<String>,
    /// Value schema text (always present).
    pub value_schema: String,
    /// Schemas referenced by the key schema. Only printed when `key_schema`
    /// is set and this list is non-empty.
    pub key_reference_schemas: Vec<String>,
    /// Schemas referenced by the value schema. Only printed when non-empty.
    pub value_reference_schemas: Vec<String>,
}
432
433impl AstDisplay for CsrSeedAvro {
434 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
435 f.write_str("SEED");
436 if let Some(key_schema) = &self.key_schema {
437 f.write_str(" KEY SCHEMA '");
438 f.write_node(&display::escape_single_quote_string(key_schema));
439 f.write_str("'");
440 if !self.key_reference_schemas.is_empty() {
441 f.write_str(" KEY REFERENCES (");
442 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
443 if i > 0 {
444 f.write_str(", ");
445 }
446 f.write_str("'");
447 f.write_node(&display::escape_single_quote_string(schema));
448 f.write_str("'");
449 }
450 f.write_str(")");
451 }
452 }
453 f.write_str(" VALUE SCHEMA '");
454 f.write_node(&display::escape_single_quote_string(&self.value_schema));
455 f.write_str("'");
456 if !self.value_reference_schemas.is_empty() {
457 f.write_str(" VALUE REFERENCES (");
458 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
459 if i > 0 {
460 f.write_str(", ");
461 }
462 f.write_str("'");
463 f.write_node(&display::escape_single_quote_string(schema));
464 f.write_str("'");
465 }
466 f.write_str(")");
467 }
468 }
469}
470impl_display!(CsrSeedAvro);
471
/// Seed schema for the AWS Glue variant of [`AvroSchema`], rendered as
/// `SEED VALUE SCHEMA '...'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GlueAvroSeed {
    /// Value schema text.
    pub value_schema: String,
}
491
492impl AstDisplay for GlueAvroSeed {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 f.write_str("SEED VALUE SCHEMA '");
495 f.write_node(&display::escape_single_quote_string(&self.value_schema));
496 f.write_str("'");
497 }
498}
499impl_display!(GlueAvroSeed);
500
/// Seed schemas for a Protobuf schema registry connection, rendered as
/// `SEED[ KEY <schema>] VALUE <schema>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// Optional key schema.
    pub key: Option<CsrSeedProtobufSchema>,
    /// Value schema (always present).
    pub value: CsrSeedProtobufSchema,
}
506
507impl AstDisplay for CsrSeedProtobuf {
508 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
509 f.write_str("SEED");
510 if let Some(key) = &self.key {
511 f.write_str(" KEY ");
512 f.write_node(key);
513 }
514 f.write_str(" VALUE ");
515 f.write_node(&self.value);
516 }
517}
518impl_display!(CsrSeedProtobuf);
519
/// One Protobuf seed schema: schema text plus the message name within it.
/// Rendered as `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    /// The schema text.
    pub schema: String,
    /// The message name.
    pub message_name: String,
}
526impl AstDisplay for CsrSeedProtobufSchema {
527 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
528 f.write_str("SCHEMA '");
529 f.write_str(&display::escape_single_quote_string(&self.schema));
530 f.write_str("' MESSAGE '");
531 f.write_str(&display::escape_single_quote_string(&self.message_name));
532 f.write_str("'");
533 }
534}
535impl_display!(CsrSeedProtobufSchema);
536
/// How formats are specified: one format for the whole record, or separate
/// key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// A single format; rendered as `FORMAT <format>`.
    Bare(Format<T>),
    /// Separate formats; rendered as `KEY FORMAT <key> VALUE FORMAT <value>`.
    KeyValue { key: Format<T>, value: Format<T> },
}
544
545impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
546 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
547 match self {
548 FormatSpecifier::Bare(format) => {
549 f.write_str("FORMAT ");
550 f.write_node(format)
551 }
552 FormatSpecifier::KeyValue { key, value } => {
553 f.write_str("KEY FORMAT ");
554 f.write_node(key);
555 f.write_str(" VALUE FORMAT ");
556 f.write_node(value);
557 }
558 }
559 }
560}
561impl_display_t!(FormatSpecifier);
562
/// A data format, as it appears after a `FORMAT` keyword.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Rendered as `BYTES`.
    Bytes,
    /// Rendered as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Rendered as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Rendered as `REGEX '<pattern>'`.
    Regex(String),
    /// Rendered as `CSV WITH <columns>`, plus `DELIMITED BY '<c>'` when the
    /// delimiter is not a comma.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// Rendered as `JSON`, plus ` ARRAY` when `array` is true.
    Json {
        array: bool,
    },
    /// Rendered as `TEXT`.
    Text,
}
578
/// How the columns of a CSV format are described.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// A column count; rendered as `<n> COLUMNS`.
    Count(u64),
    /// A header with optional column names; rendered as `HEADER` plus a
    /// parenthesized name list when `names` is non-empty.
    Header { names: Vec<Ident> },
}
586
587impl AstDisplay for CsvColumns {
588 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589 match self {
590 CsvColumns::Count(n) => {
591 f.write_str(n);
592 f.write_str(" COLUMNS")
593 }
594 CsvColumns::Header { names } => {
595 f.write_str("HEADER");
596 if !names.is_empty() {
597 f.write_str(" (");
598 f.write_node(&display::comma_separated(names));
599 f.write_str(")");
600 }
601 }
602 }
603 }
604}
605
/// A metadata item that can be included from a source, each with an alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Rendered as `KEY[ AS <alias>]`.
    Key {
        alias: Option<Ident>,
    },
    /// Rendered as `TIMESTAMP[ AS <alias>]`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Rendered as `PARTITION[ AS <alias>]`.
    Partition {
        alias: Option<Ident>,
    },
    /// Rendered as `OFFSET[ AS <alias>]`.
    Offset {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADERS[ AS <alias>]`.
    Headers {
        alias: Option<Ident>,
    },
    /// A single header; rendered as `HEADER '<key>' AS <alias>[ BYTES]`.
    /// Unlike the other variants, the alias is mandatory.
    Header {
        key: String,
        alias: Ident,
        use_bytes: bool,
    },
}
629
630impl AstDisplay for SourceIncludeMetadata {
631 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
632 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
633 if let Some(alias) = alias {
634 f.write_str(" AS ");
635 f.write_node(alias);
636 }
637 };
638
639 match self {
640 SourceIncludeMetadata::Key { alias } => {
641 f.write_str("KEY");
642 print_alias(f, alias);
643 }
644 SourceIncludeMetadata::Timestamp { alias } => {
645 f.write_str("TIMESTAMP");
646 print_alias(f, alias);
647 }
648 SourceIncludeMetadata::Partition { alias } => {
649 f.write_str("PARTITION");
650 print_alias(f, alias);
651 }
652 SourceIncludeMetadata::Offset { alias } => {
653 f.write_str("OFFSET");
654 print_alias(f, alias);
655 }
656 SourceIncludeMetadata::Headers { alias } => {
657 f.write_str("HEADERS");
658 print_alias(f, alias);
659 }
660 SourceIncludeMetadata::Header {
661 alias,
662 key,
663 use_bytes,
664 } => {
665 f.write_str("HEADER '");
666 f.write_str(&display::escape_single_quote_string(key));
667 f.write_str("'");
668 print_alias(f, &Some(alias.clone()));
669 if *use_bytes {
670 f.write_str(" BYTES");
671 }
672 }
673 }
674 }
675}
676impl_display!(SourceIncludeMetadata);
677
/// An error-handling policy for a source; used by
/// `SourceEnvelope::Upsert` for value decoding errors.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Rendered as `INLINE[ AS <alias>]`.
    Inline {
        alias: Option<Ident>,
    },
}
685
686impl AstDisplay for SourceErrorPolicy {
687 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688 match self {
689 Self::Inline { alias } => {
690 f.write_str("INLINE");
691 if let Some(alias) = alias {
692 f.write_str(" AS ");
693 f.write_node(alias);
694 }
695 }
696 }
697 }
698}
699impl_display!(SourceErrorPolicy);
700
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Rendered as `NONE`.
    None,
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`, plus `(VALUE DECODING ERRORS = (...))` when any
    /// error policies are given.
    Upsert {
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Rendered as `MATERIALIZE`. The only envelope for which
    /// `requires_all_input` returns `true`.
    CdcV2,
}
710
711impl SourceEnvelope {
712 pub fn requires_all_input(&self) -> bool {
715 match self {
716 SourceEnvelope::None => false,
717 SourceEnvelope::Debezium => false,
718 SourceEnvelope::Upsert { .. } => false,
719 SourceEnvelope::CdcV2 => true,
720 }
721 }
722}
723
724impl AstDisplay for SourceEnvelope {
725 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
726 match self {
727 Self::None => {
728 f.write_str("NONE");
730 }
731 Self::Debezium => {
732 f.write_str("DEBEZIUM");
733 }
734 Self::Upsert {
735 value_decode_err_policy,
736 } => {
737 if value_decode_err_policy.is_empty() {
738 f.write_str("UPSERT");
739 } else {
740 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
741 f.write_node(&display::comma_separated(value_decode_err_policy));
742 f.write_str("))")
743 }
744 }
745 Self::CdcV2 => {
746 f.write_str("MATERIALIZE");
747 }
748 }
749 }
750}
751impl_display!(SourceEnvelope);
752
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`.
    Upsert,
}
758
759impl AstDisplay for SinkEnvelope {
760 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
761 match self {
762 Self::Upsert => {
763 f.write_str("UPSERT");
764 }
765 Self::Debezium => {
766 f.write_str("DEBEZIUM");
767 }
768 }
769 }
770}
771impl_display!(SinkEnvelope);
772
/// The write mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Rendered as `UPSERT`.
    Upsert,
    /// Rendered as `APPEND`.
    Append,
}
778
779impl AstDisplay for IcebergSinkMode {
780 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
781 match self {
782 Self::Upsert => {
783 f.write_str("UPSERT");
784 }
785 Self::Append => {
786 f.write_str("APPEND");
787 }
788 }
789 }
790}
791impl_display!(IcebergSinkMode);
792
/// The output shape requested for a subscribe.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Renders as nothing.
    Diffs,
    /// Rendered as ` WITHIN TIMESTAMP ORDER BY <exprs>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Rendered as ` ENVELOPE UPSERT (KEY (<columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Rendered as ` ENVELOPE DEBEZIUM (KEY (<columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
800
801impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
802 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
803 match self {
804 Self::Diffs => {}
805 Self::WithinTimestampOrderBy { order_by } => {
806 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
807 f.write_node(&display::comma_separated(order_by));
808 }
809 Self::EnvelopeUpsert { key_columns } => {
810 f.write_str(" ENVELOPE UPSERT (KEY (");
811 f.write_node(&display::comma_separated(key_columns));
812 f.write_str("))");
813 }
814 Self::EnvelopeDebezium { key_columns } => {
815 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
816 f.write_node(&display::comma_separated(key_columns));
817 f.write_str("))");
818 }
819 }
820 }
821}
822impl_display_t!(SubscribeOutput);
823
824impl<T: AstInfo> AstDisplay for Format<T> {
825 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
826 match self {
827 Self::Bytes => f.write_str("BYTES"),
828 Self::Avro(inner) => {
829 f.write_str("AVRO ");
830 f.write_node(inner);
831 }
832 Self::Protobuf(inner) => {
833 f.write_str("PROTOBUF ");
834 f.write_node(inner);
835 }
836 Self::Regex(regex) => {
837 f.write_str("REGEX '");
838 f.write_node(&display::escape_single_quote_string(regex));
839 f.write_str("'");
840 }
841 Self::Csv { columns, delimiter } => {
842 f.write_str("CSV WITH ");
843 f.write_node(columns);
844
845 if *delimiter != ',' {
846 f.write_str(" DELIMITED BY '");
847 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
848 f.write_str("'");
849 }
850 }
851 Self::Json { array } => {
852 f.write_str("JSON");
853 if *array {
854 f.write_str(" ARRAY");
855 }
856 }
857 Self::Text => f.write_str("TEXT"),
858 }
859 }
860}
861impl_display_t!(Format);
862
/// Names of the options carried by a [`ConnectionOption`].
///
/// Each variant is rendered as the space-separated, upper-case keyword form
/// of its name by the `AstDisplay` impl (e.g. `AccessKeyId` as
/// `ACCESS KEY ID`, `PublicKey1` as `PUBLIC KEY 1`).
// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
// defines their relative ordering.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    GcpConnection,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    Registry,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceAccountKey,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
908
909impl AstDisplay for ConnectionOptionName {
910 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
911 f.write_str(match self {
912 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
913 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
914 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
915 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
916 ConnectionOptionName::Broker => "BROKER",
917 ConnectionOptionName::Brokers => "BROKERS",
918 ConnectionOptionName::Credential => "CREDENTIAL",
919 ConnectionOptionName::Database => "DATABASE",
920 ConnectionOptionName::Endpoint => "ENDPOINT",
921 ConnectionOptionName::GcpConnection => "GCP CONNECTION",
922 ConnectionOptionName::Host => "HOST",
923 ConnectionOptionName::Password => "PASSWORD",
924 ConnectionOptionName::Port => "PORT",
925 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
926 ConnectionOptionName::ProgressTopicReplicationFactor => {
927 "PROGRESS TOPIC REPLICATION FACTOR"
928 }
929 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
930 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
931 ConnectionOptionName::Region => "REGION",
932 ConnectionOptionName::Registry => "REGISTRY",
933 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
934 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
935 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
936 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
937 ConnectionOptionName::SaslUsername => "SASL USERNAME",
938 ConnectionOptionName::Scope => "SCOPE",
939 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
940 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
941 ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY",
942 ConnectionOptionName::ServiceName => "SERVICE NAME",
943 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
944 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
945 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
946 ConnectionOptionName::SslKey => "SSL KEY",
947 ConnectionOptionName::SslMode => "SSL MODE",
948 ConnectionOptionName::SessionToken => "SESSION TOKEN",
949 ConnectionOptionName::CatalogType => "CATALOG TYPE",
950 ConnectionOptionName::Url => "URL",
951 ConnectionOptionName::User => "USER",
952 ConnectionOptionName::Warehouse => "WAREHOUSE",
953 })
954 }
955}
956impl_display!(ConnectionOptionName);
957
958impl WithOptionName for ConnectionOptionName {
959 fn redact_value(&self) -> bool {
965 match self {
966 ConnectionOptionName::AccessKeyId
967 | ConnectionOptionName::AvailabilityZones
968 | ConnectionOptionName::AwsConnection
969 | ConnectionOptionName::AwsPrivatelink
970 | ConnectionOptionName::Broker
971 | ConnectionOptionName::Brokers
972 | ConnectionOptionName::Credential
973 | ConnectionOptionName::Database
974 | ConnectionOptionName::Endpoint
975 | ConnectionOptionName::GcpConnection
976 | ConnectionOptionName::Host
977 | ConnectionOptionName::Password
978 | ConnectionOptionName::Port
979 | ConnectionOptionName::ProgressTopic
980 | ConnectionOptionName::ProgressTopicReplicationFactor
981 | ConnectionOptionName::PublicKey1
982 | ConnectionOptionName::PublicKey2
983 | ConnectionOptionName::Region
984 | ConnectionOptionName::Registry
985 | ConnectionOptionName::AssumeRoleArn
986 | ConnectionOptionName::AssumeRoleSessionName
987 | ConnectionOptionName::SaslMechanisms
988 | ConnectionOptionName::SaslPassword
989 | ConnectionOptionName::SaslUsername
990 | ConnectionOptionName::Scope
991 | ConnectionOptionName::SecurityProtocol
992 | ConnectionOptionName::SecretAccessKey
993 | ConnectionOptionName::ServiceAccountKey
994 | ConnectionOptionName::ServiceName
995 | ConnectionOptionName::SshTunnel
996 | ConnectionOptionName::SslCertificate
997 | ConnectionOptionName::SslCertificateAuthority
998 | ConnectionOptionName::SslKey
999 | ConnectionOptionName::SslMode
1000 | ConnectionOptionName::SessionToken
1001 | ConnectionOptionName::CatalogType
1002 | ConnectionOptionName::Url
1003 | ConnectionOptionName::User
1004 | ConnectionOptionName::Warehouse => false,
1005 }
1006 }
1007}
1008
/// A single connection option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
    /// Which option is being set.
    pub name: ConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by these macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
1017
/// The kind of connection being created.
///
/// Each kind has two textual forms: a lower-case, hyphenated identifier
/// (see [`CreateConnectionType::as_str`]) and a SQL keyword phrase (see the
/// `AstDisplay` impl).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    Aws,
    AwsPrivatelink,
    GlueSchemaRegistry,
    Gcp,
    Kafka,
    Csr,
    Postgres,
    Ssh,
    SqlServer,
    MySql,
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lower-case, hyphenated identifier of this connection type
    /// (for example `"aws-privatelink"` or `"confluent-schema-registry"`).
    pub fn as_str(&self) -> &'static str {
        // Arms follow the enum's declaration order.
        match *self {
            CreateConnectionType::Aws => "aws",
            CreateConnectionType::AwsPrivatelink => "aws-privatelink",
            CreateConnectionType::GlueSchemaRegistry => "glue-schema-registry",
            CreateConnectionType::Gcp => "gcp",
            CreateConnectionType::Kafka => "kafka",
            CreateConnectionType::Csr => "confluent-schema-registry",
            CreateConnectionType::Postgres => "postgres",
            CreateConnectionType::Ssh => "ssh-tunnel",
            CreateConnectionType::SqlServer => "sql-server",
            CreateConnectionType::MySql => "mysql",
            CreateConnectionType::IcebergCatalog => "iceberg-catalog",
        }
    }
}
1050
1051impl AstDisplay for CreateConnectionType {
1052 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1053 match self {
1054 Self::Kafka => {
1055 f.write_str("KAFKA");
1056 }
1057 Self::Csr => {
1058 f.write_str("CONFLUENT SCHEMA REGISTRY");
1059 }
1060 Self::Postgres => {
1061 f.write_str("POSTGRES");
1062 }
1063 Self::Aws => {
1064 f.write_str("AWS");
1065 }
1066 Self::AwsPrivatelink => {
1067 f.write_str("AWS PRIVATELINK");
1068 }
1069 Self::GlueSchemaRegistry => {
1070 f.write_str("AWS GLUE SCHEMA REGISTRY");
1071 }
1072 Self::Gcp => {
1073 f.write_str("GCP");
1074 }
1075 Self::Ssh => {
1076 f.write_str("SSH TUNNEL");
1077 }
1078 Self::SqlServer => {
1079 f.write_str("SQL SERVER");
1080 }
1081 Self::MySql => {
1082 f.write_str("MYSQL");
1083 }
1084 Self::IcebergCatalog => {
1085 f.write_str("ICEBERG CATALOG");
1086 }
1087 }
1088 }
1089}
1090impl_display!(CreateConnectionType);
1091
/// Names of the options carried by a [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// Rendered as `VALIDATE`.
    Validate,
}
1096
1097impl AstDisplay for CreateConnectionOptionName {
1098 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1099 f.write_str(match self {
1100 CreateConnectionOptionName::Validate => "VALIDATE",
1101 })
1102 }
1103}
1104impl_display!(CreateConnectionOptionName);
1105
1106impl WithOptionName for CreateConnectionOptionName {
1107 fn redact_value(&self) -> bool {
1113 match self {
1114 CreateConnectionOptionName::Validate => false,
1115 }
1116 }
1117}
1118
/// A single create-connection option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CreateConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by these macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
1127
/// Names of the options carried by a [`KafkaSourceConfigOption`].
// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
// defines their relative ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// Rendered as `GROUP ID PREFIX`.
    GroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `START TIMESTAMP`.
    StartTimestamp,
    /// Rendered as `START OFFSET`.
    StartOffset,
}
1136
1137impl AstDisplay for KafkaSourceConfigOptionName {
1138 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1139 f.write_str(match self {
1140 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1141 KafkaSourceConfigOptionName::Topic => "TOPIC",
1142 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1143 "TOPIC METADATA REFRESH INTERVAL"
1144 }
1145 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1146 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1147 })
1148 }
1149}
1150impl_display!(KafkaSourceConfigOptionName);
1151
1152impl WithOptionName for KafkaSourceConfigOptionName {
1153 fn redact_value(&self) -> bool {
1159 match self {
1160 KafkaSourceConfigOptionName::GroupIdPrefix
1161 | KafkaSourceConfigOptionName::Topic
1162 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1163 | KafkaSourceConfigOptionName::StartOffset
1164 | KafkaSourceConfigOptionName::StartTimestamp => false,
1165 }
1166 }
1167}
1168
/// A single Kafka source option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: KafkaSourceConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by these macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1176
/// Names of the options carried by a [`KafkaSinkConfigOption`].
// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
// defines their relative ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// Rendered as `COMPRESSION TYPE`.
    CompressionType,
    /// Rendered as `PARTITION BY`. The only option of this enum whose value
    /// is marked for redaction by `redact_value`.
    PartitionBy,
    /// Rendered as `PROGRESS GROUP ID PREFIX`.
    ProgressGroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TRANSACTIONAL ID PREFIX`.
    TransactionalIdPrefix,
    /// Rendered as `LEGACY IDS`.
    LegacyIds,
    /// Rendered as `TOPIC CONFIG`.
    TopicConfig,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `TOPIC PARTITION COUNT`.
    TopicPartitionCount,
    /// Rendered as `TOPIC REPLICATION FACTOR`.
    TopicReplicationFactor,
}
1190
1191impl AstDisplay for KafkaSinkConfigOptionName {
1192 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1193 f.write_str(match self {
1194 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1195 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1196 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1197 KafkaSinkConfigOptionName::Topic => "TOPIC",
1198 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1199 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1200 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1201 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1202 "TOPIC METADATA REFRESH INTERVAL"
1203 }
1204 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1205 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1206 })
1207 }
1208}
1209impl_display!(KafkaSinkConfigOptionName);
1210
1211impl WithOptionName for KafkaSinkConfigOptionName {
1212 fn redact_value(&self) -> bool {
1218 match self {
1219 KafkaSinkConfigOptionName::CompressionType
1220 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1221 | KafkaSinkConfigOptionName::Topic
1222 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1223 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1224 | KafkaSinkConfigOptionName::LegacyIds
1225 | KafkaSinkConfigOptionName::TopicConfig
1226 | KafkaSinkConfigOptionName::TopicPartitionCount
1227 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1228 KafkaSinkConfigOptionName::PartitionBy => true,
1229 }
1230 }
1231}
1232
/// A single Kafka sink option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: KafkaSinkConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Formatting is generated by these macros, which are defined outside this
// section of the file.
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1240
/// Names of the options carried by an [`IcebergSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// Rendered as `NAMESPACE`.
    Namespace,
    /// Rendered as `TABLE`.
    Table,
}
1246
1247impl AstDisplay for IcebergSinkConfigOptionName {
1248 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1249 f.write_str(match self {
1250 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1251 IcebergSinkConfigOptionName::Table => "TABLE",
1252 })
1253 }
1254}
1255impl_display!(IcebergSinkConfigOptionName);
1256
1257impl WithOptionName for IcebergSinkConfigOptionName {
1258 fn redact_value(&self) -> bool {
1264 match self {
1265 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1266 }
1267 }
1268}
1269
/// A single Iceberg sink configuration option: an option name paired with an
/// optional value.
///
/// Instances are carried in the `options` list of
/// `CreateSinkConnection::Iceberg`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IcebergSinkConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: IcebergSinkConfigOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(IcebergSinkConfigOption);
impl_display_t!(IcebergSinkConfigOption);
1277
/// Names of the options that can be attached to a Postgres source connection
/// (see `PgConfigOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `PUBLICATION`.
    Publication,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1300
1301impl AstDisplay for PgConfigOptionName {
1302 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1303 f.write_str(match self {
1304 PgConfigOptionName::Details => "DETAILS",
1305 PgConfigOptionName::Publication => "PUBLICATION",
1306 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1307 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1308 })
1309 }
1310}
1311impl_display!(PgConfigOptionName);
1312
1313impl WithOptionName for PgConfigOptionName {
1314 fn redact_value(&self) -> bool {
1320 match self {
1321 PgConfigOptionName::Details
1322 | PgConfigOptionName::Publication
1323 | PgConfigOptionName::TextColumns
1324 | PgConfigOptionName::ExcludeColumns => false,
1325 }
1326 }
1327}
1328
/// A single Postgres source configuration option: an option name paired with
/// an optional value.
///
/// Instances are carried in the `options` list of
/// `CreateSourceConnection::Postgres`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: PgConfigOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1337
/// Names of the options that can be attached to a MySQL source connection
/// (see `MySqlConfigOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1358
1359impl AstDisplay for MySqlConfigOptionName {
1360 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1361 f.write_str(match self {
1362 MySqlConfigOptionName::Details => "DETAILS",
1363 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1364 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1365 })
1366 }
1367}
1368impl_display!(MySqlConfigOptionName);
1369
1370impl WithOptionName for MySqlConfigOptionName {
1371 fn redact_value(&self) -> bool {
1377 match self {
1378 MySqlConfigOptionName::Details
1379 | MySqlConfigOptionName::TextColumns
1380 | MySqlConfigOptionName::ExcludeColumns => false,
1381 }
1382 }
1383}
1384
/// A single MySQL source configuration option: an option name paired with an
/// optional value.
///
/// Instances are carried in the `options` list of
/// `CreateSourceConnection::MySql`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: MySqlConfigOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1393
/// Names of the options that can be attached to a SQL Server source
/// connection (see `SqlServerConfigOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1416
1417impl AstDisplay for SqlServerConfigOptionName {
1418 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1419 f.write_str(match self {
1420 SqlServerConfigOptionName::Details => "DETAILS",
1421 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1422 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1423 })
1424 }
1425}
1426impl_display!(SqlServerConfigOptionName);
1427
1428impl WithOptionName for SqlServerConfigOptionName {
1429 fn redact_value(&self) -> bool {
1435 match self {
1436 SqlServerConfigOptionName::Details
1437 | SqlServerConfigOptionName::TextColumns
1438 | SqlServerConfigOptionName::ExcludeColumns => false,
1439 }
1440 }
1441}
1442
/// A single SQL Server source configuration option: an option name paired
/// with an optional value.
///
/// Instances are carried in the `options` list of
/// `CreateSourceConnection::SqlServer`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: SqlServerConfigOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1451
/// The connection portion of a source definition: which kind of upstream
/// system is used, and the options configured for it.
///
/// Every variant is displayed as its keyword(s), then its target, then a
/// parenthesized comma-separated option list that is omitted when `options`
/// is empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Rendered as `KAFKA CONNECTION <connection> [(<options>)]`.
    Kafka {
        /// The connection item the source refers to.
        connection: T::ItemName,
        /// Kafka-specific source options.
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Rendered as `POSTGRES CONNECTION <connection> [(<options>)]`.
    Postgres {
        /// The connection item the source refers to.
        connection: T::ItemName,
        /// Postgres-specific source options.
        options: Vec<PgConfigOption<T>>,
    },
    /// Rendered as `SQL SERVER CONNECTION <connection> [(<options>)]`.
    SqlServer {
        /// The connection item the source refers to.
        connection: T::ItemName,
        /// SQL Server-specific source options.
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Rendered as `MYSQL CONNECTION <connection> [(<options>)]`.
    MySql {
        /// The connection item the source refers to.
        connection: T::ItemName,
        /// MySQL-specific source options.
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Rendered as `LOAD GENERATOR <generator> [(<options>)]`.
    LoadGenerator {
        /// Which load generator to use.
        generator: LoadGenerator,
        /// Load-generator options.
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1475
1476impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1477 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1478 match self {
1479 CreateSourceConnection::Kafka {
1480 connection,
1481 options,
1482 } => {
1483 f.write_str("KAFKA CONNECTION ");
1484 f.write_node(connection);
1485 if !options.is_empty() {
1486 f.write_str(" (");
1487 f.write_node(&display::comma_separated(options));
1488 f.write_str(")");
1489 }
1490 }
1491 CreateSourceConnection::Postgres {
1492 connection,
1493 options,
1494 } => {
1495 f.write_str("POSTGRES CONNECTION ");
1496 f.write_node(connection);
1497 if !options.is_empty() {
1498 f.write_str(" (");
1499 f.write_node(&display::comma_separated(options));
1500 f.write_str(")");
1501 }
1502 }
1503 CreateSourceConnection::SqlServer {
1504 connection,
1505 options,
1506 } => {
1507 f.write_str("SQL SERVER CONNECTION ");
1508 f.write_node(connection);
1509 if !options.is_empty() {
1510 f.write_str(" (");
1511 f.write_node(&display::comma_separated(options));
1512 f.write_str(")");
1513 }
1514 }
1515 CreateSourceConnection::MySql {
1516 connection,
1517 options,
1518 } => {
1519 f.write_str("MYSQL CONNECTION ");
1520 f.write_node(connection);
1521 if !options.is_empty() {
1522 f.write_str(" (");
1523 f.write_node(&display::comma_separated(options));
1524 f.write_str(")");
1525 }
1526 }
1527 CreateSourceConnection::LoadGenerator { generator, options } => {
1528 f.write_str("LOAD GENERATOR ");
1529 f.write_node(generator);
1530 if !options.is_empty() {
1531 f.write_str(" (");
1532 f.write_node(&display::comma_separated(options));
1533 f.write_str(")");
1534 }
1535 }
1536 }
1537 }
1538}
1539impl_display_t!(CreateSourceConnection);
1540
/// The kinds of load generator selectable in
/// `CreateSourceConnection::LoadGenerator`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`.
    Clock,
    /// Rendered as `COUNTER`.
    Counter,
    /// Rendered as `MARKETING`.
    Marketing,
    /// Rendered as `AUCTION`.
    Auction,
    /// Rendered as `DATUMS`.
    Datums,
    /// Rendered as `TPCH`.
    Tpch,
    /// Rendered as `KEY VALUE`.
    KeyValue,
}
1551
1552impl AstDisplay for LoadGenerator {
1553 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1554 match self {
1555 Self::Counter => f.write_str("COUNTER"),
1556 Self::Clock => f.write_str("CLOCK"),
1557 Self::Marketing => f.write_str("MARKETING"),
1558 Self::Auction => f.write_str("AUCTION"),
1559 Self::Datums => f.write_str("DATUMS"),
1560 Self::Tpch => f.write_str("TPCH"),
1561 Self::KeyValue => f.write_str("KEY VALUE"),
1562 }
1563 }
1564}
1565impl_display!(LoadGenerator);
1566
1567impl LoadGenerator {
1568 pub fn schema_name(&self) -> &'static str {
1573 match self {
1574 LoadGenerator::Counter => "counter",
1575 LoadGenerator::Clock => "clock",
1576 LoadGenerator::Marketing => "marketing",
1577 LoadGenerator::Auction => "auction",
1578 LoadGenerator::Datums => "datums",
1579 LoadGenerator::Tpch => "tpch",
1580 LoadGenerator::KeyValue => "key_value",
1581 }
1582 }
1583}
1584
/// Names of the options that can be attached to a load generator source
/// (see `LoadGeneratorOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1600
1601impl AstDisplay for LoadGeneratorOptionName {
1602 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1603 f.write_str(match self {
1604 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1605 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1606 LoadGeneratorOptionName::AsOf => "AS OF",
1607 LoadGeneratorOptionName::UpTo => "UP TO",
1608 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1609 LoadGeneratorOptionName::Keys => "KEYS",
1610 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1611 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1612 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1613 LoadGeneratorOptionName::Seed => "SEED",
1614 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1615 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1616 })
1617 }
1618}
1619impl_display!(LoadGeneratorOptionName);
1620
1621impl WithOptionName for LoadGeneratorOptionName {
1622 fn redact_value(&self) -> bool {
1628 match self {
1629 LoadGeneratorOptionName::ScaleFactor
1630 | LoadGeneratorOptionName::TickInterval
1631 | LoadGeneratorOptionName::AsOf
1632 | LoadGeneratorOptionName::UpTo
1633 | LoadGeneratorOptionName::MaxCardinality
1634 | LoadGeneratorOptionName::Keys
1635 | LoadGeneratorOptionName::SnapshotRounds
1636 | LoadGeneratorOptionName::TransactionalSnapshot
1637 | LoadGeneratorOptionName::ValueSize
1638 | LoadGeneratorOptionName::Partitions
1639 | LoadGeneratorOptionName::BatchSize
1640 | LoadGeneratorOptionName::Seed => false,
1641 }
1642 }
1643}
1644
/// A single load generator option: an option name paired with an optional
/// value.
///
/// Instances are carried in the `options` list of
/// `CreateSourceConnection::LoadGenerator`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option is being set.
    pub name: LoadGeneratorOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1653
/// The connection portion of a sink definition: which kind of downstream
/// system is written to, and how it is configured.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Rendered as
    /// `KAFKA CONNECTION <connection> [(<options>)] [<key>] [HEADERS <headers>]`.
    Kafka {
        /// The connection item the sink refers to.
        connection: T::ItemName,
        /// Kafka-specific sink options; the parenthesized list is omitted
        /// from the output when empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Optional identifier written after `HEADERS`.
        headers: Option<Ident>,
    },
    /// Rendered as `ICEBERG CATALOG CONNECTION <catalog_connection>
    /// [(<options>)] [USING AWS CONNECTION <aws_connection>] [<key>]`.
    Iceberg {
        /// The catalog connection item the sink refers to.
        catalog_connection: T::ItemName,

        /// Optional connection item written after `USING AWS CONNECTION`.
        aws_connection: Option<T::ItemName>,

        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Iceberg-specific sink options; the parenthesized list is omitted
        /// from the output when empty.
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1672
1673impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1674 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1675 match self {
1676 CreateSinkConnection::Kafka {
1677 connection,
1678 options,
1679 key,
1680 headers,
1681 } => {
1682 f.write_str("KAFKA CONNECTION ");
1683 f.write_node(connection);
1684 if !options.is_empty() {
1685 f.write_str(" (");
1686 f.write_node(&display::comma_separated(options));
1687 f.write_str(")");
1688 }
1689 if let Some(key) = key.as_ref() {
1690 f.write_str(" ");
1691 f.write_node(key);
1692 }
1693 if let Some(headers) = headers {
1694 f.write_str(" HEADERS ");
1695 f.write_node(headers);
1696 }
1697 }
1698 CreateSinkConnection::Iceberg {
1699 catalog_connection,
1700 aws_connection,
1701 key,
1702 options,
1703 } => {
1704 f.write_str("ICEBERG CATALOG CONNECTION ");
1705 f.write_node(catalog_connection);
1706 if !options.is_empty() {
1707 f.write_str(" (");
1708 f.write_node(&display::comma_separated(options));
1709 f.write_str(")");
1710 }
1711 if let Some(aws_connection) = aws_connection {
1712 f.write_str(" USING AWS CONNECTION ");
1713 f.write_node(aws_connection);
1714 }
1715 if let Some(key) = key.as_ref() {
1716 f.write_str(" ");
1717 f.write_node(key);
1718 }
1719 }
1720 }
1721 }
1722}
1723impl_display_t!(CreateSinkConnection);
1724
/// The `KEY (...)` clause of a sink connection.
///
/// Rendered as `KEY (<key_columns>) [NOT ENFORCED]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// The columns listed inside the parentheses, in order.
    pub key_columns: Vec<Ident>,
    /// When `true`, ` NOT ENFORCED` is appended after the column list.
    pub not_enforced: bool,
}
1730
1731impl AstDisplay for SinkKey {
1732 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1733 f.write_str("KEY (");
1734 f.write_node(&display::comma_separated(&self.key_columns));
1735 f.write_str(")");
1736 if self.not_enforced {
1737 f.write_str(" NOT ENFORCED");
1738 }
1739 }
1740}
1741
/// A table-level constraint.
///
/// Every variant is displayed with an optional leading `CONSTRAINT <name> `
/// prefix, emitted only when `name` is `Some`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Rendered as `[CONSTRAINT <name>] PRIMARY KEY (<columns>)` when
    /// `is_primary` is set, otherwise as
    /// `[CONSTRAINT <name>] UNIQUE [NULLS NOT DISTINCT] (<columns>)`.
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The constrained columns.
        columns: Vec<Ident>,
        /// Selects `PRIMARY KEY` instead of `UNIQUE`.
        is_primary: bool,
        /// Adds `NULLS NOT DISTINCT` after `UNIQUE`; the display impl ignores
        /// it when `is_primary` is set.
        nulls_not_distinct: bool,
    },
    /// Rendered as `[CONSTRAINT <name>] FOREIGN KEY (<columns>) REFERENCES
    /// <foreign_table>(<referred_columns>)`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The referencing columns in this table.
        columns: Vec<Ident>,
        /// The referenced table.
        foreign_table: T::ItemName,
        /// The referenced columns in `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `[CONSTRAINT <name>] CHECK (<expr>)`.
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The expression written inside `CHECK (...)`.
        expr: Box<Expr<T>>,
    },
}
1770
1771impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1772 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1773 match self {
1774 TableConstraint::Unique {
1775 name,
1776 columns,
1777 is_primary,
1778 nulls_not_distinct,
1779 } => {
1780 f.write_node(&display_constraint_name(name));
1781 if *is_primary {
1782 f.write_str("PRIMARY KEY ");
1783 } else {
1784 f.write_str("UNIQUE ");
1785 if *nulls_not_distinct {
1786 f.write_str("NULLS NOT DISTINCT ");
1787 }
1788 }
1789 f.write_str("(");
1790 f.write_node(&display::comma_separated(columns));
1791 f.write_str(")");
1792 }
1793 TableConstraint::ForeignKey {
1794 name,
1795 columns,
1796 foreign_table,
1797 referred_columns,
1798 } => {
1799 f.write_node(&display_constraint_name(name));
1800 f.write_str("FOREIGN KEY (");
1801 f.write_node(&display::comma_separated(columns));
1802 f.write_str(") REFERENCES ");
1803 f.write_node(foreign_table);
1804 f.write_str("(");
1805 f.write_node(&display::comma_separated(referred_columns));
1806 f.write_str(")");
1807 }
1808 TableConstraint::Check { name, expr } => {
1809 f.write_node(&display_constraint_name(name));
1810 f.write_str("CHECK (");
1811 f.write_node(&expr);
1812 f.write_str(")");
1813 }
1814 }
1815 }
1816}
1817impl_display_t!(TableConstraint);
1818
/// A key constraint that is declared but not enforced.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Rendered as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1825
1826impl AstDisplay for KeyConstraint {
1827 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1828 match self {
1829 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1830 f.write_str("PRIMARY KEY ");
1831 f.write_str("(");
1832 f.write_node(&display::comma_separated(columns));
1833 f.write_str(") ");
1834 f.write_str("NOT ENFORCED");
1835 }
1836 }
1837 }
1838}
1839impl_display!(KeyConstraint);
1840
/// Names of the options carried by `CreateSourceOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1846
1847impl AstDisplay for CreateSourceOptionName {
1848 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1849 f.write_str(match self {
1850 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1851 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1852 })
1853 }
1854}
1855impl_display!(CreateSourceOptionName);
1856
1857impl WithOptionName for CreateSourceOptionName {
1858 fn redact_value(&self) -> bool {
1864 match self {
1865 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1866 false
1867 }
1868 }
1869 }
1870}
1871
/// A single source-level option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CreateSourceOptionName,
    /// The value supplied for the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1880
/// A column definition.
///
/// Rendered as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// Optional collation, written after `COLLATE` when present.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, written in order and separated by single spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1889
1890impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1891 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1892 f.write_node(&self.name);
1893 f.write_str(" ");
1894 f.write_node(&self.data_type);
1895 if let Some(collation) = &self.collation {
1896 f.write_str(" COLLATE ");
1897 f.write_node(collation);
1898 }
1899 for option in &self.options {
1900 f.write_str(" ");
1901 f.write_node(option);
1902 }
1903 }
1904}
1905impl_display_t!(ColumnDef);
1906
/// A column option together with an optional constraint name.
///
/// Rendered as `[CONSTRAINT <name>] <option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name; when present it is written as a
    /// `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1928
1929impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1930 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1931 f.write_node(&display_constraint_name(&self.name));
1932 f.write_node(&self.option);
1933 }
1934}
1935impl_display_t!(ColumnOptionDef);
1936
/// The options that can be attached to a column definition (see
/// `ColumnOptionDef`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Rendered as `NULL`.
    Null,
    /// Rendered as `NOT NULL`.
    NotNull,
    /// Rendered as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Rendered as `PRIMARY KEY` when `is_primary` is set, otherwise as
    /// `UNIQUE`.
    Unique { is_primary: bool },
    /// Rendered as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The referenced table.
        foreign_table: UnresolvedItemName,
        /// The referenced columns in `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Rendered as `VERSION <action> <version>`.
    Versioned {
        /// The versioning action.
        action: ColumnVersioned,
        /// The version the action applies to.
        version: Version,
    },
}
1963
1964impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1965 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1966 use ColumnOption::*;
1967 match self {
1968 Null => f.write_str("NULL"),
1969 NotNull => f.write_str("NOT NULL"),
1970 Default(expr) => {
1971 f.write_str("DEFAULT ");
1972 f.write_node(expr);
1973 }
1974 Unique { is_primary } => {
1975 if *is_primary {
1976 f.write_str("PRIMARY KEY");
1977 } else {
1978 f.write_str("UNIQUE");
1979 }
1980 }
1981 ForeignKey {
1982 foreign_table,
1983 referred_columns,
1984 } => {
1985 f.write_str("REFERENCES ");
1986 f.write_node(foreign_table);
1987 f.write_str(" (");
1988 f.write_node(&display::comma_separated(referred_columns));
1989 f.write_str(")");
1990 }
1991 Check(expr) => {
1992 f.write_str("CHECK (");
1993 f.write_node(expr);
1994 f.write_str(")");
1995 }
1996 Versioned { action, version } => {
1997 f.write_str("VERSION ");
1998 f.write_node(action);
1999 f.write_str(" ");
2000 f.write_node(version);
2001 }
2002 }
2003 }
2004}
2005impl_display_t!(ColumnOption);
2006
/// The action recorded by `ColumnOption::Versioned`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Rendered as `ADDED`.
    Added,
}
2011
2012impl AstDisplay for ColumnVersioned {
2013 fn fmt<W>(&self, f: &mut AstFormatter<W>)
2014 where
2015 W: fmt::Write,
2016 {
2017 match self {
2018 ColumnVersioned::Added => f.write_str("ADDED"),
2020 }
2021 }
2022}
2023impl_display!(ColumnVersioned);
2024
2025fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
2026 struct ConstraintName<'a>(&'a Option<Ident>);
2027 impl<'a> AstDisplay for ConstraintName<'a> {
2028 fn fmt<W>(&self, f: &mut AstFormatter<W>)
2029 where
2030 W: fmt::Write,
2031 {
2032 if let Some(name) = self.0 {
2033 f.write_str("CONSTRAINT ");
2034 f.write_node(name);
2035 f.write_str(" ");
2036 }
2037 }
2038 }
2039 ConstraintName(name)
2040}