1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
/// A materialized-view option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: MaterializedViewOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): presumably generates the `AstDisplay` impl for the
// name/value pair; the macro is defined outside this view.
impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Schema {
77 pub schema: String,
78}
79
80impl AstDisplay for Schema {
81 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82 f.write_str("SCHEMA '");
83 f.write_node(&display::escape_single_quote_string(&self.schema));
84 f.write_str("'");
85 }
86}
87impl_display!(Schema);
88
89#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90pub enum AvroSchemaOptionName {
91 ConfluentWireFormat,
93}
94
95impl AstDisplay for AvroSchemaOptionName {
96 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97 match self {
98 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99 }
100 }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104 fn redact_value(&self) -> bool {
110 match self {
111 Self::ConfluentWireFormat => false,
112 }
113 }
114}
115
/// An option attached to an inline Avro schema: a name plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: AvroSchemaOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): presumably generates the `AstDisplay` impl for the
// name/value pair; the macro is defined outside this view.
impl_display_for_with_option!(AvroSchemaOption);
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub enum AvroSchema<T: AstInfo> {
125 Csr {
126 csr_connection: CsrConnectionAvro<T>,
127 },
128 InlineSchema {
129 schema: Schema,
130 with_options: Vec<AvroSchemaOption<T>>,
131 },
132}
133
134impl<T: AstInfo> AstDisplay for AvroSchema<T> {
135 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
136 match self {
137 Self::Csr { csr_connection } => {
138 f.write_node(csr_connection);
139 }
140 Self::InlineSchema {
141 schema,
142 with_options,
143 } => {
144 f.write_str("USING ");
145 schema.fmt(f);
146 if !with_options.is_empty() {
147 f.write_str(" (");
148 f.write_node(&display::comma_separated(with_options));
149 f.write_str(")");
150 }
151 }
152 }
153 }
154}
155impl_display_t!(AvroSchema);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum ProtobufSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionProtobuf<T>,
161 },
162 InlineSchema {
163 message_name: String,
164 schema: Schema,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 message_name,
176 schema,
177 } => {
178 f.write_str("MESSAGE '");
179 f.write_node(&display::escape_single_quote_string(message_name));
180 f.write_str("' USING ");
181 f.write_str(schema);
182 }
183 }
184 }
185}
186impl_display_t!(ProtobufSchema);
187
188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
189pub enum CsrConfigOptionName<T: AstInfo> {
190 AvroKeyFullname,
191 AvroValueFullname,
192 NullDefaults,
193 AvroDocOn(AvroDocOn<T>),
194 KeyCompatibilityLevel,
195 ValueCompatibilityLevel,
196}
197
198impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
199 fn redact_value(&self) -> bool {
205 match self {
206 Self::AvroKeyFullname
207 | Self::AvroValueFullname
208 | Self::NullDefaults
209 | Self::AvroDocOn(_)
210 | Self::KeyCompatibilityLevel
211 | Self::ValueCompatibilityLevel => false,
212 }
213 }
214}
215
/// A doc-on target: what is being documented and for which schema(s).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc applies to.
    pub identifier: DocOnIdentifier<T>,
    /// Whether the doc applies to the key schema, the value schema, or both.
    pub for_schema: DocOnSchema,
}
/// Selects which schema(s) an [`AvroDocOn`] applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Rendered with a leading `KEY ` prefix.
    KeyOnly,
    /// Rendered with a leading `VALUE ` prefix.
    ValueOnly,
    /// Rendered with no prefix.
    All,
}
227
/// The object an [`AvroDocOn`] refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; rendered after `DOC ON COLUMN `.
    Column(ColumnName<T>),
    /// A type; rendered after `DOC ON TYPE `.
    Type(T::ItemName),
}
233
234impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
235 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
236 match &self.for_schema {
237 DocOnSchema::KeyOnly => f.write_str("KEY "),
238 DocOnSchema::ValueOnly => f.write_str("VALUE "),
239 DocOnSchema::All => {}
240 }
241 match &self.identifier {
242 DocOnIdentifier::Column(name) => {
243 f.write_str("DOC ON COLUMN ");
244 f.write_node(name);
245 }
246 DocOnIdentifier::Type(name) => {
247 f.write_str("DOC ON TYPE ");
248 f.write_node(name);
249 }
250 }
251 }
252}
253impl_display_t!(AvroDocOn);
254
255impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
256 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
257 match self {
258 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
259 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
260 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
261 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
262 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
263 CsrConfigOptionName::ValueCompatibilityLevel => {
264 f.write_str("VALUE COMPATIBILITY LEVEL")
265 }
266 }
267 }
268}
269impl_display_t!(CsrConfigOptionName);
270
/// An option attached to a schema registry connection: a name plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: CsrConfigOptionName<T>,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
279
280#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct CsrConnection<T: AstInfo> {
282 pub connection: T::ItemName,
283 pub options: Vec<CsrConfigOption<T>>,
284}
285
286impl<T: AstInfo> AstDisplay for CsrConnection<T> {
287 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
288 f.write_str("CONNECTION ");
289 f.write_node(&self.connection);
290 if !self.options.is_empty() {
291 f.write_str(" (");
292 f.write_node(&display::comma_separated(&self.options));
293 f.write_str(")");
294 }
295 }
296}
297impl_display_t!(CsrConnection);
298
/// How a reader schema is selected.
///
/// The default is [`ReaderSchemaSelectionStrategy::Latest`]; it is derived
/// via `#[default]` rather than a hand-written `Default` impl.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a string payload.
    // NOTE(review): presumably the schema text itself — confirm with the parser.
    Inline(String),
    /// Carries an integer payload.
    // NOTE(review): presumably a schema registry id — confirm with the parser.
    ById(i32),
}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub struct CsrConnectionAvro<T: AstInfo> {
314 pub connection: CsrConnection<T>,
315 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
316 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
317 pub seed: Option<CsrSeedAvro>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
323 f.write_node(&self.connection);
324 if let Some(seed) = &self.seed {
325 f.write_str(" ");
326 f.write_node(seed);
327 }
328 }
329}
330impl_display_t!(CsrConnectionAvro);
331
332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
333pub struct CsrConnectionProtobuf<T: AstInfo> {
334 pub connection: CsrConnection<T>,
335 pub seed: Option<CsrSeedProtobuf>,
336}
337
338impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
339 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
340 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
341 f.write_node(&self.connection);
342
343 if let Some(seed) = &self.seed {
344 f.write_str(" ");
345 f.write_node(seed);
346 }
347 }
348}
349impl_display_t!(CsrConnectionProtobuf);
350
351#[derive(Debug, Clone, PartialEq, Eq, Hash)]
352pub struct CsrSeedAvro {
353 pub key_schema: Option<String>,
354 pub value_schema: String,
355 pub key_reference_schemas: Vec<String>,
358 pub value_reference_schemas: Vec<String>,
361}
362
363impl AstDisplay for CsrSeedAvro {
364 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
365 f.write_str("SEED");
366 if let Some(key_schema) = &self.key_schema {
367 f.write_str(" KEY SCHEMA '");
368 f.write_node(&display::escape_single_quote_string(key_schema));
369 f.write_str("'");
370 if !self.key_reference_schemas.is_empty() {
371 f.write_str(" KEY REFERENCES (");
372 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
373 if i > 0 {
374 f.write_str(", ");
375 }
376 f.write_str("'");
377 f.write_node(&display::escape_single_quote_string(schema));
378 f.write_str("'");
379 }
380 f.write_str(")");
381 }
382 }
383 f.write_str(" VALUE SCHEMA '");
384 f.write_node(&display::escape_single_quote_string(&self.value_schema));
385 f.write_str("'");
386 if !self.value_reference_schemas.is_empty() {
387 f.write_str(" VALUE REFERENCES (");
388 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
389 if i > 0 {
390 f.write_str(", ");
391 }
392 f.write_str("'");
393 f.write_node(&display::escape_single_quote_string(schema));
394 f.write_str("'");
395 }
396 f.write_str(")");
397 }
398 }
399}
400impl_display!(CsrSeedAvro);
401
402#[derive(Debug, Clone, PartialEq, Eq, Hash)]
403pub struct CsrSeedProtobuf {
404 pub key: Option<CsrSeedProtobufSchema>,
405 pub value: CsrSeedProtobufSchema,
406}
407
408impl AstDisplay for CsrSeedProtobuf {
409 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410 f.write_str("SEED");
411 if let Some(key) = &self.key {
412 f.write_str(" KEY ");
413 f.write_node(key);
414 }
415 f.write_str(" VALUE ");
416 f.write_node(&self.value);
417 }
418}
419impl_display!(CsrSeedProtobuf);
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422pub struct CsrSeedProtobufSchema {
423 pub schema: String,
425 pub message_name: String,
426}
427impl AstDisplay for CsrSeedProtobufSchema {
428 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
429 f.write_str("SCHEMA '");
430 f.write_str(&display::escape_single_quote_string(&self.schema));
431 f.write_str("' MESSAGE '");
432 f.write_str(&self.message_name);
433 f.write_str("'");
434 }
435}
436impl_display!(CsrSeedProtobufSchema);
437
438#[derive(Debug, Clone, PartialEq, Eq, Hash)]
439pub enum FormatSpecifier<T: AstInfo> {
440 Bare(Format<T>),
442 KeyValue { key: Format<T>, value: Format<T> },
444}
445
446impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
447 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
448 match self {
449 FormatSpecifier::Bare(format) => {
450 f.write_str("FORMAT ");
451 f.write_node(format)
452 }
453 FormatSpecifier::KeyValue { key, value } => {
454 f.write_str("KEY FORMAT ");
455 f.write_node(key);
456 f.write_str(" VALUE FORMAT ");
457 f.write_node(value);
458 }
459 }
460 }
461}
462impl_display_t!(FormatSpecifier);
463
/// A data format, together with any format-specific settings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Rendered as `BYTES`.
    Bytes,
    /// Avro, with the schema source.
    Avro(AvroSchema<T>),
    /// Protobuf, with the schema source.
    Protobuf(ProtobufSchema<T>),
    /// A regular expression, stored as its pattern text.
    Regex(String),
    /// CSV, with a column specification and a delimiter character.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// JSON; `array` adds the ` ARRAY` suffix when rendered.
    Json {
        array: bool,
    },
    /// Rendered as `TEXT`.
    Text,
}
479
480#[derive(Debug, Clone, PartialEq, Eq, Hash)]
481pub enum CsvColumns {
482 Count(u64),
484 Header { names: Vec<Ident> },
486}
487
488impl AstDisplay for CsvColumns {
489 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
490 match self {
491 CsvColumns::Count(n) => {
492 f.write_str(n);
493 f.write_str(" COLUMNS")
494 }
495 CsvColumns::Header { names } => {
496 f.write_str("HEADER");
497 if !names.is_empty() {
498 f.write_str(" (");
499 f.write_node(&display::comma_separated(names));
500 f.write_str(")");
501 }
502 }
503 }
504 }
505}
506
507#[derive(Debug, Clone, PartialEq, Eq, Hash)]
508pub enum SourceIncludeMetadata {
509 Key {
510 alias: Option<Ident>,
511 },
512 Timestamp {
513 alias: Option<Ident>,
514 },
515 Partition {
516 alias: Option<Ident>,
517 },
518 Offset {
519 alias: Option<Ident>,
520 },
521 Headers {
522 alias: Option<Ident>,
523 },
524 Header {
525 key: String,
526 alias: Ident,
527 use_bytes: bool,
528 },
529}
530
531impl AstDisplay for SourceIncludeMetadata {
532 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
533 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
534 if let Some(alias) = alias {
535 f.write_str(" AS ");
536 f.write_node(alias);
537 }
538 };
539
540 match self {
541 SourceIncludeMetadata::Key { alias } => {
542 f.write_str("KEY");
543 print_alias(f, alias);
544 }
545 SourceIncludeMetadata::Timestamp { alias } => {
546 f.write_str("TIMESTAMP");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Partition { alias } => {
550 f.write_str("PARTITION");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Offset { alias } => {
554 f.write_str("OFFSET");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Headers { alias } => {
558 f.write_str("HEADERS");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Header {
562 alias,
563 key,
564 use_bytes,
565 } => {
566 f.write_str("HEADER '");
567 f.write_str(&display::escape_single_quote_string(key));
568 f.write_str("'");
569 print_alias(f, &Some(alias.clone()));
570 if *use_bytes {
571 f.write_str(" BYTES");
572 }
573 }
574 }
575 }
576}
577impl_display!(SourceIncludeMetadata);
578
579#[derive(Debug, Clone, PartialEq, Eq, Hash)]
580pub enum SourceErrorPolicy {
581 Inline {
582 alias: Option<Ident>,
584 },
585}
586
587impl AstDisplay for SourceErrorPolicy {
588 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589 match self {
590 Self::Inline { alias } => {
591 f.write_str("INLINE");
592 if let Some(alias) = alias {
593 f.write_str(" AS ");
594 f.write_node(alias);
595 }
596 }
597 }
598 }
599}
600impl_display!(SourceErrorPolicy);
601
602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
603pub enum SourceEnvelope {
604 None,
605 Debezium,
606 Upsert {
607 value_decode_err_policy: Vec<SourceErrorPolicy>,
608 },
609 CdcV2,
610}
611
612impl SourceEnvelope {
613 pub fn requires_all_input(&self) -> bool {
616 match self {
617 SourceEnvelope::None => false,
618 SourceEnvelope::Debezium => false,
619 SourceEnvelope::Upsert { .. } => false,
620 SourceEnvelope::CdcV2 => true,
621 }
622 }
623}
624
625impl AstDisplay for SourceEnvelope {
626 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
627 match self {
628 Self::None => {
629 f.write_str("NONE");
631 }
632 Self::Debezium => {
633 f.write_str("DEBEZIUM");
634 }
635 Self::Upsert {
636 value_decode_err_policy,
637 } => {
638 if value_decode_err_policy.is_empty() {
639 f.write_str("UPSERT");
640 } else {
641 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
642 f.write_node(&display::comma_separated(value_decode_err_policy));
643 f.write_str("))")
644 }
645 }
646 Self::CdcV2 => {
647 f.write_str("MATERIALIZE");
648 }
649 }
650 }
651}
652impl_display!(SourceEnvelope);
653
654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
655pub enum SinkEnvelope {
656 Debezium,
657 Upsert,
658}
659
660impl AstDisplay for SinkEnvelope {
661 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
662 match self {
663 Self::Upsert => {
664 f.write_str("UPSERT");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 }
670 }
671}
672impl_display!(SinkEnvelope);
673
674#[derive(Debug, Clone, PartialEq, Eq, Hash)]
675pub enum IcebergSinkMode {
676 Upsert,
677 Append,
678}
679
680impl AstDisplay for IcebergSinkMode {
681 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
682 match self {
683 Self::Upsert => {
684 f.write_str("UPSERT");
685 }
686 Self::Append => {
687 f.write_str("APPEND");
688 }
689 }
690 }
691}
692impl_display!(IcebergSinkMode);
693
694#[derive(Debug, Clone, PartialEq, Eq, Hash)]
695pub enum SubscribeOutput<T: AstInfo> {
696 Diffs,
697 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
698 EnvelopeUpsert { key_columns: Vec<Ident> },
699 EnvelopeDebezium { key_columns: Vec<Ident> },
700}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704 match self {
705 Self::Diffs => {}
706 Self::WithinTimestampOrderBy { order_by } => {
707 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708 f.write_node(&display::comma_separated(order_by));
709 }
710 Self::EnvelopeUpsert { key_columns } => {
711 f.write_str(" ENVELOPE UPSERT (KEY (");
712 f.write_node(&display::comma_separated(key_columns));
713 f.write_str("))");
714 }
715 Self::EnvelopeDebezium { key_columns } => {
716 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717 f.write_node(&display::comma_separated(key_columns));
718 f.write_str("))");
719 }
720 }
721 }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727 match self {
728 Self::Bytes => f.write_str("BYTES"),
729 Self::Avro(inner) => {
730 f.write_str("AVRO ");
731 f.write_node(inner);
732 }
733 Self::Protobuf(inner) => {
734 f.write_str("PROTOBUF ");
735 f.write_node(inner);
736 }
737 Self::Regex(regex) => {
738 f.write_str("REGEX '");
739 f.write_node(&display::escape_single_quote_string(regex));
740 f.write_str("'");
741 }
742 Self::Csv { columns, delimiter } => {
743 f.write_str("CSV WITH ");
744 f.write_node(columns);
745
746 if *delimiter != ',' {
747 f.write_str(" DELIMITED BY '");
748 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749 f.write_str("'");
750 }
751 }
752 Self::Json { array } => {
753 f.write_str("JSON");
754 if *array {
755 f.write_str(" ARRAY");
756 }
757 }
758 Self::Text => f.write_str("TEXT"),
759 }
760 }
761}
762impl_display_t!(Format);
763
/// The option names carried by a [`ConnectionOption`].
///
/// Each variant is rendered as its name split into upper-case words
/// (for example `AccessKeyId` becomes `ACCESS KEY ID`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    Registry,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
807
808impl AstDisplay for ConnectionOptionName {
809 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
810 f.write_str(match self {
811 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
812 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
813 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
814 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
815 ConnectionOptionName::Broker => "BROKER",
816 ConnectionOptionName::Brokers => "BROKERS",
817 ConnectionOptionName::Credential => "CREDENTIAL",
818 ConnectionOptionName::Database => "DATABASE",
819 ConnectionOptionName::Endpoint => "ENDPOINT",
820 ConnectionOptionName::Host => "HOST",
821 ConnectionOptionName::Password => "PASSWORD",
822 ConnectionOptionName::Port => "PORT",
823 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
824 ConnectionOptionName::ProgressTopicReplicationFactor => {
825 "PROGRESS TOPIC REPLICATION FACTOR"
826 }
827 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
828 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
829 ConnectionOptionName::Region => "REGION",
830 ConnectionOptionName::Registry => "REGISTRY",
831 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
832 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
833 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
834 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
835 ConnectionOptionName::SaslUsername => "SASL USERNAME",
836 ConnectionOptionName::Scope => "SCOPE",
837 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
838 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
839 ConnectionOptionName::ServiceName => "SERVICE NAME",
840 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
841 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
842 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
843 ConnectionOptionName::SslKey => "SSL KEY",
844 ConnectionOptionName::SslMode => "SSL MODE",
845 ConnectionOptionName::SessionToken => "SESSION TOKEN",
846 ConnectionOptionName::CatalogType => "CATALOG TYPE",
847 ConnectionOptionName::Url => "URL",
848 ConnectionOptionName::User => "USER",
849 ConnectionOptionName::Warehouse => "WAREHOUSE",
850 })
851 }
852}
853impl_display!(ConnectionOptionName);
854
855impl WithOptionName for ConnectionOptionName {
856 fn redact_value(&self) -> bool {
862 match self {
863 ConnectionOptionName::AccessKeyId
864 | ConnectionOptionName::AvailabilityZones
865 | ConnectionOptionName::AwsConnection
866 | ConnectionOptionName::AwsPrivatelink
867 | ConnectionOptionName::Broker
868 | ConnectionOptionName::Brokers
869 | ConnectionOptionName::Credential
870 | ConnectionOptionName::Database
871 | ConnectionOptionName::Endpoint
872 | ConnectionOptionName::Host
873 | ConnectionOptionName::Password
874 | ConnectionOptionName::Port
875 | ConnectionOptionName::ProgressTopic
876 | ConnectionOptionName::ProgressTopicReplicationFactor
877 | ConnectionOptionName::PublicKey1
878 | ConnectionOptionName::PublicKey2
879 | ConnectionOptionName::Region
880 | ConnectionOptionName::Registry
881 | ConnectionOptionName::AssumeRoleArn
882 | ConnectionOptionName::AssumeRoleSessionName
883 | ConnectionOptionName::SaslMechanisms
884 | ConnectionOptionName::SaslPassword
885 | ConnectionOptionName::SaslUsername
886 | ConnectionOptionName::Scope
887 | ConnectionOptionName::SecurityProtocol
888 | ConnectionOptionName::SecretAccessKey
889 | ConnectionOptionName::ServiceName
890 | ConnectionOptionName::SshTunnel
891 | ConnectionOptionName::SslCertificate
892 | ConnectionOptionName::SslCertificateAuthority
893 | ConnectionOptionName::SslKey
894 | ConnectionOptionName::SslMode
895 | ConnectionOptionName::SessionToken
896 | ConnectionOptionName::CatalogType
897 | ConnectionOptionName::Url
898 | ConnectionOptionName::User
899 | ConnectionOptionName::Warehouse => false,
900 }
901 }
902}
903
/// A connection option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: ConnectionOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
912
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    Aws,
    AwsPrivatelink,
    GlueSchemaRegistry,
    Kafka,
    Csr,
    Postgres,
    Ssh,
    SqlServer,
    MySql,
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lower-case, hyphenated identifier for this connection
    /// type (for example `"aws-privatelink"`).
    pub fn as_str(&self) -> &'static str {
        // Arms follow the enum's declaration order.
        match *self {
            CreateConnectionType::Aws => "aws",
            CreateConnectionType::AwsPrivatelink => "aws-privatelink",
            CreateConnectionType::GlueSchemaRegistry => "glue-schema-registry",
            CreateConnectionType::Kafka => "kafka",
            CreateConnectionType::Csr => "confluent-schema-registry",
            CreateConnectionType::Postgres => "postgres",
            CreateConnectionType::Ssh => "ssh-tunnel",
            CreateConnectionType::SqlServer => "sql-server",
            CreateConnectionType::MySql => "mysql",
            CreateConnectionType::IcebergCatalog => "iceberg-catalog",
        }
    }
}
943
944impl AstDisplay for CreateConnectionType {
945 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
946 match self {
947 Self::Kafka => {
948 f.write_str("KAFKA");
949 }
950 Self::Csr => {
951 f.write_str("CONFLUENT SCHEMA REGISTRY");
952 }
953 Self::Postgres => {
954 f.write_str("POSTGRES");
955 }
956 Self::Aws => {
957 f.write_str("AWS");
958 }
959 Self::AwsPrivatelink => {
960 f.write_str("AWS PRIVATELINK");
961 }
962 Self::GlueSchemaRegistry => {
963 f.write_str("AWS GLUE SCHEMA REGISTRY");
964 }
965 Self::Ssh => {
966 f.write_str("SSH TUNNEL");
967 }
968 Self::SqlServer => {
969 f.write_str("SQL SERVER");
970 }
971 Self::MySql => {
972 f.write_str("MYSQL");
973 }
974 Self::IcebergCatalog => {
975 f.write_str("ICEBERG CATALOG");
976 }
977 }
978 }
979}
980impl_display!(CreateConnectionType);
981
982#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
983pub enum CreateConnectionOptionName {
984 Validate,
985}
986
987impl AstDisplay for CreateConnectionOptionName {
988 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
989 f.write_str(match self {
990 CreateConnectionOptionName::Validate => "VALIDATE",
991 })
992 }
993}
994impl_display!(CreateConnectionOptionName);
995
996impl WithOptionName for CreateConnectionOptionName {
997 fn redact_value(&self) -> bool {
1003 match self {
1004 CreateConnectionOptionName::Validate => false,
1005 }
1006 }
1007}
1008
/// A create-connection option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: CreateConnectionOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
1017
1018#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1019pub enum KafkaSourceConfigOptionName {
1020 GroupIdPrefix,
1021 Topic,
1022 TopicMetadataRefreshInterval,
1023 StartTimestamp,
1024 StartOffset,
1025}
1026
1027impl AstDisplay for KafkaSourceConfigOptionName {
1028 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1029 f.write_str(match self {
1030 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1031 KafkaSourceConfigOptionName::Topic => "TOPIC",
1032 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1033 "TOPIC METADATA REFRESH INTERVAL"
1034 }
1035 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1036 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1037 })
1038 }
1039}
1040impl_display!(KafkaSourceConfigOptionName);
1041
1042impl WithOptionName for KafkaSourceConfigOptionName {
1043 fn redact_value(&self) -> bool {
1049 match self {
1050 KafkaSourceConfigOptionName::GroupIdPrefix
1051 | KafkaSourceConfigOptionName::Topic
1052 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1053 | KafkaSourceConfigOptionName::StartOffset
1054 | KafkaSourceConfigOptionName::StartTimestamp => false,
1055 }
1056 }
1057}
1058
/// A Kafka source option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: KafkaSourceConfigOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1066
1067#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1068pub enum KafkaSinkConfigOptionName {
1069 CompressionType,
1070 PartitionBy,
1071 ProgressGroupIdPrefix,
1072 Topic,
1073 TransactionalIdPrefix,
1074 LegacyIds,
1075 TopicConfig,
1076 TopicMetadataRefreshInterval,
1077 TopicPartitionCount,
1078 TopicReplicationFactor,
1079}
1080
1081impl AstDisplay for KafkaSinkConfigOptionName {
1082 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1083 f.write_str(match self {
1084 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1085 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1086 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1087 KafkaSinkConfigOptionName::Topic => "TOPIC",
1088 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1089 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1090 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1091 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1092 "TOPIC METADATA REFRESH INTERVAL"
1093 }
1094 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1095 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1096 })
1097 }
1098}
1099impl_display!(KafkaSinkConfigOptionName);
1100
1101impl WithOptionName for KafkaSinkConfigOptionName {
1102 fn redact_value(&self) -> bool {
1108 match self {
1109 KafkaSinkConfigOptionName::CompressionType
1110 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1111 | KafkaSinkConfigOptionName::Topic
1112 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1113 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1114 | KafkaSinkConfigOptionName::LegacyIds
1115 | KafkaSinkConfigOptionName::TopicConfig
1116 | KafkaSinkConfigOptionName::TopicPartitionCount
1117 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1118 KafkaSinkConfigOptionName::PartitionBy => true,
1119 }
1120 }
1121}
1122
/// A Kafka sink option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: KafkaSinkConfigOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1130
1131#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1132pub enum IcebergSinkConfigOptionName {
1133 Namespace,
1134 Table,
1135}
1136
1137impl AstDisplay for IcebergSinkConfigOptionName {
1138 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1139 f.write_str(match self {
1140 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1141 IcebergSinkConfigOptionName::Table => "TABLE",
1142 })
1143 }
1144}
1145impl_display!(IcebergSinkConfigOptionName);
1146
1147impl WithOptionName for IcebergSinkConfigOptionName {
1148 fn redact_value(&self) -> bool {
1154 match self {
1155 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1156 }
1157 }
1158}
1159
/// An Iceberg sink option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IcebergSinkConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: IcebergSinkConfigOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(IcebergSinkConfigOption);
impl_display_t!(IcebergSinkConfigOption);
1167
1168#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1169pub enum PgConfigOptionName {
1170 Details,
1173 Publication,
1175 TextColumns,
1182 ExcludeColumns,
1189}
1190
1191impl AstDisplay for PgConfigOptionName {
1192 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1193 f.write_str(match self {
1194 PgConfigOptionName::Details => "DETAILS",
1195 PgConfigOptionName::Publication => "PUBLICATION",
1196 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1197 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1198 })
1199 }
1200}
1201impl_display!(PgConfigOptionName);
1202
1203impl WithOptionName for PgConfigOptionName {
1204 fn redact_value(&self) -> bool {
1210 match self {
1211 PgConfigOptionName::Details
1212 | PgConfigOptionName::Publication
1213 | PgConfigOptionName::TextColumns
1214 | PgConfigOptionName::ExcludeColumns => false,
1215 }
1216 }
1217}
1218
/// A Postgres configuration option: an option name paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: PgConfigOptionName,
    /// The value attached to the option; `None` when there is none.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this view; presumably they
