1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Schema {
77 pub schema: String,
78}
79
80impl AstDisplay for Schema {
81 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82 f.write_str("SCHEMA '");
83 f.write_node(&display::escape_single_quote_string(&self.schema));
84 f.write_str("'");
85 }
86}
87impl_display!(Schema);
88
89#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90pub enum AvroSchemaOptionName {
91 ConfluentWireFormat,
93}
94
95impl AstDisplay for AvroSchemaOptionName {
96 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97 match self {
98 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99 }
100 }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104 fn redact_value(&self) -> bool {
110 match self {
111 Self::ConfluentWireFormat => false,
112 }
113 }
114}
115
116#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub struct AvroSchemaOption<T: AstInfo> {
118 pub name: AvroSchemaOptionName,
119 pub value: Option<WithOptionValue<T>>,
120}
121impl_display_for_with_option!(AvroSchemaOption);
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub enum AvroSchema<T: AstInfo> {
125 Csr {
126 csr_connection: CsrConnectionAvro<T>,
127 },
128 InlineSchema {
129 schema: Schema,
130 with_options: Vec<AvroSchemaOption<T>>,
131 },
132}
133
134impl<T: AstInfo> AstDisplay for AvroSchema<T> {
135 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
136 match self {
137 Self::Csr { csr_connection } => {
138 f.write_node(csr_connection);
139 }
140 Self::InlineSchema {
141 schema,
142 with_options,
143 } => {
144 f.write_str("USING ");
145 schema.fmt(f);
146 if !with_options.is_empty() {
147 f.write_str(" (");
148 f.write_node(&display::comma_separated(with_options));
149 f.write_str(")");
150 }
151 }
152 }
153 }
154}
155impl_display_t!(AvroSchema);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum ProtobufSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionProtobuf<T>,
161 },
162 InlineSchema {
163 message_name: String,
164 schema: Schema,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 message_name,
176 schema,
177 } => {
178 f.write_str("MESSAGE '");
179 f.write_node(&display::escape_single_quote_string(message_name));
180 f.write_str("' USING ");
181 f.write_str(schema);
182 }
183 }
184 }
185}
186impl_display_t!(ProtobufSchema);
187
188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
189pub enum CsrConfigOptionName<T: AstInfo> {
190 AvroKeyFullname,
191 AvroValueFullname,
192 NullDefaults,
193 AvroDocOn(AvroDocOn<T>),
194 KeyCompatibilityLevel,
195 ValueCompatibilityLevel,
196}
197
198impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
199 fn redact_value(&self) -> bool {
205 match self {
206 Self::AvroKeyFullname
207 | Self::AvroValueFullname
208 | Self::NullDefaults
209 | Self::AvroDocOn(_)
210 | Self::KeyCompatibilityLevel
211 | Self::ValueCompatibilityLevel => false,
212 }
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
217pub struct AvroDocOn<T: AstInfo> {
218 pub identifier: DocOnIdentifier<T>,
219 pub for_schema: DocOnSchema,
220}
221#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
222pub enum DocOnSchema {
223 KeyOnly,
224 ValueOnly,
225 All,
226}
227
228#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
229pub enum DocOnIdentifier<T: AstInfo> {
230 Column(ColumnName<T>),
231 Type(T::ItemName),
232}
233
234impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
235 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
236 match &self.for_schema {
237 DocOnSchema::KeyOnly => f.write_str("KEY "),
238 DocOnSchema::ValueOnly => f.write_str("VALUE "),
239 DocOnSchema::All => {}
240 }
241 match &self.identifier {
242 DocOnIdentifier::Column(name) => {
243 f.write_str("DOC ON COLUMN ");
244 f.write_node(name);
245 }
246 DocOnIdentifier::Type(name) => {
247 f.write_str("DOC ON TYPE ");
248 f.write_node(name);
249 }
250 }
251 }
252}
253impl_display_t!(AvroDocOn);
254
255impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
256 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
257 match self {
258 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
259 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
260 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
261 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
262 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
263 CsrConfigOptionName::ValueCompatibilityLevel => {
264 f.write_str("VALUE COMPATIBILITY LEVEL")
265 }
266 }
267 }
268}
269impl_display_t!(CsrConfigOptionName);
270
271#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
272pub struct CsrConfigOption<T: AstInfo> {
274 pub name: CsrConfigOptionName<T>,
275 pub value: Option<WithOptionValue<T>>,
276}
277impl_display_for_with_option!(CsrConfigOption);
278impl_display_t!(CsrConfigOption);
279
280#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct CsrConnection<T: AstInfo> {
282 pub connection: T::ItemName,
283 pub options: Vec<CsrConfigOption<T>>,
284}
285
286impl<T: AstInfo> AstDisplay for CsrConnection<T> {
287 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
288 f.write_str("CONNECTION ");
289 f.write_node(&self.connection);
290 if !self.options.is_empty() {
291 f.write_str(" (");
292 f.write_node(&display::comma_separated(&self.options));
293 f.write_str(")");
294 }
295 }
296}
297impl_display_t!(CsrConnection);
298
/// How a reader schema is selected.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a string payload — presumably an inline schema; confirm
    /// against the parser.
    Inline(String),
    /// Carries an integer payload — presumably a schema id; confirm against
    /// the parser.
    ById(i32),
}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub struct CsrConnectionAvro<T: AstInfo> {
314 pub connection: CsrConnection<T>,
315 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
316 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
317 pub seed: Option<CsrSeedAvro>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
323 f.write_node(&self.connection);
324 if let Some(seed) = &self.seed {
325 f.write_str(" ");
326 f.write_node(seed);
327 }
328 }
329}
330impl_display_t!(CsrConnectionAvro);
331
332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
333pub struct CsrConnectionProtobuf<T: AstInfo> {
334 pub connection: CsrConnection<T>,
335 pub seed: Option<CsrSeedProtobuf>,
336}
337
338impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
339 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
340 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
341 f.write_node(&self.connection);
342
343 if let Some(seed) = &self.seed {
344 f.write_str(" ");
345 f.write_node(seed);
346 }
347 }
348}
349impl_display_t!(CsrConnectionProtobuf);
350
351#[derive(Debug, Clone, PartialEq, Eq, Hash)]
352pub struct CsrSeedAvro {
353 pub key_schema: Option<String>,
354 pub value_schema: String,
355 pub key_reference_schemas: Vec<String>,
358 pub value_reference_schemas: Vec<String>,
361}
362
363impl AstDisplay for CsrSeedAvro {
364 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
365 f.write_str("SEED");
366 if let Some(key_schema) = &self.key_schema {
367 f.write_str(" KEY SCHEMA '");
368 f.write_node(&display::escape_single_quote_string(key_schema));
369 f.write_str("'");
370 if !self.key_reference_schemas.is_empty() {
371 f.write_str(" KEY REFERENCES (");
372 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
373 if i > 0 {
374 f.write_str(", ");
375 }
376 f.write_str("'");
377 f.write_node(&display::escape_single_quote_string(schema));
378 f.write_str("'");
379 }
380 f.write_str(")");
381 }
382 }
383 f.write_str(" VALUE SCHEMA '");
384 f.write_node(&display::escape_single_quote_string(&self.value_schema));
385 f.write_str("'");
386 if !self.value_reference_schemas.is_empty() {
387 f.write_str(" VALUE REFERENCES (");
388 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
389 if i > 0 {
390 f.write_str(", ");
391 }
392 f.write_str("'");
393 f.write_node(&display::escape_single_quote_string(schema));
394 f.write_str("'");
395 }
396 f.write_str(")");
397 }
398 }
399}
400impl_display!(CsrSeedAvro);
401
402#[derive(Debug, Clone, PartialEq, Eq, Hash)]
403pub struct CsrSeedProtobuf {
404 pub key: Option<CsrSeedProtobufSchema>,
405 pub value: CsrSeedProtobufSchema,
406}
407
408impl AstDisplay for CsrSeedProtobuf {
409 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410 f.write_str("SEED");
411 if let Some(key) = &self.key {
412 f.write_str(" KEY ");
413 f.write_node(key);
414 }
415 f.write_str(" VALUE ");
416 f.write_node(&self.value);
417 }
418}
419impl_display!(CsrSeedProtobuf);
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422pub struct CsrSeedProtobufSchema {
423 pub schema: String,
425 pub message_name: String,
426}
427impl AstDisplay for CsrSeedProtobufSchema {
428 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
429 f.write_str("SCHEMA '");
430 f.write_str(&display::escape_single_quote_string(&self.schema));
431 f.write_str("' MESSAGE '");
432 f.write_str(&display::escape_single_quote_string(&self.message_name));
433 f.write_str("'");
434 }
435}
436impl_display!(CsrSeedProtobufSchema);
437
438#[derive(Debug, Clone, PartialEq, Eq, Hash)]
439pub enum FormatSpecifier<T: AstInfo> {
440 Bare(Format<T>),
442 KeyValue { key: Format<T>, value: Format<T> },
444}
445
446impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
447 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
448 match self {
449 FormatSpecifier::Bare(format) => {
450 f.write_str("FORMAT ");
451 f.write_node(format)
452 }
453 FormatSpecifier::KeyValue { key, value } => {
454 f.write_str("KEY FORMAT ");
455 f.write_node(key);
456 f.write_str(" VALUE FORMAT ");
457 f.write_node(value);
458 }
459 }
460 }
461}
462impl_display_t!(FormatSpecifier);
463
464#[derive(Debug, Clone, PartialEq, Eq, Hash)]
465pub enum Format<T: AstInfo> {
466 Bytes,
467 Avro(AvroSchema<T>),
468 Protobuf(ProtobufSchema<T>),
469 Regex(String),
470 Csv {
471 columns: CsvColumns,
472 delimiter: char,
473 },
474 Json {
475 array: bool,
476 },
477 Text,
478}
479
480#[derive(Debug, Clone, PartialEq, Eq, Hash)]
481pub enum CsvColumns {
482 Count(u64),
484 Header { names: Vec<Ident> },
486}
487
488impl AstDisplay for CsvColumns {
489 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
490 match self {
491 CsvColumns::Count(n) => {
492 f.write_str(n);
493 f.write_str(" COLUMNS")
494 }
495 CsvColumns::Header { names } => {
496 f.write_str("HEADER");
497 if !names.is_empty() {
498 f.write_str(" (");
499 f.write_node(&display::comma_separated(names));
500 f.write_str(")");
501 }
502 }
503 }
504 }
505}
506
507#[derive(Debug, Clone, PartialEq, Eq, Hash)]
508pub enum SourceIncludeMetadata {
509 Key {
510 alias: Option<Ident>,
511 },
512 Timestamp {
513 alias: Option<Ident>,
514 },
515 Partition {
516 alias: Option<Ident>,
517 },
518 Offset {
519 alias: Option<Ident>,
520 },
521 Headers {
522 alias: Option<Ident>,
523 },
524 Header {
525 key: String,
526 alias: Ident,
527 use_bytes: bool,
528 },
529}
530
531impl AstDisplay for SourceIncludeMetadata {
532 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
533 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
534 if let Some(alias) = alias {
535 f.write_str(" AS ");
536 f.write_node(alias);
537 }
538 };
539
540 match self {
541 SourceIncludeMetadata::Key { alias } => {
542 f.write_str("KEY");
543 print_alias(f, alias);
544 }
545 SourceIncludeMetadata::Timestamp { alias } => {
546 f.write_str("TIMESTAMP");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Partition { alias } => {
550 f.write_str("PARTITION");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Offset { alias } => {
554 f.write_str("OFFSET");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Headers { alias } => {
558 f.write_str("HEADERS");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Header {
562 alias,
563 key,
564 use_bytes,
565 } => {
566 f.write_str("HEADER '");
567 f.write_str(&display::escape_single_quote_string(key));
568 f.write_str("'");
569 print_alias(f, &Some(alias.clone()));
570 if *use_bytes {
571 f.write_str(" BYTES");
572 }
573 }
574 }
575 }
576}
577impl_display!(SourceIncludeMetadata);
578
579#[derive(Debug, Clone, PartialEq, Eq, Hash)]
580pub enum SourceErrorPolicy {
581 Inline {
582 alias: Option<Ident>,
584 },
585}
586
587impl AstDisplay for SourceErrorPolicy {
588 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589 match self {
590 Self::Inline { alias } => {
591 f.write_str("INLINE");
592 if let Some(alias) = alias {
593 f.write_str(" AS ");
594 f.write_node(alias);
595 }
596 }
597 }
598 }
599}
600impl_display!(SourceErrorPolicy);
601
602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
603pub enum SourceEnvelope {
604 None,
605 Debezium,
606 Upsert {
607 value_decode_err_policy: Vec<SourceErrorPolicy>,
608 },
609 CdcV2,
610}
611
612impl SourceEnvelope {
613 pub fn requires_all_input(&self) -> bool {
616 match self {
617 SourceEnvelope::None => false,
618 SourceEnvelope::Debezium => false,
619 SourceEnvelope::Upsert { .. } => false,
620 SourceEnvelope::CdcV2 => true,
621 }
622 }
623}
624
625impl AstDisplay for SourceEnvelope {
626 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
627 match self {
628 Self::None => {
629 f.write_str("NONE");
631 }
632 Self::Debezium => {
633 f.write_str("DEBEZIUM");
634 }
635 Self::Upsert {
636 value_decode_err_policy,
637 } => {
638 if value_decode_err_policy.is_empty() {
639 f.write_str("UPSERT");
640 } else {
641 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
642 f.write_node(&display::comma_separated(value_decode_err_policy));
643 f.write_str("))")
644 }
645 }
646 Self::CdcV2 => {
647 f.write_str("MATERIALIZE");
648 }
649 }
650 }
651}
652impl_display!(SourceEnvelope);
653
654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
655pub enum SinkEnvelope {
656 Debezium,
657 Upsert,
658}
659
660impl AstDisplay for SinkEnvelope {
661 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
662 match self {
663 Self::Upsert => {
664 f.write_str("UPSERT");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 }
670 }
671}
672impl_display!(SinkEnvelope);
673
674#[derive(Debug, Clone, PartialEq, Eq, Hash)]
675pub enum IcebergSinkMode {
676 Upsert,
677 Append,
678}
679
680impl AstDisplay for IcebergSinkMode {
681 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
682 match self {
683 Self::Upsert => {
684 f.write_str("UPSERT");
685 }
686 Self::Append => {
687 f.write_str("APPEND");
688 }
689 }
690 }
691}
692impl_display!(IcebergSinkMode);
693
694#[derive(Debug, Clone, PartialEq, Eq, Hash)]
695pub enum SubscribeOutput<T: AstInfo> {
696 Diffs,
697 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
698 EnvelopeUpsert { key_columns: Vec<Ident> },
699 EnvelopeDebezium { key_columns: Vec<Ident> },
700}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704 match self {
705 Self::Diffs => {}
706 Self::WithinTimestampOrderBy { order_by } => {
707 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708 f.write_node(&display::comma_separated(order_by));
709 }
710 Self::EnvelopeUpsert { key_columns } => {
711 f.write_str(" ENVELOPE UPSERT (KEY (");
712 f.write_node(&display::comma_separated(key_columns));
713 f.write_str("))");
714 }
715 Self::EnvelopeDebezium { key_columns } => {
716 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717 f.write_node(&display::comma_separated(key_columns));
718 f.write_str("))");
719 }
720 }
721 }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727 match self {
728 Self::Bytes => f.write_str("BYTES"),
729 Self::Avro(inner) => {
730 f.write_str("AVRO ");
731 f.write_node(inner);
732 }
733 Self::Protobuf(inner) => {
734 f.write_str("PROTOBUF ");
735 f.write_node(inner);
736 }
737 Self::Regex(regex) => {
738 f.write_str("REGEX '");
739 f.write_node(&display::escape_single_quote_string(regex));
740 f.write_str("'");
741 }
742 Self::Csv { columns, delimiter } => {
743 f.write_str("CSV WITH ");
744 f.write_node(columns);
745
746 if *delimiter != ',' {
747 f.write_str(" DELIMITED BY '");
748 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749 f.write_str("'");
750 }
751 }
752 Self::Json { array } => {
753 f.write_str("JSON");
754 if *array {
755 f.write_str(" ARRAY");
756 }
757 }
758 Self::Text => f.write_str("TEXT"),
759 }
760 }
761}
762impl_display_t!(Format);
763
764#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
768pub enum ConnectionOptionName {
769 AccessKeyId,
770 AssumeRoleArn,
771 AssumeRoleSessionName,
772 AvailabilityZones,
773 AwsConnection,
774 AwsPrivatelink,
775 Broker,
776 Brokers,
777 Credential,
778 Database,
779 Endpoint,
780 GcpConnection,
781 Host,
782 Password,
783 Port,
784 ProgressTopic,
785 ProgressTopicReplicationFactor,
786 PublicKey1,
787 PublicKey2,
788 Region,
789 Registry,
790 SaslMechanisms,
791 SaslPassword,
792 SaslUsername,
793 Scope,
794 SecretAccessKey,
795 SecurityProtocol,
796 ServiceAccountKey,
797 ServiceName,
798 SshTunnel,
799 SslCertificate,
800 SslCertificateAuthority,
801 SslKey,
802 SslMode,
803 SessionToken,
804 CatalogType,
805 Url,
806 User,
807 Warehouse,
808}
809
810impl AstDisplay for ConnectionOptionName {
811 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
812 f.write_str(match self {
813 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
814 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
815 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
816 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
817 ConnectionOptionName::Broker => "BROKER",
818 ConnectionOptionName::Brokers => "BROKERS",
819 ConnectionOptionName::Credential => "CREDENTIAL",
820 ConnectionOptionName::Database => "DATABASE",
821 ConnectionOptionName::Endpoint => "ENDPOINT",
822 ConnectionOptionName::GcpConnection => "GCP CONNECTION",
823 ConnectionOptionName::Host => "HOST",
824 ConnectionOptionName::Password => "PASSWORD",
825 ConnectionOptionName::Port => "PORT",
826 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
827 ConnectionOptionName::ProgressTopicReplicationFactor => {
828 "PROGRESS TOPIC REPLICATION FACTOR"
829 }
830 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
831 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
832 ConnectionOptionName::Region => "REGION",
833 ConnectionOptionName::Registry => "REGISTRY",
834 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
835 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
836 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
837 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
838 ConnectionOptionName::SaslUsername => "SASL USERNAME",
839 ConnectionOptionName::Scope => "SCOPE",
840 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
841 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
842 ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY",
843 ConnectionOptionName::ServiceName => "SERVICE NAME",
844 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
845 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
846 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
847 ConnectionOptionName::SslKey => "SSL KEY",
848 ConnectionOptionName::SslMode => "SSL MODE",
849 ConnectionOptionName::SessionToken => "SESSION TOKEN",
850 ConnectionOptionName::CatalogType => "CATALOG TYPE",
851 ConnectionOptionName::Url => "URL",
852 ConnectionOptionName::User => "USER",
853 ConnectionOptionName::Warehouse => "WAREHOUSE",
854 })
855 }
856}
857impl_display!(ConnectionOptionName);
858
859impl WithOptionName for ConnectionOptionName {
860 fn redact_value(&self) -> bool {
866 match self {
867 ConnectionOptionName::AccessKeyId
868 | ConnectionOptionName::AvailabilityZones
869 | ConnectionOptionName::AwsConnection
870 | ConnectionOptionName::AwsPrivatelink
871 | ConnectionOptionName::Broker
872 | ConnectionOptionName::Brokers
873 | ConnectionOptionName::Credential
874 | ConnectionOptionName::Database
875 | ConnectionOptionName::Endpoint
876 | ConnectionOptionName::GcpConnection
877 | ConnectionOptionName::Host
878 | ConnectionOptionName::Password
879 | ConnectionOptionName::Port
880 | ConnectionOptionName::ProgressTopic
881 | ConnectionOptionName::ProgressTopicReplicationFactor
882 | ConnectionOptionName::PublicKey1
883 | ConnectionOptionName::PublicKey2
884 | ConnectionOptionName::Region
885 | ConnectionOptionName::Registry
886 | ConnectionOptionName::AssumeRoleArn
887 | ConnectionOptionName::AssumeRoleSessionName
888 | ConnectionOptionName::SaslMechanisms
889 | ConnectionOptionName::SaslPassword
890 | ConnectionOptionName::SaslUsername
891 | ConnectionOptionName::Scope
892 | ConnectionOptionName::SecurityProtocol
893 | ConnectionOptionName::SecretAccessKey
894 | ConnectionOptionName::ServiceAccountKey
895 | ConnectionOptionName::ServiceName
896 | ConnectionOptionName::SshTunnel
897 | ConnectionOptionName::SslCertificate
898 | ConnectionOptionName::SslCertificateAuthority
899 | ConnectionOptionName::SslKey
900 | ConnectionOptionName::SslMode
901 | ConnectionOptionName::SessionToken
902 | ConnectionOptionName::CatalogType
903 | ConnectionOptionName::Url
904 | ConnectionOptionName::User
905 | ConnectionOptionName::Warehouse => false,
906 }
907 }
908}
909
910#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
911pub struct ConnectionOption<T: AstInfo> {
913 pub name: ConnectionOptionName,
914 pub value: Option<WithOptionValue<T>>,
915}
916impl_display_for_with_option!(ConnectionOption);
917impl_display_t!(ConnectionOption);
918
/// The kinds of connection that can be created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    Aws,
    AwsPrivatelink,
    GlueSchemaRegistry,
    Gcp,
    Kafka,
    Csr,
    Postgres,
    Ssh,
    SqlServer,
    MySql,
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lower-case, hyphenated identifier for this connection
    /// type (for example `"aws-privatelink"` or `"sql-server"`).
    pub fn as_str(&self) -> &'static str {
        // Arms follow the declaration order of the enum.
        match *self {
            CreateConnectionType::Aws => "aws",
            CreateConnectionType::AwsPrivatelink => "aws-privatelink",
            CreateConnectionType::GlueSchemaRegistry => "glue-schema-registry",
            CreateConnectionType::Gcp => "gcp",
            CreateConnectionType::Kafka => "kafka",
            CreateConnectionType::Csr => "confluent-schema-registry",
            CreateConnectionType::Postgres => "postgres",
            CreateConnectionType::Ssh => "ssh-tunnel",
            CreateConnectionType::SqlServer => "sql-server",
            CreateConnectionType::MySql => "mysql",
            CreateConnectionType::IcebergCatalog => "iceberg-catalog",
        }
    }
}
951
952impl AstDisplay for CreateConnectionType {
953 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
954 match self {
955 Self::Kafka => {
956 f.write_str("KAFKA");
957 }
958 Self::Csr => {
959 f.write_str("CONFLUENT SCHEMA REGISTRY");
960 }
961 Self::Postgres => {
962 f.write_str("POSTGRES");
963 }
964 Self::Aws => {
965 f.write_str("AWS");
966 }
967 Self::AwsPrivatelink => {
968 f.write_str("AWS PRIVATELINK");
969 }
970 Self::GlueSchemaRegistry => {
971 f.write_str("AWS GLUE SCHEMA REGISTRY");
972 }
973 Self::Gcp => {
974 f.write_str("GCP");
975 }
976 Self::Ssh => {
977 f.write_str("SSH TUNNEL");
978 }
979 Self::SqlServer => {
980 f.write_str("SQL SERVER");
981 }
982 Self::MySql => {
983 f.write_str("MYSQL");
984 }
985 Self::IcebergCatalog => {
986 f.write_str("ICEBERG CATALOG");
987 }
988 }
989 }
990}
991impl_display!(CreateConnectionType);
992
993#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
994pub enum CreateConnectionOptionName {
995 Validate,
996}
997
998impl AstDisplay for CreateConnectionOptionName {
999 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1000 f.write_str(match self {
1001 CreateConnectionOptionName::Validate => "VALIDATE",
1002 })
1003 }
1004}
1005impl_display!(CreateConnectionOptionName);
1006
1007impl WithOptionName for CreateConnectionOptionName {
1008 fn redact_value(&self) -> bool {
1014 match self {
1015 CreateConnectionOptionName::Validate => false,
1016 }
1017 }
1018}
1019
1020#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1021pub struct CreateConnectionOption<T: AstInfo> {
1023 pub name: CreateConnectionOptionName,
1024 pub value: Option<WithOptionValue<T>>,
1025}
1026impl_display_for_with_option!(CreateConnectionOption);
1027impl_display_t!(CreateConnectionOption);
1028
1029#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1030pub enum KafkaSourceConfigOptionName {
1031 GroupIdPrefix,
1032 Topic,
1033 TopicMetadataRefreshInterval,
1034 StartTimestamp,
1035 StartOffset,
1036}
1037
1038impl AstDisplay for KafkaSourceConfigOptionName {
1039 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1040 f.write_str(match self {
1041 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1042 KafkaSourceConfigOptionName::Topic => "TOPIC",
1043 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1044 "TOPIC METADATA REFRESH INTERVAL"
1045 }
1046 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1047 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1048 })
1049 }
1050}
1051impl_display!(KafkaSourceConfigOptionName);
1052
1053impl WithOptionName for KafkaSourceConfigOptionName {
1054 fn redact_value(&self) -> bool {
1060 match self {
1061 KafkaSourceConfigOptionName::GroupIdPrefix
1062 | KafkaSourceConfigOptionName::Topic
1063 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1064 | KafkaSourceConfigOptionName::StartOffset
1065 | KafkaSourceConfigOptionName::StartTimestamp => false,
1066 }
1067 }
1068}
1069
1070#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1071pub struct KafkaSourceConfigOption<T: AstInfo> {
1072 pub name: KafkaSourceConfigOptionName,
1073 pub value: Option<WithOptionValue<T>>,
1074}
1075impl_display_for_with_option!(KafkaSourceConfigOption);
1076impl_display_t!(KafkaSourceConfigOption);
1077
1078#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1079pub enum KafkaSinkConfigOptionName {
1080 CompressionType,
1081 PartitionBy,
1082 ProgressGroupIdPrefix,
1083 Topic,
1084 TransactionalIdPrefix,
1085 LegacyIds,
1086 TopicConfig,
1087 TopicMetadataRefreshInterval,
1088 TopicPartitionCount,
1089 TopicReplicationFactor,
1090}
1091
1092impl AstDisplay for KafkaSinkConfigOptionName {
1093 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1094 f.write_str(match self {
1095 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1096 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1097 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1098 KafkaSinkConfigOptionName::Topic => "TOPIC",
1099 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1100 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1101 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1102 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1103 "TOPIC METADATA REFRESH INTERVAL"
1104 }
1105 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1106 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1107 })
1108 }
1109}
1110impl_display!(KafkaSinkConfigOptionName);
1111
1112impl WithOptionName for KafkaSinkConfigOptionName {
1113 fn redact_value(&self) -> bool {
1119 match self {
1120 KafkaSinkConfigOptionName::CompressionType
1121 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1122 | KafkaSinkConfigOptionName::Topic
1123 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1124 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1125 | KafkaSinkConfigOptionName::LegacyIds
1126 | KafkaSinkConfigOptionName::TopicConfig
1127 | KafkaSinkConfigOptionName::TopicPartitionCount
1128 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1129 KafkaSinkConfigOptionName::PartitionBy => true,
1130 }
1131 }
1132}
1133
1134#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1135pub struct KafkaSinkConfigOption<T: AstInfo> {
1136 pub name: KafkaSinkConfigOptionName,
1137 pub value: Option<WithOptionValue<T>>,
1138}
1139impl_display_for_with_option!(KafkaSinkConfigOption);
1140impl_display_t!(KafkaSinkConfigOption);
1141
1142#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1143pub enum IcebergSinkConfigOptionName {
1144 Namespace,
1145 Table,
1146}
1147
1148impl AstDisplay for IcebergSinkConfigOptionName {
1149 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1150 f.write_str(match self {
1151 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1152 IcebergSinkConfigOptionName::Table => "TABLE",
1153 })
1154 }
1155}
1156impl_display!(IcebergSinkConfigOptionName);
1157
1158impl WithOptionName for IcebergSinkConfigOptionName {
1159 fn redact_value(&self) -> bool {
1165 match self {
1166 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1167 }
1168 }
1169}
1170
1171#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1172pub struct IcebergSinkConfigOption<T: AstInfo> {
1173 pub name: IcebergSinkConfigOptionName,
1174 pub value: Option<WithOptionValue<T>>,
1175}
1176impl_display_for_with_option!(IcebergSinkConfigOption);
1177impl_display_t!(IcebergSinkConfigOption);
1178
1179#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1180pub enum PgConfigOptionName {
1181 Details,
1184 Publication,
1186 TextColumns,
1193 ExcludeColumns,
1200}
1201
1202impl AstDisplay for PgConfigOptionName {
1203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1204 f.write_str(match self {
1205 PgConfigOptionName::Details => "DETAILS",
1206 PgConfigOptionName::Publication => "PUBLICATION",
1207 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1208 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1209 })
1210 }
1211}
1212impl_display!(PgConfigOptionName);
1213
1214impl WithOptionName for PgConfigOptionName {
1215 fn redact_value(&self) -> bool {
1221 match self {
1222 PgConfigOptionName::Details
1223 | PgConfigOptionName::Publication
1224 | PgConfigOptionName::TextColumns
1225 | PgConfigOptionName::ExcludeColumns => false,
1226 }
1227 }
1228}
1229
/// One Postgres option: a [`PgConfigOptionName`] paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: PgConfigOptionName,
    /// The value given for the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this section; presumably they
