1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Schema {
77 pub schema: String,
78}
79
80impl AstDisplay for Schema {
81 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
82 f.write_str("SCHEMA '");
83 f.write_node(&display::escape_single_quote_string(&self.schema));
84 f.write_str("'");
85 }
86}
87impl_display!(Schema);
88
89#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90pub enum AvroSchemaOptionName {
91 ConfluentWireFormat,
93}
94
95impl AstDisplay for AvroSchemaOptionName {
96 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
97 match self {
98 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
99 }
100 }
101}
102
103impl WithOptionName for AvroSchemaOptionName {
104 fn redact_value(&self) -> bool {
110 match self {
111 Self::ConfluentWireFormat => false,
112 }
113 }
114}
115
116#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub struct AvroSchemaOption<T: AstInfo> {
118 pub name: AvroSchemaOptionName,
119 pub value: Option<WithOptionValue<T>>,
120}
121impl_display_for_with_option!(AvroSchemaOption);
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub enum AvroSchema<T: AstInfo> {
125 Csr {
126 csr_connection: CsrConnectionAvro<T>,
127 },
128 InlineSchema {
129 schema: Schema,
130 with_options: Vec<AvroSchemaOption<T>>,
131 },
132}
133
134impl<T: AstInfo> AstDisplay for AvroSchema<T> {
135 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
136 match self {
137 Self::Csr { csr_connection } => {
138 f.write_node(csr_connection);
139 }
140 Self::InlineSchema {
141 schema,
142 with_options,
143 } => {
144 f.write_str("USING ");
145 schema.fmt(f);
146 if !with_options.is_empty() {
147 f.write_str(" (");
148 f.write_node(&display::comma_separated(with_options));
149 f.write_str(")");
150 }
151 }
152 }
153 }
154}
155impl_display_t!(AvroSchema);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum ProtobufSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionProtobuf<T>,
161 },
162 InlineSchema {
163 message_name: String,
164 schema: Schema,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 message_name,
176 schema,
177 } => {
178 f.write_str("MESSAGE '");
179 f.write_node(&display::escape_single_quote_string(message_name));
180 f.write_str("' USING ");
181 f.write_str(schema);
182 }
183 }
184 }
185}
186impl_display_t!(ProtobufSchema);
187
188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
189pub enum CsrConfigOptionName<T: AstInfo> {
190 AvroKeyFullname,
191 AvroValueFullname,
192 NullDefaults,
193 AvroDocOn(AvroDocOn<T>),
194 KeyCompatibilityLevel,
195 ValueCompatibilityLevel,
196}
197
198impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
199 fn redact_value(&self) -> bool {
205 match self {
206 Self::AvroKeyFullname
207 | Self::AvroValueFullname
208 | Self::NullDefaults
209 | Self::AvroDocOn(_)
210 | Self::KeyCompatibilityLevel
211 | Self::ValueCompatibilityLevel => false,
212 }
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
217pub struct AvroDocOn<T: AstInfo> {
218 pub identifier: DocOnIdentifier<T>,
219 pub for_schema: DocOnSchema,
220}
221#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
222pub enum DocOnSchema {
223 KeyOnly,
224 ValueOnly,
225 All,
226}
227
228#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
229pub enum DocOnIdentifier<T: AstInfo> {
230 Column(ColumnName<T>),
231 Type(T::ItemName),
232}
233
234impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
235 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
236 match &self.for_schema {
237 DocOnSchema::KeyOnly => f.write_str("KEY "),
238 DocOnSchema::ValueOnly => f.write_str("VALUE "),
239 DocOnSchema::All => {}
240 }
241 match &self.identifier {
242 DocOnIdentifier::Column(name) => {
243 f.write_str("DOC ON COLUMN ");
244 f.write_node(name);
245 }
246 DocOnIdentifier::Type(name) => {
247 f.write_str("DOC ON TYPE ");
248 f.write_node(name);
249 }
250 }
251 }
252}
253impl_display_t!(AvroDocOn);
254
255impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
256 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
257 match self {
258 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
259 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
260 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
261 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
262 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
263 CsrConfigOptionName::ValueCompatibilityLevel => {
264 f.write_str("VALUE COMPATIBILITY LEVEL")
265 }
266 }
267 }
268}
269impl_display_t!(CsrConfigOptionName);
270
271#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
272pub struct CsrConfigOption<T: AstInfo> {
274 pub name: CsrConfigOptionName<T>,
275 pub value: Option<WithOptionValue<T>>,
276}
277impl_display_for_with_option!(CsrConfigOption);
278impl_display_t!(CsrConfigOption);
279
280#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct CsrConnection<T: AstInfo> {
282 pub connection: T::ItemName,
283 pub options: Vec<CsrConfigOption<T>>,
284}
285
286impl<T: AstInfo> AstDisplay for CsrConnection<T> {
287 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
288 f.write_str("CONNECTION ");
289 f.write_node(&self.connection);
290 if !self.options.is_empty() {
291 f.write_str(" (");
292 f.write_node(&display::comma_separated(&self.options));
293 f.write_str(")");
294 }
295 }
296}
297impl_display_t!(CsrConnection);
298
299#[derive(Debug, Clone, PartialEq, Eq, Hash)]
300pub enum ReaderSchemaSelectionStrategy {
301 Latest,
302 Inline(String),
303 ById(i32),
304}
305
306impl Default for ReaderSchemaSelectionStrategy {
307 fn default() -> Self {
308 Self::Latest
309 }
310}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub struct CsrConnectionAvro<T: AstInfo> {
314 pub connection: CsrConnection<T>,
315 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
316 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
317 pub seed: Option<CsrSeedAvro>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
323 f.write_node(&self.connection);
324 if let Some(seed) = &self.seed {
325 f.write_str(" ");
326 f.write_node(seed);
327 }
328 }
329}
330impl_display_t!(CsrConnectionAvro);
331
332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
333pub struct CsrConnectionProtobuf<T: AstInfo> {
334 pub connection: CsrConnection<T>,
335 pub seed: Option<CsrSeedProtobuf>,
336}
337
338impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
339 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
340 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
341 f.write_node(&self.connection);
342
343 if let Some(seed) = &self.seed {
344 f.write_str(" ");
345 f.write_node(seed);
346 }
347 }
348}
349impl_display_t!(CsrConnectionProtobuf);
350
351#[derive(Debug, Clone, PartialEq, Eq, Hash)]
352pub struct CsrSeedAvro {
353 pub key_schema: Option<String>,
354 pub value_schema: String,
355 pub key_reference_schemas: Vec<String>,
358 pub value_reference_schemas: Vec<String>,
361}
362
363impl AstDisplay for CsrSeedAvro {
364 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
365 f.write_str("SEED");
366 if let Some(key_schema) = &self.key_schema {
367 f.write_str(" KEY SCHEMA '");
368 f.write_node(&display::escape_single_quote_string(key_schema));
369 f.write_str("'");
370 if !self.key_reference_schemas.is_empty() {
371 f.write_str(" KEY REFERENCES (");
372 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
373 if i > 0 {
374 f.write_str(", ");
375 }
376 f.write_str("'");
377 f.write_node(&display::escape_single_quote_string(schema));
378 f.write_str("'");
379 }
380 f.write_str(")");
381 }
382 }
383 f.write_str(" VALUE SCHEMA '");
384 f.write_node(&display::escape_single_quote_string(&self.value_schema));
385 f.write_str("'");
386 if !self.value_reference_schemas.is_empty() {
387 f.write_str(" VALUE REFERENCES (");
388 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
389 if i > 0 {
390 f.write_str(", ");
391 }
392 f.write_str("'");
393 f.write_node(&display::escape_single_quote_string(schema));
394 f.write_str("'");
395 }
396 f.write_str(")");
397 }
398 }
399}
400impl_display!(CsrSeedAvro);
401
402#[derive(Debug, Clone, PartialEq, Eq, Hash)]
403pub struct CsrSeedProtobuf {
404 pub key: Option<CsrSeedProtobufSchema>,
405 pub value: CsrSeedProtobufSchema,
406}
407
408impl AstDisplay for CsrSeedProtobuf {
409 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
410 f.write_str("SEED");
411 if let Some(key) = &self.key {
412 f.write_str(" KEY ");
413 f.write_node(key);
414 }
415 f.write_str(" VALUE ");
416 f.write_node(&self.value);
417 }
418}
419impl_display!(CsrSeedProtobuf);
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422pub struct CsrSeedProtobufSchema {
423 pub schema: String,
425 pub message_name: String,
426}
427impl AstDisplay for CsrSeedProtobufSchema {
428 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
429 f.write_str("SCHEMA '");
430 f.write_str(&display::escape_single_quote_string(&self.schema));
431 f.write_str("' MESSAGE '");
432 f.write_str(&self.message_name);
433 f.write_str("'");
434 }
435}
436impl_display!(CsrSeedProtobufSchema);
437
438#[derive(Debug, Clone, PartialEq, Eq, Hash)]
439pub enum FormatSpecifier<T: AstInfo> {
440 Bare(Format<T>),
442 KeyValue { key: Format<T>, value: Format<T> },
444}
445
446impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
447 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
448 match self {
449 FormatSpecifier::Bare(format) => {
450 f.write_str("FORMAT ");
451 f.write_node(format)
452 }
453 FormatSpecifier::KeyValue { key, value } => {
454 f.write_str("KEY FORMAT ");
455 f.write_node(key);
456 f.write_str(" VALUE FORMAT ");
457 f.write_node(value);
458 }
459 }
460 }
461}
462impl_display_t!(FormatSpecifier);
463
464#[derive(Debug, Clone, PartialEq, Eq, Hash)]
465pub enum Format<T: AstInfo> {
466 Bytes,
467 Avro(AvroSchema<T>),
468 Protobuf(ProtobufSchema<T>),
469 Regex(String),
470 Csv {
471 columns: CsvColumns,
472 delimiter: char,
473 },
474 Json {
475 array: bool,
476 },
477 Text,
478}
479
480#[derive(Debug, Clone, PartialEq, Eq, Hash)]
481pub enum CsvColumns {
482 Count(u64),
484 Header { names: Vec<Ident> },
486}
487
488impl AstDisplay for CsvColumns {
489 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
490 match self {
491 CsvColumns::Count(n) => {
492 f.write_str(n);
493 f.write_str(" COLUMNS")
494 }
495 CsvColumns::Header { names } => {
496 f.write_str("HEADER");
497 if !names.is_empty() {
498 f.write_str(" (");
499 f.write_node(&display::comma_separated(names));
500 f.write_str(")");
501 }
502 }
503 }
504 }
505}
506
507#[derive(Debug, Clone, PartialEq, Eq, Hash)]
508pub enum SourceIncludeMetadata {
509 Key {
510 alias: Option<Ident>,
511 },
512 Timestamp {
513 alias: Option<Ident>,
514 },
515 Partition {
516 alias: Option<Ident>,
517 },
518 Offset {
519 alias: Option<Ident>,
520 },
521 Headers {
522 alias: Option<Ident>,
523 },
524 Header {
525 key: String,
526 alias: Ident,
527 use_bytes: bool,
528 },
529}
530
531impl AstDisplay for SourceIncludeMetadata {
532 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
533 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
534 if let Some(alias) = alias {
535 f.write_str(" AS ");
536 f.write_node(alias);
537 }
538 };
539
540 match self {
541 SourceIncludeMetadata::Key { alias } => {
542 f.write_str("KEY");
543 print_alias(f, alias);
544 }
545 SourceIncludeMetadata::Timestamp { alias } => {
546 f.write_str("TIMESTAMP");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Partition { alias } => {
550 f.write_str("PARTITION");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Offset { alias } => {
554 f.write_str("OFFSET");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Headers { alias } => {
558 f.write_str("HEADERS");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Header {
562 alias,
563 key,
564 use_bytes,
565 } => {
566 f.write_str("HEADER '");
567 f.write_str(&display::escape_single_quote_string(key));
568 f.write_str("'");
569 print_alias(f, &Some(alias.clone()));
570 if *use_bytes {
571 f.write_str(" BYTES");
572 }
573 }
574 }
575 }
576}
577impl_display!(SourceIncludeMetadata);
578
579#[derive(Debug, Clone, PartialEq, Eq, Hash)]
580pub enum SourceErrorPolicy {
581 Inline {
582 alias: Option<Ident>,
584 },
585}
586
587impl AstDisplay for SourceErrorPolicy {
588 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
589 match self {
590 Self::Inline { alias } => {
591 f.write_str("INLINE");
592 if let Some(alias) = alias {
593 f.write_str(" AS ");
594 f.write_node(alias);
595 }
596 }
597 }
598 }
599}
600impl_display!(SourceErrorPolicy);
601
602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
603pub enum SourceEnvelope {
604 None,
605 Debezium,
606 Upsert {
607 value_decode_err_policy: Vec<SourceErrorPolicy>,
608 },
609 CdcV2,
610}
611
612impl SourceEnvelope {
613 pub fn requires_all_input(&self) -> bool {
616 match self {
617 SourceEnvelope::None => false,
618 SourceEnvelope::Debezium => false,
619 SourceEnvelope::Upsert { .. } => false,
620 SourceEnvelope::CdcV2 => true,
621 }
622 }
623}
624
625impl AstDisplay for SourceEnvelope {
626 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
627 match self {
628 Self::None => {
629 f.write_str("NONE");
631 }
632 Self::Debezium => {
633 f.write_str("DEBEZIUM");
634 }
635 Self::Upsert {
636 value_decode_err_policy,
637 } => {
638 if value_decode_err_policy.is_empty() {
639 f.write_str("UPSERT");
640 } else {
641 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
642 f.write_node(&display::comma_separated(value_decode_err_policy));
643 f.write_str("))")
644 }
645 }
646 Self::CdcV2 => {
647 f.write_str("MATERIALIZE");
648 }
649 }
650 }
651}
652impl_display!(SourceEnvelope);
653
654#[derive(Debug, Clone, PartialEq, Eq, Hash)]
655pub enum SinkEnvelope {
656 Debezium,
657 Upsert,
658}
659
660impl AstDisplay for SinkEnvelope {
661 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
662 match self {
663 Self::Upsert => {
664 f.write_str("UPSERT");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 }
670 }
671}
672impl_display!(SinkEnvelope);
673
674#[derive(Debug, Clone, PartialEq, Eq, Hash)]
675pub enum IcebergSinkMode {
676 Upsert,
677 Append,
678}
679
680impl AstDisplay for IcebergSinkMode {
681 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
682 match self {
683 Self::Upsert => {
684 f.write_str("UPSERT");
685 }
686 Self::Append => {
687 f.write_str("APPEND");
688 }
689 }
690 }
691}
692impl_display!(IcebergSinkMode);
693
694#[derive(Debug, Clone, PartialEq, Eq, Hash)]
695pub enum SubscribeOutput<T: AstInfo> {
696 Diffs,
697 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
698 EnvelopeUpsert { key_columns: Vec<Ident> },
699 EnvelopeDebezium { key_columns: Vec<Ident> },
700}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704 match self {
705 Self::Diffs => {}
706 Self::WithinTimestampOrderBy { order_by } => {
707 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708 f.write_node(&display::comma_separated(order_by));
709 }
710 Self::EnvelopeUpsert { key_columns } => {
711 f.write_str(" ENVELOPE UPSERT (KEY (");
712 f.write_node(&display::comma_separated(key_columns));
713 f.write_str("))");
714 }
715 Self::EnvelopeDebezium { key_columns } => {
716 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717 f.write_node(&display::comma_separated(key_columns));
718 f.write_str("))");
719 }
720 }
721 }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727 match self {
728 Self::Bytes => f.write_str("BYTES"),
729 Self::Avro(inner) => {
730 f.write_str("AVRO ");
731 f.write_node(inner);
732 }
733 Self::Protobuf(inner) => {
734 f.write_str("PROTOBUF ");
735 f.write_node(inner);
736 }
737 Self::Regex(regex) => {
738 f.write_str("REGEX '");
739 f.write_node(&display::escape_single_quote_string(regex));
740 f.write_str("'");
741 }
742 Self::Csv { columns, delimiter } => {
743 f.write_str("CSV WITH ");
744 f.write_node(columns);
745
746 if *delimiter != ',' {
747 f.write_str(" DELIMITED BY '");
748 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749 f.write_str("'");
750 }
751 }
752 Self::Json { array } => {
753 f.write_str("JSON");
754 if *array {
755 f.write_str(" ARRAY");
756 }
757 }
758 Self::Text => f.write_str("TEXT"),
759 }
760 }
761}
762impl_display_t!(Format);
763
764#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
768pub enum ConnectionOptionName {
769 AccessKeyId,
770 AssumeRoleArn,
771 AssumeRoleSessionName,
772 AvailabilityZones,
773 AwsConnection,
774 AwsPrivatelink,
775 Broker,
776 Brokers,
777 Credential,
778 Database,
779 Endpoint,
780 Host,
781 Password,
782 Port,
783 ProgressTopic,
784 ProgressTopicReplicationFactor,
785 PublicKey1,
786 PublicKey2,
787 Region,
788 SaslMechanisms,
789 SaslPassword,
790 SaslUsername,
791 Scope,
792 SecretAccessKey,
793 SecurityProtocol,
794 ServiceName,
795 SshTunnel,
796 SslCertificate,
797 SslCertificateAuthority,
798 SslKey,
799 SslMode,
800 SessionToken,
801 CatalogType,
802 Url,
803 User,
804 Warehouse,
805}
806
807impl AstDisplay for ConnectionOptionName {
808 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
809 f.write_str(match self {
810 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
811 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
812 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
813 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
814 ConnectionOptionName::Broker => "BROKER",
815 ConnectionOptionName::Brokers => "BROKERS",
816 ConnectionOptionName::Credential => "CREDENTIAL",
817 ConnectionOptionName::Database => "DATABASE",
818 ConnectionOptionName::Endpoint => "ENDPOINT",
819 ConnectionOptionName::Host => "HOST",
820 ConnectionOptionName::Password => "PASSWORD",
821 ConnectionOptionName::Port => "PORT",
822 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
823 ConnectionOptionName::ProgressTopicReplicationFactor => {
824 "PROGRESS TOPIC REPLICATION FACTOR"
825 }
826 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
827 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
828 ConnectionOptionName::Region => "REGION",
829 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
830 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
831 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
832 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
833 ConnectionOptionName::SaslUsername => "SASL USERNAME",
834 ConnectionOptionName::Scope => "SCOPE",
835 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
836 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
837 ConnectionOptionName::ServiceName => "SERVICE NAME",
838 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
839 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
840 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
841 ConnectionOptionName::SslKey => "SSL KEY",
842 ConnectionOptionName::SslMode => "SSL MODE",
843 ConnectionOptionName::SessionToken => "SESSION TOKEN",
844 ConnectionOptionName::CatalogType => "CATALOG TYPE",
845 ConnectionOptionName::Url => "URL",
846 ConnectionOptionName::User => "USER",
847 ConnectionOptionName::Warehouse => "WAREHOUSE",
848 })
849 }
850}
851impl_display!(ConnectionOptionName);
852
853impl WithOptionName for ConnectionOptionName {
854 fn redact_value(&self) -> bool {
860 match self {
861 ConnectionOptionName::AccessKeyId
862 | ConnectionOptionName::AvailabilityZones
863 | ConnectionOptionName::AwsConnection
864 | ConnectionOptionName::AwsPrivatelink
865 | ConnectionOptionName::Broker
866 | ConnectionOptionName::Brokers
867 | ConnectionOptionName::Credential
868 | ConnectionOptionName::Database
869 | ConnectionOptionName::Endpoint
870 | ConnectionOptionName::Host
871 | ConnectionOptionName::Password
872 | ConnectionOptionName::Port
873 | ConnectionOptionName::ProgressTopic
874 | ConnectionOptionName::ProgressTopicReplicationFactor
875 | ConnectionOptionName::PublicKey1
876 | ConnectionOptionName::PublicKey2
877 | ConnectionOptionName::Region
878 | ConnectionOptionName::AssumeRoleArn
879 | ConnectionOptionName::AssumeRoleSessionName
880 | ConnectionOptionName::SaslMechanisms
881 | ConnectionOptionName::SaslPassword
882 | ConnectionOptionName::SaslUsername
883 | ConnectionOptionName::Scope
884 | ConnectionOptionName::SecurityProtocol
885 | ConnectionOptionName::SecretAccessKey
886 | ConnectionOptionName::ServiceName
887 | ConnectionOptionName::SshTunnel
888 | ConnectionOptionName::SslCertificate
889 | ConnectionOptionName::SslCertificateAuthority
890 | ConnectionOptionName::SslKey
891 | ConnectionOptionName::SslMode
892 | ConnectionOptionName::SessionToken
893 | ConnectionOptionName::CatalogType
894 | ConnectionOptionName::Url
895 | ConnectionOptionName::User
896 | ConnectionOptionName::Warehouse => false,
897 }
898 }
899}
900
901#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
902pub struct ConnectionOption<T: AstInfo> {
904 pub name: ConnectionOptionName,
905 pub value: Option<WithOptionValue<T>>,
906}
907impl_display_for_with_option!(ConnectionOption);
908impl_display_t!(ConnectionOption);
909
910#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
911pub enum CreateConnectionType {
912 Aws,
913 AwsPrivatelink,
914 Kafka,
915 Csr,
916 Postgres,
917 Ssh,
918 SqlServer,
919 MySql,
920 IcebergCatalog,
921}
922
923impl CreateConnectionType {
924 pub fn as_str(&self) -> &'static str {
925 match self {
926 Self::Kafka => "kafka",
927 Self::Csr => "confluent-schema-registry",
928 Self::Postgres => "postgres",
929 Self::Aws => "aws",
930 Self::AwsPrivatelink => "aws-privatelink",
931 Self::Ssh => "ssh-tunnel",
932 Self::MySql => "mysql",
933 Self::SqlServer => "sql-server",
934 Self::IcebergCatalog => "iceberg-catalog",
935 }
936 }
937}
938
939impl AstDisplay for CreateConnectionType {
940 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
941 match self {
942 Self::Kafka => {
943 f.write_str("KAFKA");
944 }
945 Self::Csr => {
946 f.write_str("CONFLUENT SCHEMA REGISTRY");
947 }
948 Self::Postgres => {
949 f.write_str("POSTGRES");
950 }
951 Self::Aws => {
952 f.write_str("AWS");
953 }
954 Self::AwsPrivatelink => {
955 f.write_str("AWS PRIVATELINK");
956 }
957 Self::Ssh => {
958 f.write_str("SSH TUNNEL");
959 }
960 Self::SqlServer => {
961 f.write_str("SQL SERVER");
962 }
963 Self::MySql => {
964 f.write_str("MYSQL");
965 }
966 Self::IcebergCatalog => {
967 f.write_str("ICEBERG CATALOG");
968 }
969 }
970 }
971}
972impl_display!(CreateConnectionType);
973
974#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
975pub enum CreateConnectionOptionName {
976 Validate,
977}
978
979impl AstDisplay for CreateConnectionOptionName {
980 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
981 f.write_str(match self {
982 CreateConnectionOptionName::Validate => "VALIDATE",
983 })
984 }
985}
986impl_display!(CreateConnectionOptionName);
987
988impl WithOptionName for CreateConnectionOptionName {
989 fn redact_value(&self) -> bool {
995 match self {
996 CreateConnectionOptionName::Validate => false,
997 }
998 }
999}
1000
1001#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1002pub struct CreateConnectionOption<T: AstInfo> {
1004 pub name: CreateConnectionOptionName,
1005 pub value: Option<WithOptionValue<T>>,
1006}
1007impl_display_for_with_option!(CreateConnectionOption);
1008impl_display_t!(CreateConnectionOption);
1009
1010#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1011pub enum KafkaSourceConfigOptionName {
1012 GroupIdPrefix,
1013 Topic,
1014 TopicMetadataRefreshInterval,
1015 StartTimestamp,
1016 StartOffset,
1017}
1018
1019impl AstDisplay for KafkaSourceConfigOptionName {
1020 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1021 f.write_str(match self {
1022 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1023 KafkaSourceConfigOptionName::Topic => "TOPIC",
1024 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1025 "TOPIC METADATA REFRESH INTERVAL"
1026 }
1027 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1028 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1029 })
1030 }
1031}
1032impl_display!(KafkaSourceConfigOptionName);
1033
1034impl WithOptionName for KafkaSourceConfigOptionName {
1035 fn redact_value(&self) -> bool {
1041 match self {
1042 KafkaSourceConfigOptionName::GroupIdPrefix
1043 | KafkaSourceConfigOptionName::Topic
1044 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1045 | KafkaSourceConfigOptionName::StartOffset
1046 | KafkaSourceConfigOptionName::StartTimestamp => false,
1047 }
1048 }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1052pub struct KafkaSourceConfigOption<T: AstInfo> {
1053 pub name: KafkaSourceConfigOptionName,
1054 pub value: Option<WithOptionValue<T>>,
1055}
1056impl_display_for_with_option!(KafkaSourceConfigOption);
1057impl_display_t!(KafkaSourceConfigOption);
1058
1059#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1060pub enum KafkaSinkConfigOptionName {
1061 CompressionType,
1062 PartitionBy,
1063 ProgressGroupIdPrefix,
1064 Topic,
1065 TransactionalIdPrefix,
1066 LegacyIds,
1067 TopicConfig,
1068 TopicMetadataRefreshInterval,
1069 TopicPartitionCount,
1070 TopicReplicationFactor,
1071}
1072
1073impl AstDisplay for KafkaSinkConfigOptionName {
1074 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1075 f.write_str(match self {
1076 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1077 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1078 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1079 KafkaSinkConfigOptionName::Topic => "TOPIC",
1080 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1081 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1082 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1083 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1084 "TOPIC METADATA REFRESH INTERVAL"
1085 }
1086 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1087 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1088 })
1089 }
1090}
1091impl_display!(KafkaSinkConfigOptionName);
1092
1093impl WithOptionName for KafkaSinkConfigOptionName {
1094 fn redact_value(&self) -> bool {
1100 match self {
1101 KafkaSinkConfigOptionName::CompressionType
1102 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1103 | KafkaSinkConfigOptionName::Topic
1104 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1105 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1106 | KafkaSinkConfigOptionName::LegacyIds
1107 | KafkaSinkConfigOptionName::TopicConfig
1108 | KafkaSinkConfigOptionName::TopicPartitionCount
1109 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1110 KafkaSinkConfigOptionName::PartitionBy => true,
1111 }
1112 }
1113}
1114
1115#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1116pub struct KafkaSinkConfigOption<T: AstInfo> {
1117 pub name: KafkaSinkConfigOptionName,
1118 pub value: Option<WithOptionValue<T>>,
1119}
1120impl_display_for_with_option!(KafkaSinkConfigOption);
1121impl_display_t!(KafkaSinkConfigOption);
1122
1123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1124pub enum IcebergSinkConfigOptionName {
1125 Namespace,
1126 Table,
1127}
1128
1129impl AstDisplay for IcebergSinkConfigOptionName {
1130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1131 f.write_str(match self {
1132 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1133 IcebergSinkConfigOptionName::Table => "TABLE",
1134 })
1135 }
1136}
1137impl_display!(IcebergSinkConfigOptionName);
1138
1139impl WithOptionName for IcebergSinkConfigOptionName {
1140 fn redact_value(&self) -> bool {
1146 match self {
1147 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1148 }
1149 }
1150}
1151
1152#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1153pub struct IcebergSinkConfigOption<T: AstInfo> {
1154 pub name: IcebergSinkConfigOptionName,
1155 pub value: Option<WithOptionValue<T>>,
1156}
1157impl_display_for_with_option!(IcebergSinkConfigOption);
1158impl_display_t!(IcebergSinkConfigOption);
1159
1160#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1161pub enum PgConfigOptionName {
1162 Details,
1165 Publication,
1167 TextColumns,
1174 ExcludeColumns,
1181}
1182
1183impl AstDisplay for PgConfigOptionName {
1184 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1185 f.write_str(match self {
1186 PgConfigOptionName::Details => "DETAILS",
1187 PgConfigOptionName::Publication => "PUBLICATION",
1188 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1189 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1190 })
1191 }
1192}
1193impl_display!(PgConfigOptionName);
1194
1195impl WithOptionName for PgConfigOptionName {
1196 fn redact_value(&self) -> bool {
1202 match self {
1203 PgConfigOptionName::Details
1204 | PgConfigOptionName::Publication
1205 | PgConfigOptionName::TextColumns
1206 | PgConfigOptionName::ExcludeColumns => false,
1207 }
1208 }
1209}
1210
1211#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1212pub struct PgConfigOption<T: AstInfo> {
1214 pub name: PgConfigOptionName,
1215 pub value: Option<WithOptionValue<T>>,
1216}
1217impl_display_for_with_option!(PgConfigOption);
1218impl_display_t!(PgConfigOption);
1219
1220#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1221pub enum MySqlConfigOptionName {
1222 Details,
1225 TextColumns,
1232 ExcludeColumns,
1239}
1240
1241impl AstDisplay for MySqlConfigOptionName {
1242 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1243 f.write_str(match self {
1244 MySqlConfigOptionName::Details => "DETAILS",
1245 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1246 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1247 })
1248 }
1249}
1250impl_display!(MySqlConfigOptionName);
1251
1252impl WithOptionName for MySqlConfigOptionName {
1253 fn redact_value(&self) -> bool {
1259 match self {
1260 MySqlConfigOptionName::Details
1261 | MySqlConfigOptionName::TextColumns
1262 | MySqlConfigOptionName::ExcludeColumns => false,
1263 }
1264 }
1265}
1266
1267#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1268pub struct MySqlConfigOption<T: AstInfo> {
1270 pub name: MySqlConfigOptionName,
1271 pub value: Option<WithOptionValue<T>>,
1272}
1273impl_display_for_with_option!(MySqlConfigOption);
1274impl_display_t!(MySqlConfigOption);
1275
1276#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1277pub enum SqlServerConfigOptionName {
1278 Details,
1281 TextColumns,
1289 ExcludeColumns,
1297}
1298
1299impl AstDisplay for SqlServerConfigOptionName {
1300 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1301 f.write_str(match self {
1302 SqlServerConfigOptionName::Details => "DETAILS",
1303 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1304 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1305 })
1306 }
1307}
1308impl_display!(SqlServerConfigOptionName);
1309
1310impl WithOptionName for SqlServerConfigOptionName {
1311 fn redact_value(&self) -> bool {
1317 match self {
1318 SqlServerConfigOptionName::Details
1319 | SqlServerConfigOptionName::TextColumns
1320 | SqlServerConfigOptionName::ExcludeColumns => false,
1321 }
1322 }
1323}
1324
1325#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1326pub struct SqlServerConfigOption<T: AstInfo> {
1328 pub name: SqlServerConfigOptionName,
1329 pub value: Option<WithOptionValue<T>>,
1330}
1331impl_display_for_with_option!(SqlServerConfigOption);
1332impl_display_t!(SqlServerConfigOption);
1333
1334#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1335pub enum CreateSourceConnection<T: AstInfo> {
1336 Kafka {
1337 connection: T::ItemName,
1338 options: Vec<KafkaSourceConfigOption<T>>,
1339 },
1340 Postgres {
1341 connection: T::ItemName,
1342 options: Vec<PgConfigOption<T>>,
1343 },
1344 SqlServer {
1345 connection: T::ItemName,
1346 options: Vec<SqlServerConfigOption<T>>,
1347 },
1348 MySql {
1349 connection: T::ItemName,
1350 options: Vec<MySqlConfigOption<T>>,
1351 },
1352 LoadGenerator {
1353 generator: LoadGenerator,
1354 options: Vec<LoadGeneratorOption<T>>,
1355 },
1356}
1357
1358impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1359 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1360 match self {
1361 CreateSourceConnection::Kafka {
1362 connection,
1363 options,
1364 } => {
1365 f.write_str("KAFKA CONNECTION ");
1366 f.write_node(connection);
1367 if !options.is_empty() {
1368 f.write_str(" (");
1369 f.write_node(&display::comma_separated(options));
1370 f.write_str(")");
1371 }
1372 }
1373 CreateSourceConnection::Postgres {
1374 connection,
1375 options,
1376 } => {
1377 f.write_str("POSTGRES CONNECTION ");
1378 f.write_node(connection);
1379 if !options.is_empty() {
1380 f.write_str(" (");
1381 f.write_node(&display::comma_separated(options));
1382 f.write_str(")");
1383 }
1384 }
1385 CreateSourceConnection::SqlServer {
1386 connection,
1387 options,
1388 } => {
1389 f.write_str("SQL SERVER CONNECTION ");
1390 f.write_node(connection);
1391 if !options.is_empty() {
1392 f.write_str(" (");
1393 f.write_node(&display::comma_separated(options));
1394 f.write_str(")");
1395 }
1396 }
1397 CreateSourceConnection::MySql {
1398 connection,
1399 options,
1400 } => {
1401 f.write_str("MYSQL CONNECTION ");
1402 f.write_node(connection);
1403 if !options.is_empty() {
1404 f.write_str(" (");
1405 f.write_node(&display::comma_separated(options));
1406 f.write_str(")");
1407 }
1408 }
1409 CreateSourceConnection::LoadGenerator { generator, options } => {
1410 f.write_str("LOAD GENERATOR ");
1411 f.write_node(generator);
1412 if !options.is_empty() {
1413 f.write_str(" (");
1414 f.write_node(&display::comma_separated(options));
1415 f.write_str(")");
1416 }
1417 }
1418 }
1419 }
1420}
1421impl_display_t!(CreateSourceConnection);
1422
1423#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1424pub enum LoadGenerator {
1425 Clock,
1426 Counter,
1427 Marketing,
1428 Auction,
1429 Datums,
1430 Tpch,
1431 KeyValue,
1432}
1433
1434impl AstDisplay for LoadGenerator {
1435 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1436 match self {
1437 Self::Counter => f.write_str("COUNTER"),
1438 Self::Clock => f.write_str("CLOCK"),
1439 Self::Marketing => f.write_str("MARKETING"),
1440 Self::Auction => f.write_str("AUCTION"),
1441 Self::Datums => f.write_str("DATUMS"),
1442 Self::Tpch => f.write_str("TPCH"),
1443 Self::KeyValue => f.write_str("KEY VALUE"),
1444 }
1445 }
1446}
1447impl_display!(LoadGenerator);
1448
1449impl LoadGenerator {
1450 pub fn schema_name(&self) -> &'static str {
1455 match self {
1456 LoadGenerator::Counter => "counter",
1457 LoadGenerator::Clock => "clock",
1458 LoadGenerator::Marketing => "marketing",
1459 LoadGenerator::Auction => "auction",
1460 LoadGenerator::Datums => "datums",
1461 LoadGenerator::Tpch => "tpch",
1462 LoadGenerator::KeyValue => "key_value",
1463 }
1464 }
1465}
1466
1467#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1468pub enum LoadGeneratorOptionName {
1469 ScaleFactor,
1470 TickInterval,
1471 AsOf,
1472 UpTo,
1473 MaxCardinality,
1474 Keys,
1475 SnapshotRounds,
1476 TransactionalSnapshot,
1477 ValueSize,
1478 Seed,
1479 Partitions,
1480 BatchSize,
1481}
1482
1483impl AstDisplay for LoadGeneratorOptionName {
1484 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1485 f.write_str(match self {
1486 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1487 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1488 LoadGeneratorOptionName::AsOf => "AS OF",
1489 LoadGeneratorOptionName::UpTo => "UP TO",
1490 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1491 LoadGeneratorOptionName::Keys => "KEYS",
1492 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1493 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1494 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1495 LoadGeneratorOptionName::Seed => "SEED",
1496 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1497 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1498 })
1499 }
1500}
1501impl_display!(LoadGeneratorOptionName);
1502
1503impl WithOptionName for LoadGeneratorOptionName {
1504 fn redact_value(&self) -> bool {
1510 match self {
1511 LoadGeneratorOptionName::ScaleFactor
1512 | LoadGeneratorOptionName::TickInterval
1513 | LoadGeneratorOptionName::AsOf
1514 | LoadGeneratorOptionName::UpTo
1515 | LoadGeneratorOptionName::MaxCardinality
1516 | LoadGeneratorOptionName::Keys
1517 | LoadGeneratorOptionName::SnapshotRounds
1518 | LoadGeneratorOptionName::TransactionalSnapshot
1519 | LoadGeneratorOptionName::ValueSize
1520 | LoadGeneratorOptionName::Partitions
1521 | LoadGeneratorOptionName::BatchSize
1522 | LoadGeneratorOptionName::Seed => false,
1523 }
1524 }
1525}
1526
1527#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1528pub struct LoadGeneratorOption<T: AstInfo> {
1530 pub name: LoadGeneratorOptionName,
1531 pub value: Option<WithOptionValue<T>>,
1532}
1533impl_display_for_with_option!(LoadGeneratorOption);
1534impl_display_t!(LoadGeneratorOption);
1535
1536#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1537pub enum CreateSinkConnection<T: AstInfo> {
1538 Kafka {
1539 connection: T::ItemName,
1540 options: Vec<KafkaSinkConfigOption<T>>,
1541 key: Option<SinkKey>,
1542 headers: Option<Ident>,
1543 },
1544 Iceberg {
1545 connection: T::ItemName,
1546 aws_connection: T::ItemName,
1547 key: Option<SinkKey>,
1548 options: Vec<IcebergSinkConfigOption<T>>,
1549 },
1550}
1551
1552impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1553 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1554 match self {
1555 CreateSinkConnection::Kafka {
1556 connection,
1557 options,
1558 key,
1559 headers,
1560 } => {
1561 f.write_str("KAFKA CONNECTION ");
1562 f.write_node(connection);
1563 if !options.is_empty() {
1564 f.write_str(" (");
1565 f.write_node(&display::comma_separated(options));
1566 f.write_str(")");
1567 }
1568 if let Some(key) = key.as_ref() {
1569 f.write_str(" ");
1570 f.write_node(key);
1571 }
1572 if let Some(headers) = headers {
1573 f.write_str(" HEADERS ");
1574 f.write_node(headers);
1575 }
1576 }
1577 CreateSinkConnection::Iceberg {
1578 connection,
1579 aws_connection,
1580 key,
1581 options,
1582 } => {
1583 f.write_str("ICEBERG CATALOG CONNECTION ");
1584 f.write_node(connection);
1585 if !options.is_empty() {
1586 f.write_str(" (");
1587 f.write_node(&display::comma_separated(options));
1588 f.write_str(")");
1589 }
1590 f.write_str(" USING AWS CONNECTION ");
1591 f.write_node(aws_connection);
1592 if let Some(key) = key.as_ref() {
1593 f.write_str(" ");
1594 f.write_node(key);
1595 }
1596 }
1597 }
1598 }
1599}
1600impl_display_t!(CreateSinkConnection);
1601
1602#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1603pub struct SinkKey {
1604 pub key_columns: Vec<Ident>,
1605 pub not_enforced: bool,
1606}
1607
1608impl AstDisplay for SinkKey {
1609 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1610 f.write_str("KEY (");
1611 f.write_node(&display::comma_separated(&self.key_columns));
1612 f.write_str(")");
1613 if self.not_enforced {
1614 f.write_str(" NOT ENFORCED");
1615 }
1616 }
1617}
1618
1619#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1622pub enum TableConstraint<T: AstInfo> {
1623 Unique {
1625 name: Option<Ident>,
1626 columns: Vec<Ident>,
1627 is_primary: bool,
1629 nulls_not_distinct: bool,
1632 },
1633 ForeignKey {
1636 name: Option<Ident>,
1637 columns: Vec<Ident>,
1638 foreign_table: T::ItemName,
1639 referred_columns: Vec<Ident>,
1640 },
1641 Check {
1643 name: Option<Ident>,
1644 expr: Box<Expr<T>>,
1645 },
1646}
1647
1648impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1649 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1650 match self {
1651 TableConstraint::Unique {
1652 name,
1653 columns,
1654 is_primary,
1655 nulls_not_distinct,
1656 } => {
1657 f.write_node(&display_constraint_name(name));
1658 if *is_primary {
1659 f.write_str("PRIMARY KEY ");
1660 } else {
1661 f.write_str("UNIQUE ");
1662 if *nulls_not_distinct {
1663 f.write_str("NULLS NOT DISTINCT ");
1664 }
1665 }
1666 f.write_str("(");
1667 f.write_node(&display::comma_separated(columns));
1668 f.write_str(")");
1669 }
1670 TableConstraint::ForeignKey {
1671 name,
1672 columns,
1673 foreign_table,
1674 referred_columns,
1675 } => {
1676 f.write_node(&display_constraint_name(name));
1677 f.write_str("FOREIGN KEY (");
1678 f.write_node(&display::comma_separated(columns));
1679 f.write_str(") REFERENCES ");
1680 f.write_node(foreign_table);
1681 f.write_str("(");
1682 f.write_node(&display::comma_separated(referred_columns));
1683 f.write_str(")");
1684 }
1685 TableConstraint::Check { name, expr } => {
1686 f.write_node(&display_constraint_name(name));
1687 f.write_str("CHECK (");
1688 f.write_node(&expr);
1689 f.write_str(")");
1690 }
1691 }
1692 }
1693}
1694impl_display_t!(TableConstraint);
1695
1696#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1698pub enum KeyConstraint {
1699 PrimaryKeyNotEnforced { columns: Vec<Ident> },
1701}
1702
1703impl AstDisplay for KeyConstraint {
1704 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1705 match self {
1706 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1707 f.write_str("PRIMARY KEY ");
1708 f.write_str("(");
1709 f.write_node(&display::comma_separated(columns));
1710 f.write_str(") ");
1711 f.write_str("NOT ENFORCED");
1712 }
1713 }
1714 }
1715}
1716impl_display!(KeyConstraint);
1717
1718#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1719pub enum CreateSourceOptionName {
1720 TimestampInterval,
1721 RetainHistory,
1722}
1723
1724impl AstDisplay for CreateSourceOptionName {
1725 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1726 f.write_str(match self {
1727 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1728 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1729 })
1730 }
1731}
1732impl_display!(CreateSourceOptionName);
1733
1734impl WithOptionName for CreateSourceOptionName {
1735 fn redact_value(&self) -> bool {
1741 match self {
1742 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1743 false
1744 }
1745 }
1746 }
1747}
1748
1749#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1750pub struct CreateSourceOption<T: AstInfo> {
1752 pub name: CreateSourceOptionName,
1753 pub value: Option<WithOptionValue<T>>,
1754}
1755impl_display_for_with_option!(CreateSourceOption);
1756impl_display_t!(CreateSourceOption);
1757
1758#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1760pub struct ColumnDef<T: AstInfo> {
1761 pub name: Ident,
1762 pub data_type: T::DataType,
1763 pub collation: Option<UnresolvedItemName>,
1764 pub options: Vec<ColumnOptionDef<T>>,
1765}
1766
1767impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1768 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1769 f.write_node(&self.name);
1770 f.write_str(" ");
1771 f.write_node(&self.data_type);
1772 if let Some(collation) = &self.collation {
1773 f.write_str(" COLLATE ");
1774 f.write_node(collation);
1775 }
1776 for option in &self.options {
1777 f.write_str(" ");
1778 f.write_node(option);
1779 }
1780 }
1781}
1782impl_display_t!(ColumnDef);
1783
1784#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1801pub struct ColumnOptionDef<T: AstInfo> {
1802 pub name: Option<Ident>,
1803 pub option: ColumnOption<T>,
1804}
1805
1806impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1807 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1808 f.write_node(&display_constraint_name(&self.name));
1809 f.write_node(&self.option);
1810 }
1811}
1812impl_display_t!(ColumnOptionDef);
1813
1814#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1817pub enum ColumnOption<T: AstInfo> {
1818 Null,
1820 NotNull,
1822 Default(Expr<T>),
1824 Unique { is_primary: bool },
1826 ForeignKey {
1829 foreign_table: UnresolvedItemName,
1830 referred_columns: Vec<Ident>,
1831 },
1832 Check(Expr<T>),
1834 Versioned {
1836 action: ColumnVersioned,
1837 version: Version,
1838 },
1839}
1840
1841impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1842 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1843 use ColumnOption::*;
1844 match self {
1845 Null => f.write_str("NULL"),
1846 NotNull => f.write_str("NOT NULL"),
1847 Default(expr) => {
1848 f.write_str("DEFAULT ");
1849 f.write_node(expr);
1850 }
1851 Unique { is_primary } => {
1852 if *is_primary {
1853 f.write_str("PRIMARY KEY");
1854 } else {
1855 f.write_str("UNIQUE");
1856 }
1857 }
1858 ForeignKey {
1859 foreign_table,
1860 referred_columns,
1861 } => {
1862 f.write_str("REFERENCES ");
1863 f.write_node(foreign_table);
1864 f.write_str(" (");
1865 f.write_node(&display::comma_separated(referred_columns));
1866 f.write_str(")");
1867 }
1868 Check(expr) => {
1869 f.write_str("CHECK (");
1870 f.write_node(expr);
1871 f.write_str(")");
1872 }
1873 Versioned { action, version } => {
1874 f.write_str("VERSION ");
1875 f.write_node(action);
1876 f.write_str(" ");
1877 f.write_node(version);
1878 }
1879 }
1880 }
1881}
1882impl_display_t!(ColumnOption);
1883
1884#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1885pub enum ColumnVersioned {
1886 Added,
1887}
1888
1889impl AstDisplay for ColumnVersioned {
1890 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1891 where
1892 W: fmt::Write,
1893 {
1894 match self {
1895 ColumnVersioned::Added => f.write_str("ADDED"),
1897 }
1898 }
1899}
1900impl_display!(ColumnVersioned);
1901
1902fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1903 struct ConstraintName<'a>(&'a Option<Ident>);
1904 impl<'a> AstDisplay for ConstraintName<'a> {
1905 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1906 where
1907 W: fmt::Write,
1908 {
1909 if let Some(name) = self.0 {
1910 f.write_str("CONSTRAINT ");
1911 f.write_node(name);
1912 f.write_str(" ");
1913 }
1914 }
1915 }
1916 ConstraintName(name)
1917}