// generate the `AstDisplay` and `Display` impls respectively.
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1227
/// Names of the options that can appear in a MySQL source configuration.
///
/// The derived `Ord` follows declaration order, so variants must not be reordered
/// casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1248
1249impl AstDisplay for MySqlConfigOptionName {
1250 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1251 f.write_str(match self {
1252 MySqlConfigOptionName::Details => "DETAILS",
1253 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1254 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1255 })
1256 }
1257}
1258impl_display!(MySqlConfigOptionName);
1259
1260impl WithOptionName for MySqlConfigOptionName {
1261 fn redact_value(&self) -> bool {
1267 match self {
1268 MySqlConfigOptionName::Details
1269 | MySqlConfigOptionName::TextColumns
1270 | MySqlConfigOptionName::ExcludeColumns => false,
1271 }
1272 }
1273}
1274
/// A single MySQL source configuration entry: an option name paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: MySqlConfigOptionName,
    /// The value supplied for the option; `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this excerpt; presumably they
// generate the display impls for the struct. Confirm against the macro definitions.
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1283
/// Names of the options that can appear in a SQL Server source configuration.
///
/// The derived `Ord` follows declaration order, so variants must not be reordered
/// casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1306
1307impl AstDisplay for SqlServerConfigOptionName {
1308 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1309 f.write_str(match self {
1310 SqlServerConfigOptionName::Details => "DETAILS",
1311 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1312 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1313 })
1314 }
1315}
1316impl_display!(SqlServerConfigOptionName);
1317
1318impl WithOptionName for SqlServerConfigOptionName {
1319 fn redact_value(&self) -> bool {
1325 match self {
1326 SqlServerConfigOptionName::Details
1327 | SqlServerConfigOptionName::TextColumns
1328 | SqlServerConfigOptionName::ExcludeColumns => false,
1329 }
1330 }
1331}
1332
/// A single SQL Server source configuration entry: an option name paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: SqlServerConfigOptionName,
    /// The value supplied for the option; `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this excerpt; presumably they