// generate the `AstDisplay`/`Display` impls for the struct — confirm.
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1238
/// Names of the options a [`MySqlConfigOption`] can carry.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` compare
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1259
1260impl AstDisplay for MySqlConfigOptionName {
1261 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1262 f.write_str(match self {
1263 MySqlConfigOptionName::Details => "DETAILS",
1264 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1265 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1266 })
1267 }
1268}
1269impl_display!(MySqlConfigOptionName);
1270
1271impl WithOptionName for MySqlConfigOptionName {
1272 fn redact_value(&self) -> bool {
1278 match self {
1279 MySqlConfigOptionName::Details
1280 | MySqlConfigOptionName::TextColumns
1281 | MySqlConfigOptionName::ExcludeColumns => false,
1282 }
1283 }
1284}
1285
/// One MySQL option: a [`MySqlConfigOptionName`] paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: MySqlConfigOptionName,
    /// The value given for the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this section; presumably they
// generate the `AstDisplay`/`Display` impls for the struct — confirm.
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1294
/// Names of the options a [`SqlServerConfigOption`] can carry.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` compare
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1317
1318impl AstDisplay for SqlServerConfigOptionName {
1319 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1320 f.write_str(match self {
1321 SqlServerConfigOptionName::Details => "DETAILS",
1322 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1323 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1324 })
1325 }
1326}
1327impl_display!(SqlServerConfigOptionName);
1328
1329impl WithOptionName for SqlServerConfigOptionName {
1330 fn redact_value(&self) -> bool {
1336 match self {
1337 SqlServerConfigOptionName::Details
1338 | SqlServerConfigOptionName::TextColumns
1339 | SqlServerConfigOptionName::ExcludeColumns => false,
1340 }
1341 }
1342}
1343
/// One SQL Server option: a [`SqlServerConfigOptionName`] paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option is being set.
    pub name: SqlServerConfigOptionName,
    /// The value given for the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this section; presumably they
