1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
/// A single materialized-view option: a [`MaterializedViewOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this file section; judging by its name it supplies the
// display impl for this name/value pair.
impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77 Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
/// A single continual-task option: a [`ContinualTaskOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this file section; judging by its name it supplies the
// display impl for this name/value pair.
impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111 pub schema: String,
112}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125 ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
/// A single inline-Avro-schema option: an [`AvroSchemaOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Macro defined outside this file section; judging by its name it supplies the
// display impl for this name/value pair.
impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
/// Payload of `CsrConfigOptionName::AvroDocOn`: what is being documented and
/// for which schema(s).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc applies to.
    pub identifier: DocOnIdentifier<T>,
    /// Whether the doc is scoped to the key, the value, or both.
    pub for_schema: DocOnSchema,
}
/// Scope selector used by [`AvroDocOn`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a `KEY ` prefix.
    KeyOnly,
    /// Displayed with a `VALUE ` prefix.
    ValueOnly,
    /// Displayed with no prefix.
    All,
}
261
/// The target of an [`AvroDocOn`] option.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; displayed as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; displayed as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
/// A single schema-registry configuration option: a [`CsrConfigOptionName`]
/// plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is selected.
///
/// The default is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a schema string (presumably the schema text itself — confirm
    /// against the planner).
    Inline(String),
    /// Carries a numeric schema identifier.
    ById(i32),
}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387 pub key_schema: Option<String>,
388 pub value_schema: String,
389 pub key_reference_schemas: Vec<String>,
392 pub value_reference_schemas: Vec<String>,
395}
396
397impl AstDisplay for CsrSeedAvro {
398 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
399 f.write_str("SEED");
400 if let Some(key_schema) = &self.key_schema {
401 f.write_str(" KEY SCHEMA '");
402 f.write_node(&display::escape_single_quote_string(key_schema));
403 f.write_str("'");
404 if !self.key_reference_schemas.is_empty() {
405 f.write_str(" KEY REFERENCES (");
406 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
407 if i > 0 {
408 f.write_str(", ");
409 }
410 f.write_str("'");
411 f.write_node(&display::escape_single_quote_string(schema));
412 f.write_str("'");
413 }
414 f.write_str(")");
415 }
416 }
417 f.write_str(" VALUE SCHEMA '");
418 f.write_node(&display::escape_single_quote_string(&self.value_schema));
419 f.write_str("'");
420 if !self.value_reference_schemas.is_empty() {
421 f.write_str(" VALUE REFERENCES (");
422 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
423 if i > 0 {
424 f.write_str(", ");
425 }
426 f.write_str("'");
427 f.write_node(&display::escape_single_quote_string(schema));
428 f.write_str("'");
429 }
430 f.write_str(")");
431 }
432 }
433}
434impl_display!(CsrSeedAvro);
435
436#[derive(Debug, Clone, PartialEq, Eq, Hash)]
437pub struct CsrSeedProtobuf {
438 pub key: Option<CsrSeedProtobufSchema>,
439 pub value: CsrSeedProtobufSchema,
440}
441
442impl AstDisplay for CsrSeedProtobuf {
443 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
444 f.write_str("SEED");
445 if let Some(key) = &self.key {
446 f.write_str(" KEY ");
447 f.write_node(key);
448 }
449 f.write_str(" VALUE ");
450 f.write_node(&self.value);
451 }
452}
453impl_display!(CsrSeedProtobuf);
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash)]
456pub struct CsrSeedProtobufSchema {
457 pub schema: String,
459 pub message_name: String,
460}
461impl AstDisplay for CsrSeedProtobufSchema {
462 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
463 f.write_str("SCHEMA '");
464 f.write_str(&display::escape_single_quote_string(&self.schema));
465 f.write_str("' MESSAGE '");
466 f.write_str(&self.message_name);
467 f.write_str("'");
468 }
469}
470impl_display!(CsrSeedProtobufSchema);
471
472#[derive(Debug, Clone, PartialEq, Eq, Hash)]
473pub enum FormatSpecifier<T: AstInfo> {
474 Bare(Format<T>),
476 KeyValue { key: Format<T>, value: Format<T> },
478}
479
480impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
481 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
482 match self {
483 FormatSpecifier::Bare(format) => {
484 f.write_str("FORMAT ");
485 f.write_node(format)
486 }
487 FormatSpecifier::KeyValue { key, value } => {
488 f.write_str("KEY FORMAT ");
489 f.write_node(key);
490 f.write_str(" VALUE FORMAT ");
491 f.write_node(value);
492 }
493 }
494 }
495}
496impl_display_t!(FormatSpecifier);
497
/// A data format, as it appears after `FORMAT` in a [`FormatSpecifier`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    Bytes,
    /// Avro, with the source of its schema.
    Avro(AvroSchema<T>),
    /// Protobuf, with the source of its schema.
    Protobuf(ProtobufSchema<T>),
    /// A regular expression, held as its pattern string.
    Regex(String),
    /// CSV with a column specification and a delimiter character.
    Csv {
        columns: CsvColumns,
        delimiter: char,
    },
    /// JSON; `array` selects the `JSON ARRAY` form when displayed.
    Json {
        array: bool,
    },
    Text,
}
513
514#[derive(Debug, Clone, PartialEq, Eq, Hash)]
515pub enum CsvColumns {
516 Count(u64),
518 Header { names: Vec<Ident> },
520}
521
522impl AstDisplay for CsvColumns {
523 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
524 match self {
525 CsvColumns::Count(n) => {
526 f.write_str(n);
527 f.write_str(" COLUMNS")
528 }
529 CsvColumns::Header { names } => {
530 f.write_str("HEADER");
531 if !names.is_empty() {
532 f.write_str(" (");
533 f.write_node(&display::comma_separated(names));
534 f.write_str(")");
535 }
536 }
537 }
538 }
539}
540
541#[derive(Debug, Clone, PartialEq, Eq, Hash)]
542pub enum SourceIncludeMetadata {
543 Key {
544 alias: Option<Ident>,
545 },
546 Timestamp {
547 alias: Option<Ident>,
548 },
549 Partition {
550 alias: Option<Ident>,
551 },
552 Offset {
553 alias: Option<Ident>,
554 },
555 Headers {
556 alias: Option<Ident>,
557 },
558 Header {
559 key: String,
560 alias: Ident,
561 use_bytes: bool,
562 },
563}
564
565impl AstDisplay for SourceIncludeMetadata {
566 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
567 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
568 if let Some(alias) = alias {
569 f.write_str(" AS ");
570 f.write_node(alias);
571 }
572 };
573
574 match self {
575 SourceIncludeMetadata::Key { alias } => {
576 f.write_str("KEY");
577 print_alias(f, alias);
578 }
579 SourceIncludeMetadata::Timestamp { alias } => {
580 f.write_str("TIMESTAMP");
581 print_alias(f, alias);
582 }
583 SourceIncludeMetadata::Partition { alias } => {
584 f.write_str("PARTITION");
585 print_alias(f, alias);
586 }
587 SourceIncludeMetadata::Offset { alias } => {
588 f.write_str("OFFSET");
589 print_alias(f, alias);
590 }
591 SourceIncludeMetadata::Headers { alias } => {
592 f.write_str("HEADERS");
593 print_alias(f, alias);
594 }
595 SourceIncludeMetadata::Header {
596 alias,
597 key,
598 use_bytes,
599 } => {
600 f.write_str("HEADER '");
601 f.write_str(&display::escape_single_quote_string(key));
602 f.write_str("'");
603 print_alias(f, &Some(alias.clone()));
604 if *use_bytes {
605 f.write_str(" BYTES");
606 }
607 }
608 }
609 }
610}
611impl_display!(SourceIncludeMetadata);
612
613#[derive(Debug, Clone, PartialEq, Eq, Hash)]
614pub enum SourceErrorPolicy {
615 Inline {
616 alias: Option<Ident>,
618 },
619}
620
621impl AstDisplay for SourceErrorPolicy {
622 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
623 match self {
624 Self::Inline { alias } => {
625 f.write_str("INLINE");
626 if let Some(alias) = alias {
627 f.write_str(" AS ");
628 f.write_node(alias);
629 }
630 }
631 }
632 }
633}
634impl_display!(SourceErrorPolicy);
635
636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637pub enum SourceEnvelope {
638 None,
639 Debezium,
640 Upsert {
641 value_decode_err_policy: Vec<SourceErrorPolicy>,
642 },
643 CdcV2,
644}
645
646impl SourceEnvelope {
647 pub fn requires_all_input(&self) -> bool {
650 match self {
651 SourceEnvelope::None => false,
652 SourceEnvelope::Debezium => false,
653 SourceEnvelope::Upsert { .. } => false,
654 SourceEnvelope::CdcV2 => true,
655 }
656 }
657}
658
659impl AstDisplay for SourceEnvelope {
660 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
661 match self {
662 Self::None => {
663 f.write_str("NONE");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 Self::Upsert {
670 value_decode_err_policy,
671 } => {
672 if value_decode_err_policy.is_empty() {
673 f.write_str("UPSERT");
674 } else {
675 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
676 f.write_node(&display::comma_separated(value_decode_err_policy));
677 f.write_str("))")
678 }
679 }
680 Self::CdcV2 => {
681 f.write_str("MATERIALIZE");
682 }
683 }
684 }
685}
686impl_display!(SourceEnvelope);
687
688#[derive(Debug, Clone, PartialEq, Eq, Hash)]
689pub enum SinkEnvelope {
690 Debezium,
691 Upsert,
692}
693
694impl AstDisplay for SinkEnvelope {
695 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
696 match self {
697 Self::Upsert => {
698 f.write_str("UPSERT");
699 }
700 Self::Debezium => {
701 f.write_str("DEBEZIUM");
702 }
703 }
704 }
705}
706impl_display!(SinkEnvelope);
707
708#[derive(Debug, Clone, PartialEq, Eq, Hash)]
709pub enum IcebergSinkMode {
710 Upsert,
711}
712
713impl AstDisplay for IcebergSinkMode {
714 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
715 match self {
716 Self::Upsert => {
717 f.write_str("UPSERT");
718 }
719 }
720 }
721}
722impl_display!(IcebergSinkMode);
723
724#[derive(Debug, Clone, PartialEq, Eq, Hash)]
725pub enum SubscribeOutput<T: AstInfo> {
726 Diffs,
727 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
728 EnvelopeUpsert { key_columns: Vec<Ident> },
729 EnvelopeDebezium { key_columns: Vec<Ident> },
730}
731
732impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
733 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
734 match self {
735 Self::Diffs => {}
736 Self::WithinTimestampOrderBy { order_by } => {
737 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
738 f.write_node(&display::comma_separated(order_by));
739 }
740 Self::EnvelopeUpsert { key_columns } => {
741 f.write_str(" ENVELOPE UPSERT (KEY (");
742 f.write_node(&display::comma_separated(key_columns));
743 f.write_str("))");
744 }
745 Self::EnvelopeDebezium { key_columns } => {
746 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
747 f.write_node(&display::comma_separated(key_columns));
748 f.write_str("))");
749 }
750 }
751 }
752}
753impl_display_t!(SubscribeOutput);
754
755impl<T: AstInfo> AstDisplay for Format<T> {
756 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
757 match self {
758 Self::Bytes => f.write_str("BYTES"),
759 Self::Avro(inner) => {
760 f.write_str("AVRO ");
761 f.write_node(inner);
762 }
763 Self::Protobuf(inner) => {
764 f.write_str("PROTOBUF ");
765 f.write_node(inner);
766 }
767 Self::Regex(regex) => {
768 f.write_str("REGEX '");
769 f.write_node(&display::escape_single_quote_string(regex));
770 f.write_str("'");
771 }
772 Self::Csv { columns, delimiter } => {
773 f.write_str("CSV WITH ");
774 f.write_node(columns);
775
776 if *delimiter != ',' {
777 f.write_str(" DELIMITED BY '");
778 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
779 f.write_str("'");
780 }
781 }
782 Self::Json { array } => {
783 f.write_str("JSON");
784 if *array {
785 f.write_str(" ARRAY");
786 }
787 }
788 Self::Text => f.write_str("TEXT"),
789 }
790 }
791}
792impl_display_t!(Format);
793
794#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
798pub enum ConnectionOptionName {
799 AccessKeyId,
800 AssumeRoleArn,
801 AssumeRoleSessionName,
802 AvailabilityZones,
803 AwsConnection,
804 AwsPrivatelink,
805 Broker,
806 Brokers,
807 Credential,
808 Database,
809 Endpoint,
810 Host,
811 Password,
812 Port,
813 ProgressTopic,
814 ProgressTopicReplicationFactor,
815 PublicKey1,
816 PublicKey2,
817 Region,
818 SaslMechanisms,
819 SaslPassword,
820 SaslUsername,
821 Scope,
822 SecretAccessKey,
823 SecurityProtocol,
824 ServiceName,
825 SshTunnel,
826 SslCertificate,
827 SslCertificateAuthority,
828 SslKey,
829 SslMode,
830 SessionToken,
831 CatalogType,
832 Url,
833 User,
834 Warehouse,
835}
836
837impl AstDisplay for ConnectionOptionName {
838 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
839 f.write_str(match self {
840 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
841 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
842 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
843 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
844 ConnectionOptionName::Broker => "BROKER",
845 ConnectionOptionName::Brokers => "BROKERS",
846 ConnectionOptionName::Credential => "CREDENTIAL",
847 ConnectionOptionName::Database => "DATABASE",
848 ConnectionOptionName::Endpoint => "ENDPOINT",
849 ConnectionOptionName::Host => "HOST",
850 ConnectionOptionName::Password => "PASSWORD",
851 ConnectionOptionName::Port => "PORT",
852 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
853 ConnectionOptionName::ProgressTopicReplicationFactor => {
854 "PROGRESS TOPIC REPLICATION FACTOR"
855 }
856 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
857 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
858 ConnectionOptionName::Region => "REGION",
859 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
860 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
861 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
862 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
863 ConnectionOptionName::SaslUsername => "SASL USERNAME",
864 ConnectionOptionName::Scope => "SCOPE",
865 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
866 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
867 ConnectionOptionName::ServiceName => "SERVICE NAME",
868 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
869 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
870 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
871 ConnectionOptionName::SslKey => "SSL KEY",
872 ConnectionOptionName::SslMode => "SSL MODE",
873 ConnectionOptionName::SessionToken => "SESSION TOKEN",
874 ConnectionOptionName::CatalogType => "CATALOG TYPE",
875 ConnectionOptionName::Url => "URL",
876 ConnectionOptionName::User => "USER",
877 ConnectionOptionName::Warehouse => "WAREHOUSE",
878 })
879 }
880}
881impl_display!(ConnectionOptionName);
882
883impl WithOptionName for ConnectionOptionName {
884 fn redact_value(&self) -> bool {
890 match self {
891 ConnectionOptionName::AccessKeyId
892 | ConnectionOptionName::AvailabilityZones
893 | ConnectionOptionName::AwsConnection
894 | ConnectionOptionName::AwsPrivatelink
895 | ConnectionOptionName::Broker
896 | ConnectionOptionName::Brokers
897 | ConnectionOptionName::Credential
898 | ConnectionOptionName::Database
899 | ConnectionOptionName::Endpoint
900 | ConnectionOptionName::Host
901 | ConnectionOptionName::Password
902 | ConnectionOptionName::Port
903 | ConnectionOptionName::ProgressTopic
904 | ConnectionOptionName::ProgressTopicReplicationFactor
905 | ConnectionOptionName::PublicKey1
906 | ConnectionOptionName::PublicKey2
907 | ConnectionOptionName::Region
908 | ConnectionOptionName::AssumeRoleArn
909 | ConnectionOptionName::AssumeRoleSessionName
910 | ConnectionOptionName::SaslMechanisms
911 | ConnectionOptionName::SaslPassword
912 | ConnectionOptionName::SaslUsername
913 | ConnectionOptionName::Scope
914 | ConnectionOptionName::SecurityProtocol
915 | ConnectionOptionName::SecretAccessKey
916 | ConnectionOptionName::ServiceName
917 | ConnectionOptionName::SshTunnel
918 | ConnectionOptionName::SslCertificate
919 | ConnectionOptionName::SslCertificateAuthority
920 | ConnectionOptionName::SslKey
921 | ConnectionOptionName::SslMode
922 | ConnectionOptionName::SessionToken
923 | ConnectionOptionName::CatalogType
924 | ConnectionOptionName::Url
925 | ConnectionOptionName::User
926 | ConnectionOptionName::Warehouse => false,
927 }
928 }
929}
930
/// A single connection option: a [`ConnectionOptionName`] plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
939
940#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
941pub enum CreateConnectionType {
942 Aws,
943 AwsPrivatelink,
944 Kafka,
945 Csr,
946 Postgres,
947 Ssh,
948 SqlServer,
949 MySql,
950 IcebergCatalog,
951}
952
953impl AstDisplay for CreateConnectionType {
954 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
955 match self {
956 Self::Kafka => {
957 f.write_str("KAFKA");
958 }
959 Self::Csr => {
960 f.write_str("CONFLUENT SCHEMA REGISTRY");
961 }
962 Self::Postgres => {
963 f.write_str("POSTGRES");
964 }
965 Self::Aws => {
966 f.write_str("AWS");
967 }
968 Self::AwsPrivatelink => {
969 f.write_str("AWS PRIVATELINK");
970 }
971 Self::Ssh => {
972 f.write_str("SSH TUNNEL");
973 }
974 Self::SqlServer => {
975 f.write_str("SQL SERVER");
976 }
977 Self::MySql => {
978 f.write_str("MYSQL");
979 }
980 Self::IcebergCatalog => {
981 f.write_str("ICEBERG CATALOG");
982 }
983 }
984 }
985}
986impl_display!(CreateConnectionType);
987
988#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
989pub enum CreateConnectionOptionName {
990 Validate,
991}
992
993impl AstDisplay for CreateConnectionOptionName {
994 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
995 f.write_str(match self {
996 CreateConnectionOptionName::Validate => "VALIDATE",
997 })
998 }
999}
1000impl_display!(CreateConnectionOptionName);
1001
1002impl WithOptionName for CreateConnectionOptionName {
1003 fn redact_value(&self) -> bool {
1009 match self {
1010 CreateConnectionOptionName::Validate => false,
1011 }
1012 }
1013}
1014
/// A single create-connection option: a [`CreateConnectionOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateConnectionOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
1023
1024#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1025pub enum KafkaSourceConfigOptionName {
1026 GroupIdPrefix,
1027 Topic,
1028 TopicMetadataRefreshInterval,
1029 StartTimestamp,
1030 StartOffset,
1031}
1032
1033impl AstDisplay for KafkaSourceConfigOptionName {
1034 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1035 f.write_str(match self {
1036 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1037 KafkaSourceConfigOptionName::Topic => "TOPIC",
1038 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1039 "TOPIC METADATA REFRESH INTERVAL"
1040 }
1041 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1042 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1043 })
1044 }
1045}
1046impl_display!(KafkaSourceConfigOptionName);
1047
1048impl WithOptionName for KafkaSourceConfigOptionName {
1049 fn redact_value(&self) -> bool {
1055 match self {
1056 KafkaSourceConfigOptionName::GroupIdPrefix
1057 | KafkaSourceConfigOptionName::Topic
1058 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1059 | KafkaSourceConfigOptionName::StartOffset
1060 | KafkaSourceConfigOptionName::StartTimestamp => false,
1061 }
1062 }
1063}
1064
/// A single Kafka source option: a [`KafkaSourceConfigOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSourceConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1072
1073#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1074pub enum KafkaSinkConfigOptionName {
1075 CompressionType,
1076 PartitionBy,
1077 ProgressGroupIdPrefix,
1078 Topic,
1079 TransactionalIdPrefix,
1080 LegacyIds,
1081 TopicConfig,
1082 TopicMetadataRefreshInterval,
1083 TopicPartitionCount,
1084 TopicReplicationFactor,
1085}
1086
1087impl AstDisplay for KafkaSinkConfigOptionName {
1088 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1089 f.write_str(match self {
1090 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1091 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1092 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1093 KafkaSinkConfigOptionName::Topic => "TOPIC",
1094 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1095 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1096 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1097 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1098 "TOPIC METADATA REFRESH INTERVAL"
1099 }
1100 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1101 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1102 })
1103 }
1104}
1105impl_display!(KafkaSinkConfigOptionName);
1106
1107impl WithOptionName for KafkaSinkConfigOptionName {
1108 fn redact_value(&self) -> bool {
1114 match self {
1115 KafkaSinkConfigOptionName::CompressionType
1116 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1117 | KafkaSinkConfigOptionName::Topic
1118 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1119 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1120 | KafkaSinkConfigOptionName::LegacyIds
1121 | KafkaSinkConfigOptionName::TopicConfig
1122 | KafkaSinkConfigOptionName::TopicPartitionCount
1123 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1124 KafkaSinkConfigOptionName::PartitionBy => true,
1125 }
1126 }
1127}
1128
/// A single Kafka sink option: a [`KafkaSinkConfigOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSinkConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1136
1137#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1138pub enum IcebergSinkConfigOptionName {
1139 Namespace,
1140 Table,
1141}
1142
1143impl AstDisplay for IcebergSinkConfigOptionName {
1144 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1145 f.write_str(match self {
1146 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1147 IcebergSinkConfigOptionName::Table => "TABLE",
1148 })
1149 }
1150}
1151impl_display!(IcebergSinkConfigOptionName);
1152
1153impl WithOptionName for IcebergSinkConfigOptionName {
1154 fn redact_value(&self) -> bool {
1160 match self {
1161 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1162 }
1163 }
1164}
1165
/// A single Iceberg sink option: an [`IcebergSinkConfigOptionName`] plus an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IcebergSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: IcebergSinkConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(IcebergSinkConfigOption);
impl_display_t!(IcebergSinkConfigOption);
1173
1174#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1175pub enum PgConfigOptionName {
1176 Details,
1179 Publication,
1181 TextColumns,
1188 ExcludeColumns,
1195}
1196
1197impl AstDisplay for PgConfigOptionName {
1198 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1199 f.write_str(match self {
1200 PgConfigOptionName::Details => "DETAILS",
1201 PgConfigOptionName::Publication => "PUBLICATION",
1202 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1203 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1204 })
1205 }
1206}
1207impl_display!(PgConfigOptionName);
1208
1209impl WithOptionName for PgConfigOptionName {
1210 fn redact_value(&self) -> bool {
1216 match self {
1217 PgConfigOptionName::Details
1218 | PgConfigOptionName::Publication
1219 | PgConfigOptionName::TextColumns
1220 | PgConfigOptionName::ExcludeColumns => false,
1221 }
1222 }
1223}
1224
/// A single Postgres source option: a [`PgConfigOptionName`] plus an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: PgConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Both macros are defined outside this file section; judging by their names
// they supply the display impls for this type.
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1233
/// Names of the options that may appear in the option list of a
/// `MYSQL CONNECTION` source clause.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// defines their ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1254
1255impl AstDisplay for MySqlConfigOptionName {
1256 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1257 f.write_str(match self {
1258 MySqlConfigOptionName::Details => "DETAILS",
1259 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1260 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1261 })
1262 }
1263}
1264impl_display!(MySqlConfigOptionName);
1265
1266impl WithOptionName for MySqlConfigOptionName {
1267 fn redact_value(&self) -> bool {
1273 match self {
1274 MySqlConfigOptionName::Details
1275 | MySqlConfigOptionName::TextColumns
1276 | MySqlConfigOptionName::ExcludeColumns => false,
1277 }
1278 }
1279}
1280
/// One entry in the option list of a `MYSQL CONNECTION` source clause: an
/// option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: MySqlConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1289
/// Names of the options that may appear in the option list of a
/// `SQL SERVER CONNECTION` source clause.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// defines their ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1312
1313impl AstDisplay for SqlServerConfigOptionName {
1314 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1315 f.write_str(match self {
1316 SqlServerConfigOptionName::Details => "DETAILS",
1317 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1318 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1319 })
1320 }
1321}
1322impl_display!(SqlServerConfigOptionName);
1323
1324impl WithOptionName for SqlServerConfigOptionName {
1325 fn redact_value(&self) -> bool {
1331 match self {
1332 SqlServerConfigOptionName::Details
1333 | SqlServerConfigOptionName::TextColumns
1334 | SqlServerConfigOptionName::ExcludeColumns => false,
1335 }
1336 }
1337}
1338
/// One entry in the option list of a `SQL SERVER CONNECTION` source clause:
/// an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: SqlServerConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1347
/// The connection clause of a source: which kind of upstream the source
/// uses, together with the option list specific to that kind.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Rendered as `KAFKA CONNECTION <connection> [(<options>)]`.
    Kafka {
        connection: T::ItemName,
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Rendered as `POSTGRES CONNECTION <connection> [(<options>)]`.
    Postgres {
        connection: T::ItemName,
        options: Vec<PgConfigOption<T>>,
    },
    /// Rendered as `SQL SERVER CONNECTION <connection> [(<options>)]`.
    SqlServer {
        connection: T::ItemName,
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Rendered as `MYSQL CONNECTION <connection> [(<options>)]`.
    MySql {
        connection: T::ItemName,
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Rendered as `LOAD GENERATOR <generator> [(<options>)]`.
    LoadGenerator {
        generator: LoadGenerator,
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1371
1372impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1374 match self {
1375 CreateSourceConnection::Kafka {
1376 connection,
1377 options,
1378 } => {
1379 f.write_str("KAFKA CONNECTION ");
1380 f.write_node(connection);
1381 if !options.is_empty() {
1382 f.write_str(" (");
1383 f.write_node(&display::comma_separated(options));
1384 f.write_str(")");
1385 }
1386 }
1387 CreateSourceConnection::Postgres {
1388 connection,
1389 options,
1390 } => {
1391 f.write_str("POSTGRES CONNECTION ");
1392 f.write_node(connection);
1393 if !options.is_empty() {
1394 f.write_str(" (");
1395 f.write_node(&display::comma_separated(options));
1396 f.write_str(")");
1397 }
1398 }
1399 CreateSourceConnection::SqlServer {
1400 connection,
1401 options,
1402 } => {
1403 f.write_str("SQL SERVER CONNECTION ");
1404 f.write_node(connection);
1405 if !options.is_empty() {
1406 f.write_str(" (");
1407 f.write_node(&display::comma_separated(options));
1408 f.write_str(")");
1409 }
1410 }
1411 CreateSourceConnection::MySql {
1412 connection,
1413 options,
1414 } => {
1415 f.write_str("MYSQL CONNECTION ");
1416 f.write_node(connection);
1417 if !options.is_empty() {
1418 f.write_str(" (");
1419 f.write_node(&display::comma_separated(options));
1420 f.write_str(")");
1421 }
1422 }
1423 CreateSourceConnection::LoadGenerator { generator, options } => {
1424 f.write_str("LOAD GENERATOR ");
1425 f.write_node(generator);
1426 if !options.is_empty() {
1427 f.write_str(" (");
1428 f.write_node(&display::comma_separated(options));
1429 f.write_str(")");
1430 }
1431 }
1432 }
1433 }
1434}
1435impl_display_t!(CreateSourceConnection);
1436
/// The kinds of load generator that can be named in a `LOAD GENERATOR`
/// source clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Rendered as `CLOCK`.
    Clock,
    /// Rendered as `COUNTER`.
    Counter,
    /// Rendered as `MARKETING`.
    Marketing,
    /// Rendered as `AUCTION`.
    Auction,
    /// Rendered as `DATUMS`.
    Datums,
    /// Rendered as `TPCH`.
    Tpch,
    /// Rendered as `KEY VALUE`.
    KeyValue,
}
1447
1448impl AstDisplay for LoadGenerator {
1449 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1450 match self {
1451 Self::Counter => f.write_str("COUNTER"),
1452 Self::Clock => f.write_str("CLOCK"),
1453 Self::Marketing => f.write_str("MARKETING"),
1454 Self::Auction => f.write_str("AUCTION"),
1455 Self::Datums => f.write_str("DATUMS"),
1456 Self::Tpch => f.write_str("TPCH"),
1457 Self::KeyValue => f.write_str("KEY VALUE"),
1458 }
1459 }
1460}
1461impl_display!(LoadGenerator);
1462
1463impl LoadGenerator {
1464 pub fn schema_name(&self) -> &'static str {
1469 match self {
1470 LoadGenerator::Counter => "counter",
1471 LoadGenerator::Clock => "clock",
1472 LoadGenerator::Marketing => "marketing",
1473 LoadGenerator::Auction => "auction",
1474 LoadGenerator::Datums => "datums",
1475 LoadGenerator::Tpch => "tpch",
1476 LoadGenerator::KeyValue => "key_value",
1477 }
1478 }
1479}
1480
/// Names of the options that may appear in the option list of a
/// `LOAD GENERATOR` source clause.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// defines their ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Rendered as `SCALE FACTOR`.
    ScaleFactor,
    /// Rendered as `TICK INTERVAL`.
    TickInterval,
    /// Rendered as `AS OF`.
    AsOf,
    /// Rendered as `UP TO`.
    UpTo,
    /// Rendered as `MAX CARDINALITY`.
    MaxCardinality,
    /// Rendered as `KEYS`.
    Keys,
    /// Rendered as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Rendered as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Rendered as `VALUE SIZE`.
    ValueSize,
    /// Rendered as `SEED`.
    Seed,
    /// Rendered as `PARTITIONS`.
    Partitions,
    /// Rendered as `BATCH SIZE`.
    BatchSize,
}
1496
1497impl AstDisplay for LoadGeneratorOptionName {
1498 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1499 f.write_str(match self {
1500 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1501 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1502 LoadGeneratorOptionName::AsOf => "AS OF",
1503 LoadGeneratorOptionName::UpTo => "UP TO",
1504 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1505 LoadGeneratorOptionName::Keys => "KEYS",
1506 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1507 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1508 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1509 LoadGeneratorOptionName::Seed => "SEED",
1510 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1511 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1512 })
1513 }
1514}
1515impl_display!(LoadGeneratorOptionName);
1516
1517impl WithOptionName for LoadGeneratorOptionName {
1518 fn redact_value(&self) -> bool {
1524 match self {
1525 LoadGeneratorOptionName::ScaleFactor
1526 | LoadGeneratorOptionName::TickInterval
1527 | LoadGeneratorOptionName::AsOf
1528 | LoadGeneratorOptionName::UpTo
1529 | LoadGeneratorOptionName::MaxCardinality
1530 | LoadGeneratorOptionName::Keys
1531 | LoadGeneratorOptionName::SnapshotRounds
1532 | LoadGeneratorOptionName::TransactionalSnapshot
1533 | LoadGeneratorOptionName::ValueSize
1534 | LoadGeneratorOptionName::Partitions
1535 | LoadGeneratorOptionName::BatchSize
1536 | LoadGeneratorOptionName::Seed => false,
1537 }
1538 }
1539}
1540
/// One entry in the option list of a `LOAD GENERATOR` source clause: an
/// option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: LoadGeneratorOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1549
/// The connection clause of a sink: which kind of downstream the sink
/// writes to, together with the settings specific to that kind.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Rendered as
    /// `KAFKA CONNECTION <connection> [(<options>)] [<key>] [HEADERS <headers>]`.
    Kafka {
        connection: T::ItemName,
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Optional identifier rendered after `HEADERS`.
        headers: Option<Ident>,
    },
    /// Rendered as
    /// `ICEBERG CATALOG CONNECTION <connection> [(<options>)] USING AWS CONNECTION <aws_connection> [<key>]`.
    Iceberg {
        connection: T::ItemName,
        /// Connection rendered after `USING AWS CONNECTION`.
        aws_connection: T::ItemName,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1565
1566impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1567 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1568 match self {
1569 CreateSinkConnection::Kafka {
1570 connection,
1571 options,
1572 key,
1573 headers,
1574 } => {
1575 f.write_str("KAFKA CONNECTION ");
1576 f.write_node(connection);
1577 if !options.is_empty() {
1578 f.write_str(" (");
1579 f.write_node(&display::comma_separated(options));
1580 f.write_str(")");
1581 }
1582 if let Some(key) = key.as_ref() {
1583 f.write_str(" ");
1584 f.write_node(key);
1585 }
1586 if let Some(headers) = headers {
1587 f.write_str(" HEADERS ");
1588 f.write_node(headers);
1589 }
1590 }
1591 CreateSinkConnection::Iceberg {
1592 connection,
1593 aws_connection,
1594 key,
1595 options,
1596 } => {
1597 f.write_str("ICEBERG CATALOG CONNECTION ");
1598 f.write_node(connection);
1599 if !options.is_empty() {
1600 f.write_str(" (");
1601 f.write_node(&display::comma_separated(options));
1602 f.write_str(")");
1603 }
1604 f.write_str(" USING AWS CONNECTION ");
1605 f.write_node(aws_connection);
1606 if let Some(key) = key.as_ref() {
1607 f.write_str(" ");
1608 f.write_node(key);
1609 }
1610 }
1611 }
1612 }
1613}
1614impl_display_t!(CreateSinkConnection);
1615
/// The key clause of a sink, rendered as `KEY (<columns>) [NOT ENFORCED]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// Columns listed inside the parentheses, in order.
    pub key_columns: Vec<Ident>,
    /// When `true`, ` NOT ENFORCED` is appended after the column list.
    pub not_enforced: bool,
}
1621
1622impl AstDisplay for SinkKey {
1623 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1624 f.write_str("KEY (");
1625 f.write_node(&display::comma_separated(&self.key_columns));
1626 f.write_str(")");
1627 if self.not_enforced {
1628 f.write_str(" NOT ENFORCED");
1629 }
1630 }
1631}
1632
/// A table-level constraint. Every variant carries an optional constraint
/// name, which is rendered as a leading `CONSTRAINT <name> ` when present.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Rendered as `PRIMARY KEY (<columns>)` or
    /// `UNIQUE [NULLS NOT DISTINCT] (<columns>)`.
    Unique {
        name: Option<Ident>,
        columns: Vec<Ident>,
        /// Selects `PRIMARY KEY` (when `true`) over `UNIQUE`.
        is_primary: bool,
        /// Adds `NULLS NOT DISTINCT` after `UNIQUE`; it is not rendered when
        /// `is_primary` is `true`.
        nulls_not_distinct: bool,
    },
    /// Rendered as
    /// `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        name: Option<Ident>,
        columns: Vec<Ident>,
        foreign_table: T::ItemName,
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check {
        name: Option<Ident>,
        expr: Box<Expr<T>>,
    },
}
1661
1662impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1663 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1664 match self {
1665 TableConstraint::Unique {
1666 name,
1667 columns,
1668 is_primary,
1669 nulls_not_distinct,
1670 } => {
1671 f.write_node(&display_constraint_name(name));
1672 if *is_primary {
1673 f.write_str("PRIMARY KEY ");
1674 } else {
1675 f.write_str("UNIQUE ");
1676 if *nulls_not_distinct {
1677 f.write_str("NULLS NOT DISTINCT ");
1678 }
1679 }
1680 f.write_str("(");
1681 f.write_node(&display::comma_separated(columns));
1682 f.write_str(")");
1683 }
1684 TableConstraint::ForeignKey {
1685 name,
1686 columns,
1687 foreign_table,
1688 referred_columns,
1689 } => {
1690 f.write_node(&display_constraint_name(name));
1691 f.write_str("FOREIGN KEY (");
1692 f.write_node(&display::comma_separated(columns));
1693 f.write_str(") REFERENCES ");
1694 f.write_node(foreign_table);
1695 f.write_str("(");
1696 f.write_node(&display::comma_separated(referred_columns));
1697 f.write_str(")");
1698 }
1699 TableConstraint::Check { name, expr } => {
1700 f.write_node(&display_constraint_name(name));
1701 f.write_str("CHECK (");
1702 f.write_node(&expr);
1703 f.write_str(")");
1704 }
1705 }
1706 }
1707}
1708impl_display_t!(TableConstraint);
1709
/// A key constraint clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Rendered as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1716
1717impl AstDisplay for KeyConstraint {
1718 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1719 match self {
1720 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1721 f.write_str("PRIMARY KEY ");
1722 f.write_str("(");
1723 f.write_node(&display::comma_separated(columns));
1724 f.write_str(") ");
1725 f.write_str("NOT ENFORCED");
1726 }
1727 }
1728 }
1729}
1730impl_display!(KeyConstraint);
1731
/// Names of the options modelled by [`CreateSourceOption`].
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// defines their ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Rendered as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
}
1737
1738impl AstDisplay for CreateSourceOptionName {
1739 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1740 f.write_str(match self {
1741 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1742 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1743 })
1744 }
1745}
1746impl_display!(CreateSourceOptionName);
1747
1748impl WithOptionName for CreateSourceOptionName {
1749 fn redact_value(&self) -> bool {
1755 match self {
1756 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1757 false
1758 }
1759 }
1760 }
1761}
1762
/// A single source option: an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: CreateSourceOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1771
/// A column definition, rendered as
/// `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// Optional collation, rendered after `COLLATE`.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, rendered in order and separated by single spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1780
1781impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1782 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1783 f.write_node(&self.name);
1784 f.write_str(" ");
1785 f.write_node(&self.data_type);
1786 if let Some(collation) = &self.collation {
1787 f.write_str(" COLLATE ");
1788 f.write_node(collation);
1789 }
1790 for option in &self.options {
1791 f.write_str(" ");
1792 f.write_node(option);
1793 }
1794 }
1795}
1796impl_display_t!(ColumnDef);
1797
/// A column option together with an optional constraint name, rendered as
/// `[CONSTRAINT <name> ]<option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name, rendered as a `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1819
1820impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1821 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1822 f.write_node(&display_constraint_name(&self.name));
1823 f.write_node(&self.option);
1824 }
1825}
1826impl_display_t!(ColumnOptionDef);
1827
/// The options that can be attached to a column definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Rendered as `NULL`.
    Null,
    /// Rendered as `NOT NULL`.
    NotNull,
    /// Rendered as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Rendered as `PRIMARY KEY` when `is_primary` is `true`, `UNIQUE`
    /// otherwise.
    Unique { is_primary: bool },
    /// Rendered as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        foreign_table: UnresolvedItemName,
        referred_columns: Vec<Ident>,
    },
    /// Rendered as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Rendered as `VERSION <action> <version>`.
    Versioned {
        action: ColumnVersioned,
        version: Version,
    },
}
1854
1855impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1856 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1857 use ColumnOption::*;
1858 match self {
1859 Null => f.write_str("NULL"),
1860 NotNull => f.write_str("NOT NULL"),
1861 Default(expr) => {
1862 f.write_str("DEFAULT ");
1863 f.write_node(expr);
1864 }
1865 Unique { is_primary } => {
1866 if *is_primary {
1867 f.write_str("PRIMARY KEY");
1868 } else {
1869 f.write_str("UNIQUE");
1870 }
1871 }
1872 ForeignKey {
1873 foreign_table,
1874 referred_columns,
1875 } => {
1876 f.write_str("REFERENCES ");
1877 f.write_node(foreign_table);
1878 f.write_str(" (");
1879 f.write_node(&display::comma_separated(referred_columns));
1880 f.write_str(")");
1881 }
1882 Check(expr) => {
1883 f.write_str("CHECK (");
1884 f.write_node(expr);
1885 f.write_str(")");
1886 }
1887 Versioned { action, version } => {
1888 f.write_str("VERSION ");
1889 f.write_node(action);
1890 f.write_str(" ");
1891 f.write_node(version);
1892 }
1893 }
1894 }
1895}
1896impl_display_t!(ColumnOption);
1897
/// The action named in a column's `VERSION <action> <version>` option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Rendered as `ADDED`.
    Added,
}
1902
1903impl AstDisplay for ColumnVersioned {
1904 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1905 where
1906 W: fmt::Write,
1907 {
1908 match self {
1909 ColumnVersioned::Added => f.write_str("ADDED"),
1911 }
1912 }
1913}
1914impl_display!(ColumnVersioned);
1915
1916fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1917 struct ConstraintName<'a>(&'a Option<Ident>);
1918 impl<'a> AstDisplay for ConstraintName<'a> {
1919 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1920 where
1921 W: fmt::Write,
1922 {
1923 if let Some(name) = self.0 {
1924 f.write_str("CONSTRAINT ");
1925 f.write_node(name);
1926 f.write_str(" ");
1927 }
1928 }
1929 }
1930 ConstraintName(name)
1931}