// generate the display impls for the struct. Confirm against the macro definitions.
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1341
/// The connection part of a source, with one variant per kind of upstream.
///
/// Each variant renders as a keyword prefix, then the connection (or generator), then
/// the options as a parenthesized, comma-separated list when the list is non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Rendered with the `KAFKA CONNECTION` prefix.
    Kafka {
        /// The connection item this source reads through.
        connection: T::ItemName,
        /// Kafka-specific options; omitted from the output when empty.
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Rendered with the `POSTGRES CONNECTION` prefix.
    Postgres {
        /// The connection item this source reads through.
        connection: T::ItemName,
        /// Postgres-specific options; omitted from the output when empty.
        options: Vec<PgConfigOption<T>>,
    },
    /// Rendered with the `SQL SERVER CONNECTION` prefix.
    SqlServer {
        /// The connection item this source reads through.
        connection: T::ItemName,
        /// SQL Server-specific options; omitted from the output when empty.
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Rendered with the `MYSQL CONNECTION` prefix.
    MySql {
        /// The connection item this source reads through.
        connection: T::ItemName,
        /// MySQL-specific options; omitted from the output when empty.
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Rendered with the `LOAD GENERATOR` prefix.
    LoadGenerator {
        /// Which load generator to use.
        generator: LoadGenerator,
        /// Load-generator options; omitted from the output when empty.
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1365
1366impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1367 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1368 match self {
1369 CreateSourceConnection::Kafka {
1370 connection,
1371 options,
1372 } => {
1373 f.write_str("KAFKA CONNECTION ");
1374 f.write_node(connection);
1375 if !options.is_empty() {
1376 f.write_str(" (");
1377 f.write_node(&display::comma_separated(options));
1378 f.write_str(")");
1379 }
1380 }
1381 CreateSourceConnection::Postgres {
1382 connection,
1383 options,
1384 } => {
1385 f.write_str("POSTGRES CONNECTION ");
1386 f.write_node(connection);
1387 if !options.is_empty() {
1388 f.write_str(" (");
1389 f.write_node(&display::comma_separated(options));
1390 f.write_str(")");
1391 }
1392 }
1393 CreateSourceConnection::SqlServer {
1394 connection,
1395 options,
1396 } => {
1397 f.write_str("SQL SERVER CONNECTION ");
1398 f.write_node(connection);
1399 if !options.is_empty() {
1400 f.write_str(" (");
1401 f.write_node(&display::comma_separated(options));
1402 f.write_str(")");
1403 }
1404 }
1405 CreateSourceConnection::MySql {
1406 connection,
1407 options,
1408 } => {
1409 f.write_str("MYSQL CONNECTION ");
1410 f.write_node(connection);
1411 if !options.is_empty() {
1412 f.write_str(" (");
1413 f.write_node(&display::comma_separated(options));
1414 f.write_str(")");
1415 }
1416 }
1417 CreateSourceConnection::LoadGenerator { generator, options } => {
1418 f.write_str("LOAD GENERATOR ");
1419 f.write_node(generator);
1420 if !options.is_empty() {
1421 f.write_str(" (");
1422 f.write_node(&display::comma_separated(options));
1423 f.write_str(")");
1424 }
1425 }
1426 }
1427 }
1428}
1429impl_display_t!(CreateSourceConnection);
1430
/// The available load generators.
///
/// Each variant has a SQL spelling (written by its `AstDisplay` impl) and a lowercase
/// schema name (returned by `schema_name`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`; schema name `clock`.
    Clock,
    /// Rendered as `COUNTER`; schema name `counter`.
    Counter,
    /// Rendered as `MARKETING`; schema name `marketing`.
    Marketing,
    /// Rendered as `AUCTION`; schema name `auction`.
    Auction,
    /// Rendered as `DATUMS`; schema name `datums`.
    Datums,
    /// Rendered as `TPCH`; schema name `tpch`.
    Tpch,
    /// Rendered as `KEY VALUE`; schema name `key_value`.
    KeyValue,
}
1441
1442impl AstDisplay for LoadGenerator {
1443 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1444 match self {
1445 Self::Counter => f.write_str("COUNTER"),
1446 Self::Clock => f.write_str("CLOCK"),
1447 Self::Marketing => f.write_str("MARKETING"),
1448 Self::Auction => f.write_str("AUCTION"),
1449 Self::Datums => f.write_str("DATUMS"),
1450 Self::Tpch => f.write_str("TPCH"),
1451 Self::KeyValue => f.write_str("KEY VALUE"),
1452 }
1453 }
1454}
1455impl_display!(LoadGenerator);
1456
1457impl LoadGenerator {
1458 pub fn schema_name(&self) -> &'static str {
1463 match self {
1464 LoadGenerator::Counter => "counter",
1465 LoadGenerator::Clock => "clock",
1466 LoadGenerator::Marketing => "marketing",
1467 LoadGenerator::Auction => "auction",
1468 LoadGenerator::Datums => "datums",
1469 LoadGenerator::Tpch => "tpch",
1470 LoadGenerator::KeyValue => "key_value",
1471 }
1472 }
1473}
1474
/// Names of the options accepted by a load generator.
///
/// The derived `Ord` follows declaration order, so variants must not be reordered
/// casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1490
1491impl AstDisplay for LoadGeneratorOptionName {
1492 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1493 f.write_str(match self {
1494 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1495 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1496 LoadGeneratorOptionName::AsOf => "AS OF",
1497 LoadGeneratorOptionName::UpTo => "UP TO",
1498 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1499 LoadGeneratorOptionName::Keys => "KEYS",
1500 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1501 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1502 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1503 LoadGeneratorOptionName::Seed => "SEED",
1504 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1505 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1506 })
1507 }
1508}
1509impl_display!(LoadGeneratorOptionName);
1510
1511impl WithOptionName for LoadGeneratorOptionName {
1512 fn redact_value(&self) -> bool {
1518 match self {
1519 LoadGeneratorOptionName::ScaleFactor
1520 | LoadGeneratorOptionName::TickInterval
1521 | LoadGeneratorOptionName::AsOf
1522 | LoadGeneratorOptionName::UpTo
1523 | LoadGeneratorOptionName::MaxCardinality
1524 | LoadGeneratorOptionName::Keys
1525 | LoadGeneratorOptionName::SnapshotRounds
1526 | LoadGeneratorOptionName::TransactionalSnapshot
1527 | LoadGeneratorOptionName::ValueSize
1528 | LoadGeneratorOptionName::Partitions
1529 | LoadGeneratorOptionName::BatchSize
1530 | LoadGeneratorOptionName::Seed => false,
1531 }
1532 }
1533}
1534
/// A single load generator configuration entry: an option name paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: LoadGeneratorOptionName,
    /// The value supplied for the option; `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this excerpt; presumably they