// generate the `AstDisplay`/`Display` impls for the struct — confirm.
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1352
/// The connection clause of a source, one variant per kind of upstream.
///
/// Every variant is rendered as its keywords, followed by the connection (or
/// generator), followed by the options in parentheses when the option list is
/// not empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Rendered as `KAFKA CONNECTION <connection> [(<options>)]`.
    Kafka {
        /// Item written after `KAFKA CONNECTION`.
        connection: T::ItemName,
        /// Options; omitted from the output when empty.
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Rendered as `POSTGRES CONNECTION <connection> [(<options>)]`.
    Postgres {
        /// Item written after `POSTGRES CONNECTION`.
        connection: T::ItemName,
        /// Options; omitted from the output when empty.
        options: Vec<PgConfigOption<T>>,
    },
    /// Rendered as `SQL SERVER CONNECTION <connection> [(<options>)]`.
    SqlServer {
        /// Item written after `SQL SERVER CONNECTION`.
        connection: T::ItemName,
        /// Options; omitted from the output when empty.
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Rendered as `MYSQL CONNECTION <connection> [(<options>)]`.
    MySql {
        /// Item written after `MYSQL CONNECTION`.
        connection: T::ItemName,
        /// Options; omitted from the output when empty.
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Rendered as `LOAD GENERATOR <generator> [(<options>)]`.
    LoadGenerator {
        /// Which built-in generator is used.
        generator: LoadGenerator,
        /// Options; omitted from the output when empty.
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1376
1377impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1378 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1379 match self {
1380 CreateSourceConnection::Kafka {
1381 connection,
1382 options,
1383 } => {
1384 f.write_str("KAFKA CONNECTION ");
1385 f.write_node(connection);
1386 if !options.is_empty() {
1387 f.write_str(" (");
1388 f.write_node(&display::comma_separated(options));
1389 f.write_str(")");
1390 }
1391 }
1392 CreateSourceConnection::Postgres {
1393 connection,
1394 options,
1395 } => {
1396 f.write_str("POSTGRES CONNECTION ");
1397 f.write_node(connection);
1398 if !options.is_empty() {
1399 f.write_str(" (");
1400 f.write_node(&display::comma_separated(options));
1401 f.write_str(")");
1402 }
1403 }
1404 CreateSourceConnection::SqlServer {
1405 connection,
1406 options,
1407 } => {
1408 f.write_str("SQL SERVER CONNECTION ");
1409 f.write_node(connection);
1410 if !options.is_empty() {
1411 f.write_str(" (");
1412 f.write_node(&display::comma_separated(options));
1413 f.write_str(")");
1414 }
1415 }
1416 CreateSourceConnection::MySql {
1417 connection,
1418 options,
1419 } => {
1420 f.write_str("MYSQL CONNECTION ");
1421 f.write_node(connection);
1422 if !options.is_empty() {
1423 f.write_str(" (");
1424 f.write_node(&display::comma_separated(options));
1425 f.write_str(")");
1426 }
1427 }
1428 CreateSourceConnection::LoadGenerator { generator, options } => {
1429 f.write_str("LOAD GENERATOR ");
1430 f.write_node(generator);
1431 if !options.is_empty() {
1432 f.write_str(" (");
1433 f.write_node(&display::comma_separated(options));
1434 f.write_str(")");
1435 }
1436 }
1437 }
1438 }
1439}
1440impl_display_t!(CreateSourceConnection);
1441
/// The built-in load generators that a `LOAD GENERATOR` source can name.
///
/// Each variant has a SQL keyword form (its `AstDisplay` output) and a
/// lowercase name returned by [`LoadGenerator::schema_name`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`; schema name `clock`.
    Clock,
    /// Rendered as `COUNTER`; schema name `counter`.
    Counter,
    /// Rendered as `MARKETING`; schema name `marketing`.
    Marketing,
    /// Rendered as `AUCTION`; schema name `auction`.
    Auction,
    /// Rendered as `DATUMS`; schema name `datums`.
    Datums,
    /// Rendered as `TPCH`; schema name `tpch`.
    Tpch,
    /// Rendered as `KEY VALUE`; schema name `key_value`.
    KeyValue,
}
1452
1453impl AstDisplay for LoadGenerator {
1454 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1455 match self {
1456 Self::Counter => f.write_str("COUNTER"),
1457 Self::Clock => f.write_str("CLOCK"),
1458 Self::Marketing => f.write_str("MARKETING"),
1459 Self::Auction => f.write_str("AUCTION"),
1460 Self::Datums => f.write_str("DATUMS"),
1461 Self::Tpch => f.write_str("TPCH"),
1462 Self::KeyValue => f.write_str("KEY VALUE"),
1463 }
1464 }
1465}
1466impl_display!(LoadGenerator);
1467
1468impl LoadGenerator {
1469 pub fn schema_name(&self) -> &'static str {
1474 match self {
1475 LoadGenerator::Counter => "counter",
1476 LoadGenerator::Clock => "clock",
1477 LoadGenerator::Marketing => "marketing",
1478 LoadGenerator::Auction => "auction",
1479 LoadGenerator::Datums => "datums",
1480 LoadGenerator::Tpch => "tpch",
1481 LoadGenerator::KeyValue => "key_value",
1482 }
1483 }
1484}
1485
/// Names of the options a [`LoadGeneratorOption`] can carry.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` compare
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1501
1502impl AstDisplay for LoadGeneratorOptionName {
1503 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1504 f.write_str(match self {
1505 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1506 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1507 LoadGeneratorOptionName::AsOf => "AS OF",
1508 LoadGeneratorOptionName::UpTo => "UP TO",
1509 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1510 LoadGeneratorOptionName::Keys => "KEYS",
1511 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1512 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1513 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1514 LoadGeneratorOptionName::Seed => "SEED",
1515 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1516 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1517 })
1518 }
1519}
1520impl_display!(LoadGeneratorOptionName);
1521
1522impl WithOptionName for LoadGeneratorOptionName {
1523 fn redact_value(&self) -> bool {
1529 match self {
1530 LoadGeneratorOptionName::ScaleFactor
1531 | LoadGeneratorOptionName::TickInterval
1532 | LoadGeneratorOptionName::AsOf
1533 | LoadGeneratorOptionName::UpTo
1534 | LoadGeneratorOptionName::MaxCardinality
1535 | LoadGeneratorOptionName::Keys
1536 | LoadGeneratorOptionName::SnapshotRounds
1537 | LoadGeneratorOptionName::TransactionalSnapshot
1538 | LoadGeneratorOptionName::ValueSize
1539 | LoadGeneratorOptionName::Partitions
1540 | LoadGeneratorOptionName::BatchSize
1541 | LoadGeneratorOptionName::Seed => false,
1542 }
1543 }
1544}
1545
/// One load generator option: a [`LoadGeneratorOptionName`] paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option is being set.
    pub name: LoadGeneratorOptionName,
    /// The value given for the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this section; presumably they