// generate the display impls for the struct. Confirm against the macro definitions.
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1543
/// The connection part of a sink, with one variant per kind of destination.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Rendered as
    /// `KAFKA CONNECTION <connection>[ (<options>)][ <key>][ HEADERS <headers>]`.
    Kafka {
        /// The connection item the sink writes through.
        connection: T::ItemName,
        /// Kafka sink options; omitted from the output when empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional key clause, rendered after the options.
        key: Option<SinkKey>,
        /// Optional identifier rendered after the `HEADERS` keyword.
        headers: Option<Ident>,
    },
    /// Rendered as
    /// `ICEBERG CATALOG CONNECTION <connection>[ (<options>)] USING AWS CONNECTION <aws_connection>[ <key>]`.
    Iceberg {
        /// The catalog connection item.
        connection: T::ItemName,
        /// The connection item rendered after `USING AWS CONNECTION`.
        aws_connection: T::ItemName,
        /// Optional key clause, rendered last.
        key: Option<SinkKey>,
        /// Iceberg sink options; omitted from the output when empty.
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1559
1560impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1561 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1562 match self {
1563 CreateSinkConnection::Kafka {
1564 connection,
1565 options,
1566 key,
1567 headers,
1568 } => {
1569 f.write_str("KAFKA CONNECTION ");
1570 f.write_node(connection);
1571 if !options.is_empty() {
1572 f.write_str(" (");
1573 f.write_node(&display::comma_separated(options));
1574 f.write_str(")");
1575 }
1576 if let Some(key) = key.as_ref() {
1577 f.write_str(" ");
1578 f.write_node(key);
1579 }
1580 if let Some(headers) = headers {
1581 f.write_str(" HEADERS ");
1582 f.write_node(headers);
1583 }
1584 }
1585 CreateSinkConnection::Iceberg {
1586 connection,
1587 aws_connection,
1588 key,
1589 options,
1590 } => {
1591 f.write_str("ICEBERG CATALOG CONNECTION ");
1592 f.write_node(connection);
1593 if !options.is_empty() {
1594 f.write_str(" (");
1595 f.write_node(&display::comma_separated(options));
1596 f.write_str(")");
1597 }
1598 f.write_str(" USING AWS CONNECTION ");
1599 f.write_node(aws_connection);
1600 if let Some(key) = key.as_ref() {
1601 f.write_str(" ");
1602 f.write_node(key);
1603 }
1604 }
1605 }
1606 }
1607}
1608impl_display_t!(CreateSinkConnection);
1609
/// The key clause of a sink, rendered as `KEY (<columns>)[ NOT ENFORCED]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// The key columns, rendered as a comma-separated list inside the parentheses.
    pub key_columns: Vec<Ident>,
    /// When `true`, the clause is followed by ` NOT ENFORCED`.
    pub not_enforced: bool,
}
1615
1616impl AstDisplay for SinkKey {
1617 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1618 f.write_str("KEY (");
1619 f.write_node(&display::comma_separated(&self.key_columns));
1620 f.write_str(")");
1621 if self.not_enforced {
1622 f.write_str(" NOT ENFORCED");
1623 }
1624 }
1625}
1626
/// A table-level constraint.
///
/// When a constraint has a name, it is rendered as a leading `CONSTRAINT <name> `.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Rendered as `PRIMARY KEY (<columns>)` or `UNIQUE [NULLS NOT DISTINCT ](<columns>)`.
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The constrained columns.
        columns: Vec<Ident>,
        /// Selects the `PRIMARY KEY` spelling instead of `UNIQUE`.
        is_primary: bool,
        /// Adds `NULLS NOT DISTINCT`; only rendered when `is_primary` is `false`.
        nulls_not_distinct: bool,
    },
    /// Rendered as `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The referencing columns in this table.
        columns: Vec<Ident>,
        /// The referenced table.
        foreign_table: T::ItemName,
        /// The referenced columns in `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The expression rendered inside the parentheses.
        expr: Box<Expr<T>>,
    },
}
1655
1656impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1657 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1658 match self {
1659 TableConstraint::Unique {
1660 name,
1661 columns,
1662 is_primary,
1663 nulls_not_distinct,
1664 } => {
1665 f.write_node(&display_constraint_name(name));
1666 if *is_primary {
1667 f.write_str("PRIMARY KEY ");
1668 } else {
1669 f.write_str("UNIQUE ");
1670 if *nulls_not_distinct {
1671 f.write_str("NULLS NOT DISTINCT ");
1672 }
1673 }
1674 f.write_str("(");
1675 f.write_node(&display::comma_separated(columns));
1676 f.write_str(")");
1677 }
1678 TableConstraint::ForeignKey {
1679 name,
1680 columns,
1681 foreign_table,
1682 referred_columns,
1683 } => {
1684 f.write_node(&display_constraint_name(name));
1685 f.write_str("FOREIGN KEY (");
1686 f.write_node(&display::comma_separated(columns));
1687 f.write_str(") REFERENCES ");
1688 f.write_node(foreign_table);
1689 f.write_str("(");
1690 f.write_node(&display::comma_separated(referred_columns));
1691 f.write_str(")");
1692 }
1693 TableConstraint::Check { name, expr } => {
1694 f.write_node(&display_constraint_name(name));
1695 f.write_str("CHECK (");
1696 f.write_node(&expr);
1697 f.write_str(")");
1698 }
1699 }
1700 }
1701}
1702impl_display_t!(TableConstraint);
1703
/// A key constraint, rendered by its `AstDisplay` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Rendered as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1710
1711impl AstDisplay for KeyConstraint {
1712 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1713 match self {
1714 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1715 f.write_str("PRIMARY KEY ");
1716 f.write_str("(");
1717 f.write_node(&display::comma_separated(columns));
1718 f.write_str(") ");
1719 f.write_str("NOT ENFORCED");
1720 }
1721 }
1722 }
1723}
1724impl_display!(KeyConstraint);
1725
/// Names of the general options that can be set on a source.
///
/// The derived `Ord` follows declaration order, so variants must not be reordered
/// casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1731
1732impl AstDisplay for CreateSourceOptionName {
1733 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1734 f.write_str(match self {
1735 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1736 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1737 })
1738 }
1739}
1740impl_display!(CreateSourceOptionName);
1741
1742impl WithOptionName for CreateSourceOptionName {
1743 fn redact_value(&self) -> bool {
1749 match self {
1750 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1751 false
1752 }
1753 }
1754 }
1755}
1756
/// A single source option entry: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: CreateSourceOptionName,
    /// The value supplied for the option; `None` when no value was given.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this excerpt; presumably they