// generate the `AstDisplay`/`Display` impls for the struct — confirm.
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1554
/// The connection clause of a sink, one variant per kind of destination.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Rendered as
    /// `KAFKA CONNECTION <connection> [(<options>)] [<key>] [HEADERS <headers>]`.
    Kafka {
        /// Item written after `KAFKA CONNECTION`.
        connection: T::ItemName,
        /// Options; omitted from the output when empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Optional identifier written after `HEADERS`.
        headers: Option<Ident>,
    },
    /// Rendered as `ICEBERG CATALOG CONNECTION <catalog_connection>
    /// [(<options>)] [USING AWS CONNECTION <aws_connection>] [<key>]`.
    Iceberg {
        /// Item written after `ICEBERG CATALOG CONNECTION`.
        catalog_connection: T::ItemName,

        /// Optional item written after `USING AWS CONNECTION`.
        aws_connection: Option<T::ItemName>,

        /// Optional `KEY (...)` clause; written last.
        key: Option<SinkKey>,
        /// Options; written right after the catalog connection, omitted when
        /// empty.
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1573
1574impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1575 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1576 match self {
1577 CreateSinkConnection::Kafka {
1578 connection,
1579 options,
1580 key,
1581 headers,
1582 } => {
1583 f.write_str("KAFKA CONNECTION ");
1584 f.write_node(connection);
1585 if !options.is_empty() {
1586 f.write_str(" (");
1587 f.write_node(&display::comma_separated(options));
1588 f.write_str(")");
1589 }
1590 if let Some(key) = key.as_ref() {
1591 f.write_str(" ");
1592 f.write_node(key);
1593 }
1594 if let Some(headers) = headers {
1595 f.write_str(" HEADERS ");
1596 f.write_node(headers);
1597 }
1598 }
1599 CreateSinkConnection::Iceberg {
1600 catalog_connection,
1601 aws_connection,
1602 key,
1603 options,
1604 } => {
1605 f.write_str("ICEBERG CATALOG CONNECTION ");
1606 f.write_node(catalog_connection);
1607 if !options.is_empty() {
1608 f.write_str(" (");
1609 f.write_node(&display::comma_separated(options));
1610 f.write_str(")");
1611 }
1612 if let Some(aws_connection) = aws_connection {
1613 f.write_str(" USING AWS CONNECTION ");
1614 f.write_node(aws_connection);
1615 }
1616 if let Some(key) = key.as_ref() {
1617 f.write_str(" ");
1618 f.write_node(key);
1619 }
1620 }
1621 }
1622 }
1623}
1624impl_display_t!(CreateSinkConnection);
1625
/// The `KEY (...)` clause of a sink.
///
/// Rendered as `KEY (<key_columns>)`, followed by ` NOT ENFORCED` when
/// `not_enforced` is set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// Columns listed, comma-separated, inside the parentheses.
    pub key_columns: Vec<Ident>,
    /// Whether the ` NOT ENFORCED` suffix is written.
    pub not_enforced: bool,
}
1631
1632impl AstDisplay for SinkKey {
1633 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1634 f.write_str("KEY (");
1635 f.write_node(&display::comma_separated(&self.key_columns));
1636 f.write_str(")");
1637 if self.not_enforced {
1638 f.write_str(" NOT ENFORCED");
1639 }
1640 }
1641}
1642
/// A table-level constraint.
///
/// Every variant may carry a `name`; when present it is rendered as a leading
/// `CONSTRAINT <name> ` prefix.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Rendered as `PRIMARY KEY (<columns>)` when `is_primary` is set,
    /// otherwise as `UNIQUE [NULLS NOT DISTINCT] (<columns>)`.
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Selects `PRIMARY KEY` over `UNIQUE`.
        is_primary: bool,
        /// Adds `NULLS NOT DISTINCT`; only written for the `UNIQUE` form.
        nulls_not_distinct: bool,
    },
    /// Rendered as
    /// `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// Item written after `REFERENCES`.
        foreign_table: T::ItemName,
        /// Columns listed in the parentheses after the foreign table.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Expression written inside the parentheses.
        expr: Box<Expr<T>>,
    },
}
1671
1672impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1673 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1674 match self {
1675 TableConstraint::Unique {
1676 name,
1677 columns,
1678 is_primary,
1679 nulls_not_distinct,
1680 } => {
1681 f.write_node(&display_constraint_name(name));
1682 if *is_primary {
1683 f.write_str("PRIMARY KEY ");
1684 } else {
1685 f.write_str("UNIQUE ");
1686 if *nulls_not_distinct {
1687 f.write_str("NULLS NOT DISTINCT ");
1688 }
1689 }
1690 f.write_str("(");
1691 f.write_node(&display::comma_separated(columns));
1692 f.write_str(")");
1693 }
1694 TableConstraint::ForeignKey {
1695 name,
1696 columns,
1697 foreign_table,
1698 referred_columns,
1699 } => {
1700 f.write_node(&display_constraint_name(name));
1701 f.write_str("FOREIGN KEY (");
1702 f.write_node(&display::comma_separated(columns));
1703 f.write_str(") REFERENCES ");
1704 f.write_node(foreign_table);
1705 f.write_str("(");
1706 f.write_node(&display::comma_separated(referred_columns));
1707 f.write_str(")");
1708 }
1709 TableConstraint::Check { name, expr } => {
1710 f.write_node(&display_constraint_name(name));
1711 f.write_str("CHECK (");
1712 f.write_node(&expr);
1713 f.write_str(")");
1714 }
1715 }
1716 }
1717}
1718impl_display_t!(TableConstraint);
1719
/// A key constraint; currently only a non-enforced primary key exists.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Rendered as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1726
1727impl AstDisplay for KeyConstraint {
1728 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1729 match self {
1730 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1731 f.write_str("PRIMARY KEY ");
1732 f.write_str("(");
1733 f.write_node(&display::comma_separated(columns));
1734 f.write_str(") ");
1735 f.write_str("NOT ENFORCED");
1736 }
1737 }
1738 }
1739}
1740impl_display!(KeyConstraint);
1741
/// Names of the options a [`CreateSourceOption`] can carry.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` compare
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1747
1748impl AstDisplay for CreateSourceOptionName {
1749 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1750 f.write_str(match self {
1751 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1752 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1753 })
1754 }
1755}
1756impl_display!(CreateSourceOptionName);
1757
1758impl WithOptionName for CreateSourceOptionName {
1759 fn redact_value(&self) -> bool {
1765 match self {
1766 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1767 false
1768 }
1769 }
1770 }
1771}
1772
/// One source option: a [`CreateSourceOptionName`] paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option is being set.
    pub name: CreateSourceOptionName,
    /// The value given for the option; `None` when no value is present.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined outside this section; presumably they
// generate the `AstDisplay`/`Display` impls for the struct — confirm.
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1781
/// A column definition.
///
/// Rendered as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// Column name.
    pub name: Ident,
    /// Column data type.
    pub data_type: T::DataType,
    /// Optional collation, written after `COLLATE`.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, written space-separated in this order.
    pub options: Vec<ColumnOptionDef<T>>,
}
1790
1791impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1792 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1793 f.write_node(&self.name);
1794 f.write_str(" ");
1795 f.write_node(&self.data_type);
1796 if let Some(collation) = &self.collation {
1797 f.write_str(" COLLATE ");
1798 f.write_node(collation);
1799 }
1800 for option in &self.options {
1801 f.write_str(" ");
1802 f.write_node(option);
1803 }
1804 }
1805}
1806impl_display_t!(ColumnDef);
1807
/// A [`ColumnOption`] together with an optional constraint name.
///
/// Rendered as `[CONSTRAINT <name> ]<option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1829
1830impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1831 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1832 f.write_node(&display_constraint_name(&self.name));
1833 f.write_node(&self.option);
1834 }
1835}
1836impl_display_t!(ColumnOptionDef);
1837
/// An option attached to a column definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Rendered as `NULL`.
    Null,
    /// Rendered as `NOT NULL`.
    NotNull,
    /// Rendered as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Rendered as `PRIMARY KEY` when `is_primary` is set, else `UNIQUE`.
    Unique { is_primary: bool },
    /// Rendered as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// Item written after `REFERENCES`.
        foreign_table: UnresolvedItemName,
        /// Columns listed inside the parentheses.
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Rendered as `VERSION <action> <version>`.
    Versioned {
        /// The versioning action, e.g. `ADDED`.
        action: ColumnVersioned,
        /// The version written after the action.
        version: Version,
    },
}
1864
1865impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1866 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1867 use ColumnOption::*;
1868 match self {
1869 Null => f.write_str("NULL"),
1870 NotNull => f.write_str("NOT NULL"),
1871 Default(expr) => {
1872 f.write_str("DEFAULT ");
1873 f.write_node(expr);
1874 }
1875 Unique { is_primary } => {
1876 if *is_primary {
1877 f.write_str("PRIMARY KEY");
1878 } else {
1879 f.write_str("UNIQUE");
1880 }
1881 }
1882 ForeignKey {
1883 foreign_table,
1884 referred_columns,
1885 } => {
1886 f.write_str("REFERENCES ");
1887 f.write_node(foreign_table);
1888 f.write_str(" (");
1889 f.write_node(&display::comma_separated(referred_columns));
1890 f.write_str(")");
1891 }
1892 Check(expr) => {
1893 f.write_str("CHECK (");
1894 f.write_node(expr);
1895 f.write_str(")");
1896 }
1897 Versioned { action, version } => {
1898 f.write_str("VERSION ");
1899 f.write_node(action);
1900 f.write_str(" ");
1901 f.write_node(version);
1902 }
1903 }
1904 }
1905}
1906impl_display_t!(ColumnOption);
1907
/// The action recorded by a `VERSION` column option
/// ([`ColumnOption::Versioned`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Rendered as `ADDED`.
    Added,
}
1912
1913impl AstDisplay for ColumnVersioned {
1914 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1915 where
1916 W: fmt::Write,
1917 {
1918 match self {
1919 ColumnVersioned::Added => f.write_str("ADDED"),
1921 }
1922 }
1923}
1924impl_display!(ColumnVersioned);
1925
1926fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1927 struct ConstraintName<'a>(&'a Option<Ident>);
1928 impl<'a> AstDisplay for ConstraintName<'a> {
1929 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1930 where
1931 W: fmt::Write,
1932 {
1933 if let Some(name) = self.0 {
1934 f.write_str("CONSTRAINT ");
1935 f.write_node(name);
1936 f.write_str(" ");
1937 }
1938 }
1939 }
1940 ConstraintName(name)
1941}