// generate the display impls for the struct. Confirm against the macro definitions.
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1765
/// A column definition, rendered as `<name> <data_type>[ COLLATE <collation>][ <option>]...`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// Optional collation, rendered after the `COLLATE` keyword.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, each rendered after a single space, in order.
    pub options: Vec<ColumnOptionDef<T>>,
}
1774
1775impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1776 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1777 f.write_node(&self.name);
1778 f.write_str(" ");
1779 f.write_node(&self.data_type);
1780 if let Some(collation) = &self.collation {
1781 f.write_str(" COLLATE ");
1782 f.write_node(collation);
1783 }
1784 for option in &self.options {
1785 f.write_str(" ");
1786 f.write_node(option);
1787 }
1788 }
1789}
1790impl_display_t!(ColumnDef);
1791
/// A column option together with its optional constraint name.
///
/// Rendered as `[CONSTRAINT <name> ]<option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name; when present it is rendered before the option.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1813
1814impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1815 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1816 f.write_node(&display_constraint_name(&self.name));
1817 f.write_node(&self.option);
1818 }
1819}
1820impl_display_t!(ColumnOptionDef);
1821
/// An option attached to a column definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Rendered as `NULL`.
    Null,
    /// Rendered as `NOT NULL`.
    NotNull,
    /// Rendered as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Rendered as `PRIMARY KEY` when `is_primary` is `true`, otherwise `UNIQUE`.
    Unique { is_primary: bool },
    /// Rendered as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The referenced table.
        foreign_table: UnresolvedItemName,
        /// The referenced columns in `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Rendered as `VERSION <action> <version>`.
    Versioned {
        /// The versioning action.
        action: ColumnVersioned,
        /// The version the action applies to.
        version: Version,
    },
}
1848
1849impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1850 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1851 use ColumnOption::*;
1852 match self {
1853 Null => f.write_str("NULL"),
1854 NotNull => f.write_str("NOT NULL"),
1855 Default(expr) => {
1856 f.write_str("DEFAULT ");
1857 f.write_node(expr);
1858 }
1859 Unique { is_primary } => {
1860 if *is_primary {
1861 f.write_str("PRIMARY KEY");
1862 } else {
1863 f.write_str("UNIQUE");
1864 }
1865 }
1866 ForeignKey {
1867 foreign_table,
1868 referred_columns,
1869 } => {
1870 f.write_str("REFERENCES ");
1871 f.write_node(foreign_table);
1872 f.write_str(" (");
1873 f.write_node(&display::comma_separated(referred_columns));
1874 f.write_str(")");
1875 }
1876 Check(expr) => {
1877 f.write_str("CHECK (");
1878 f.write_node(expr);
1879 f.write_str(")");
1880 }
1881 Versioned { action, version } => {
1882 f.write_str("VERSION ");
1883 f.write_node(action);
1884 f.write_str(" ");
1885 f.write_node(version);
1886 }
1887 }
1888 }
1889}
1890impl_display_t!(ColumnOption);
1891
/// The action recorded by a column's `VERSION` option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Rendered as `ADDED`.
    Added,
}
1896
1897impl AstDisplay for ColumnVersioned {
1898 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1899 where
1900 W: fmt::Write,
1901 {
1902 match self {
1903 ColumnVersioned::Added => f.write_str("ADDED"),
1905 }
1906 }
1907}
1908impl_display!(ColumnVersioned);
1909
1910fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1911 struct ConstraintName<'a>(&'a Option<Ident>);
1912 impl<'a> AstDisplay for ConstraintName<'a> {
1913 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1914 where
1915 W: fmt::Write,
1916 {
1917 if let Some(name) = self.0 {
1918 f.write_str("CONSTRAINT ");
1919 f.write_node(name);
1920 f.write_str(" ");
1921 }
1922 }
1923 }
1924 ConstraintName(name)
1925}