1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Names of the options that can be attached to a materialized view.
///
/// The declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// The `ASSERT NOT NULL` option.
    AssertNotNull,
    /// The `PARTITION BY` option.
    PartitionBy,
    /// The `RETAIN HISTORY` option.
    RetainHistory,
    /// The `REFRESH` option.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
/// Names of the options that can be attached to a continual task.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// The `SNAPSHOT` option.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104 pub name: ContinualTaskOptionName,
105 pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema carried as raw text; displayed as `SCHEMA '<text>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The schema text, stored unescaped.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
/// Names of the options accepted alongside an inline Avro schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// The `CONFLUENT WIRE FORMAT` option.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152 pub name: AvroSchemaOptionName,
153 pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252 pub identifier: DocOnIdentifier<T>,
253 pub for_schema: DocOnSchema,
254}
/// Which schema a `DOC ON` option is scoped to.
///
/// The declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a `KEY ` prefix.
    KeyOnly,
    /// Displayed with a `VALUE ` prefix.
    ValueOnly,
    /// Displayed without a prefix.
    All,
}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264 Column(ColumnName<T>),
265 Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306pub struct CsrConfigOption<T: AstInfo> {
308 pub name: CsrConfigOptionName<T>,
309 pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is chosen.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// Select the latest schema.
    Latest,
    /// Use the schema text carried in the variant.
    Inline(String),
    /// Use the schema identified by the carried id.
    ById(i32),
}
339
340impl Default for ReaderSchemaSelectionStrategy {
341 fn default() -> Self {
342 Self::Latest
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// Seed schemas for an Avro schema registry connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// The key schema text, if any.
    pub key_schema: Option<String>,
    /// The value schema text.
    pub value_schema: String,
    /// Schemas listed under `KEY REFERENCES`; only displayed when
    /// `key_schema` is set and this list is non-empty.
    pub key_reference_schemas: Vec<String>,
    /// Schemas listed under `VALUE REFERENCES`; only displayed when non-empty.
    pub value_reference_schemas: Vec<String>,
}
396
397impl AstDisplay for CsrSeedAvro {
398 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
399 f.write_str("SEED");
400 if let Some(key_schema) = &self.key_schema {
401 f.write_str(" KEY SCHEMA '");
402 f.write_node(&display::escape_single_quote_string(key_schema));
403 f.write_str("'");
404 if !self.key_reference_schemas.is_empty() {
405 f.write_str(" KEY REFERENCES (");
406 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
407 if i > 0 {
408 f.write_str(", ");
409 }
410 f.write_str("'");
411 f.write_node(&display::escape_single_quote_string(schema));
412 f.write_str("'");
413 }
414 f.write_str(")");
415 }
416 }
417 f.write_str(" VALUE SCHEMA '");
418 f.write_node(&display::escape_single_quote_string(&self.value_schema));
419 f.write_str("'");
420 if !self.value_reference_schemas.is_empty() {
421 f.write_str(" VALUE REFERENCES (");
422 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
423 if i > 0 {
424 f.write_str(", ");
425 }
426 f.write_str("'");
427 f.write_node(&display::escape_single_quote_string(schema));
428 f.write_str("'");
429 }
430 f.write_str(")");
431 }
432 }
433}
434impl_display!(CsrSeedAvro);
435
436#[derive(Debug, Clone, PartialEq, Eq, Hash)]
437pub struct CsrSeedProtobuf {
438 pub key: Option<CsrSeedProtobufSchema>,
439 pub value: CsrSeedProtobufSchema,
440}
441
442impl AstDisplay for CsrSeedProtobuf {
443 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
444 f.write_str("SEED");
445 if let Some(key) = &self.key {
446 f.write_str(" KEY ");
447 f.write_node(key);
448 }
449 f.write_str(" VALUE ");
450 f.write_node(&self.value);
451 }
452}
453impl_display!(CsrSeedProtobuf);
454
/// One Protobuf seed schema: the schema text and the message it names.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    /// The schema text, stored unescaped.
    pub schema: String,
    /// The message name, stored unescaped.
    pub message_name: String,
}
461impl AstDisplay for CsrSeedProtobufSchema {
462 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
463 f.write_str("SCHEMA '");
464 f.write_str(&display::escape_single_quote_string(&self.schema));
465 f.write_str("' MESSAGE '");
466 f.write_str(&self.message_name);
467 f.write_str("'");
468 }
469}
470impl_display!(CsrSeedProtobufSchema);
471
472#[derive(Debug, Clone, PartialEq, Eq, Hash)]
473pub enum FormatSpecifier<T: AstInfo> {
474 Bare(Format<T>),
476 KeyValue { key: Format<T>, value: Format<T> },
478}
479
480impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
481 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
482 match self {
483 FormatSpecifier::Bare(format) => {
484 f.write_str("FORMAT ");
485 f.write_node(format)
486 }
487 FormatSpecifier::KeyValue { key, value } => {
488 f.write_str("KEY FORMAT ");
489 f.write_node(key);
490 f.write_str(" VALUE FORMAT ");
491 f.write_node(value);
492 }
493 }
494 }
495}
496impl_display_t!(FormatSpecifier);
497
498#[derive(Debug, Clone, PartialEq, Eq, Hash)]
499pub enum Format<T: AstInfo> {
500 Bytes,
501 Avro(AvroSchema<T>),
502 Protobuf(ProtobufSchema<T>),
503 Regex(String),
504 Csv {
505 columns: CsvColumns,
506 delimiter: char,
507 },
508 Json {
509 array: bool,
510 },
511 Text,
512}
513
514#[derive(Debug, Clone, PartialEq, Eq, Hash)]
515pub enum CsvColumns {
516 Count(u64),
518 Header { names: Vec<Ident> },
520}
521
522impl AstDisplay for CsvColumns {
523 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
524 match self {
525 CsvColumns::Count(n) => {
526 f.write_str(n);
527 f.write_str(" COLUMNS")
528 }
529 CsvColumns::Header { names } => {
530 f.write_str("HEADER");
531 if !names.is_empty() {
532 f.write_str(" (");
533 f.write_node(&display::comma_separated(names));
534 f.write_str(")");
535 }
536 }
537 }
538 }
539}
540
541#[derive(Debug, Clone, PartialEq, Eq, Hash)]
542pub enum SourceIncludeMetadata {
543 Key {
544 alias: Option<Ident>,
545 },
546 Timestamp {
547 alias: Option<Ident>,
548 },
549 Partition {
550 alias: Option<Ident>,
551 },
552 Offset {
553 alias: Option<Ident>,
554 },
555 Headers {
556 alias: Option<Ident>,
557 },
558 Header {
559 key: String,
560 alias: Ident,
561 use_bytes: bool,
562 },
563}
564
565impl AstDisplay for SourceIncludeMetadata {
566 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
567 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
568 if let Some(alias) = alias {
569 f.write_str(" AS ");
570 f.write_node(alias);
571 }
572 };
573
574 match self {
575 SourceIncludeMetadata::Key { alias } => {
576 f.write_str("KEY");
577 print_alias(f, alias);
578 }
579 SourceIncludeMetadata::Timestamp { alias } => {
580 f.write_str("TIMESTAMP");
581 print_alias(f, alias);
582 }
583 SourceIncludeMetadata::Partition { alias } => {
584 f.write_str("PARTITION");
585 print_alias(f, alias);
586 }
587 SourceIncludeMetadata::Offset { alias } => {
588 f.write_str("OFFSET");
589 print_alias(f, alias);
590 }
591 SourceIncludeMetadata::Headers { alias } => {
592 f.write_str("HEADERS");
593 print_alias(f, alias);
594 }
595 SourceIncludeMetadata::Header {
596 alias,
597 key,
598 use_bytes,
599 } => {
600 f.write_str("HEADER '");
601 f.write_str(&display::escape_single_quote_string(key));
602 f.write_str("'");
603 print_alias(f, &Some(alias.clone()));
604 if *use_bytes {
605 f.write_str(" BYTES");
606 }
607 }
608 }
609 }
610}
611impl_display!(SourceIncludeMetadata);
612
613#[derive(Debug, Clone, PartialEq, Eq, Hash)]
614pub enum SourceErrorPolicy {
615 Inline {
616 alias: Option<Ident>,
618 },
619}
620
621impl AstDisplay for SourceErrorPolicy {
622 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
623 match self {
624 Self::Inline { alias } => {
625 f.write_str("INLINE");
626 if let Some(alias) = alias {
627 f.write_str(" AS ");
628 f.write_node(alias);
629 }
630 }
631 }
632 }
633}
634impl_display!(SourceErrorPolicy);
635
636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637pub enum SourceEnvelope {
638 None,
639 Debezium,
640 Upsert {
641 value_decode_err_policy: Vec<SourceErrorPolicy>,
642 },
643 CdcV2,
644}
645
646impl SourceEnvelope {
647 pub fn requires_all_input(&self) -> bool {
650 match self {
651 SourceEnvelope::None => false,
652 SourceEnvelope::Debezium => false,
653 SourceEnvelope::Upsert { .. } => false,
654 SourceEnvelope::CdcV2 => true,
655 }
656 }
657}
658
659impl AstDisplay for SourceEnvelope {
660 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
661 match self {
662 Self::None => {
663 f.write_str("NONE");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 Self::Upsert {
670 value_decode_err_policy,
671 } => {
672 if value_decode_err_policy.is_empty() {
673 f.write_str("UPSERT");
674 } else {
675 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
676 f.write_node(&display::comma_separated(value_decode_err_policy));
677 f.write_str("))")
678 }
679 }
680 Self::CdcV2 => {
681 f.write_str("MATERIALIZE");
682 }
683 }
684 }
685}
686impl_display!(SourceEnvelope);
687
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Displayed as `DEBEZIUM`.
    Debezium,
    /// Displayed as `UPSERT`.
    Upsert,
}
693
694impl AstDisplay for SinkEnvelope {
695 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
696 match self {
697 Self::Upsert => {
698 f.write_str("UPSERT");
699 }
700 Self::Debezium => {
701 f.write_str("DEBEZIUM");
702 }
703 }
704 }
705}
706impl_display!(SinkEnvelope);
707
/// The mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Displayed as `UPSERT`.
    Upsert,
    /// Displayed as `APPEND`.
    Append,
}
713
714impl AstDisplay for IcebergSinkMode {
715 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
716 match self {
717 Self::Upsert => {
718 f.write_str("UPSERT");
719 }
720 Self::Append => {
721 f.write_str("APPEND");
722 }
723 }
724 }
725}
726impl_display!(IcebergSinkMode);
727
728#[derive(Debug, Clone, PartialEq, Eq, Hash)]
729pub enum SubscribeOutput<T: AstInfo> {
730 Diffs,
731 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
732 EnvelopeUpsert { key_columns: Vec<Ident> },
733 EnvelopeDebezium { key_columns: Vec<Ident> },
734}
735
736impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
737 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
738 match self {
739 Self::Diffs => {}
740 Self::WithinTimestampOrderBy { order_by } => {
741 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
742 f.write_node(&display::comma_separated(order_by));
743 }
744 Self::EnvelopeUpsert { key_columns } => {
745 f.write_str(" ENVELOPE UPSERT (KEY (");
746 f.write_node(&display::comma_separated(key_columns));
747 f.write_str("))");
748 }
749 Self::EnvelopeDebezium { key_columns } => {
750 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
751 f.write_node(&display::comma_separated(key_columns));
752 f.write_str("))");
753 }
754 }
755 }
756}
757impl_display_t!(SubscribeOutput);
758
759impl<T: AstInfo> AstDisplay for Format<T> {
760 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
761 match self {
762 Self::Bytes => f.write_str("BYTES"),
763 Self::Avro(inner) => {
764 f.write_str("AVRO ");
765 f.write_node(inner);
766 }
767 Self::Protobuf(inner) => {
768 f.write_str("PROTOBUF ");
769 f.write_node(inner);
770 }
771 Self::Regex(regex) => {
772 f.write_str("REGEX '");
773 f.write_node(&display::escape_single_quote_string(regex));
774 f.write_str("'");
775 }
776 Self::Csv { columns, delimiter } => {
777 f.write_str("CSV WITH ");
778 f.write_node(columns);
779
780 if *delimiter != ',' {
781 f.write_str(" DELIMITED BY '");
782 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
783 f.write_str("'");
784 }
785 }
786 Self::Json { array } => {
787 f.write_str("JSON");
788 if *array {
789 f.write_str(" ARRAY");
790 }
791 }
792 Self::Text => f.write_str("TEXT"),
793 }
794 }
795}
796impl_display_t!(Format);
797
/// Names of the options that can be set on a connection.
///
/// Each variant is displayed as the upper-case, space-separated form of its
/// name (for example `AccessKeyId` is written `ACCESS KEY ID`). The
/// declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
840
841impl AstDisplay for ConnectionOptionName {
842 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
843 f.write_str(match self {
844 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
845 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
846 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
847 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
848 ConnectionOptionName::Broker => "BROKER",
849 ConnectionOptionName::Brokers => "BROKERS",
850 ConnectionOptionName::Credential => "CREDENTIAL",
851 ConnectionOptionName::Database => "DATABASE",
852 ConnectionOptionName::Endpoint => "ENDPOINT",
853 ConnectionOptionName::Host => "HOST",
854 ConnectionOptionName::Password => "PASSWORD",
855 ConnectionOptionName::Port => "PORT",
856 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
857 ConnectionOptionName::ProgressTopicReplicationFactor => {
858 "PROGRESS TOPIC REPLICATION FACTOR"
859 }
860 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
861 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
862 ConnectionOptionName::Region => "REGION",
863 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
864 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
865 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
866 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
867 ConnectionOptionName::SaslUsername => "SASL USERNAME",
868 ConnectionOptionName::Scope => "SCOPE",
869 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
870 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
871 ConnectionOptionName::ServiceName => "SERVICE NAME",
872 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
873 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
874 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
875 ConnectionOptionName::SslKey => "SSL KEY",
876 ConnectionOptionName::SslMode => "SSL MODE",
877 ConnectionOptionName::SessionToken => "SESSION TOKEN",
878 ConnectionOptionName::CatalogType => "CATALOG TYPE",
879 ConnectionOptionName::Url => "URL",
880 ConnectionOptionName::User => "USER",
881 ConnectionOptionName::Warehouse => "WAREHOUSE",
882 })
883 }
884}
885impl_display!(ConnectionOptionName);
886
887impl WithOptionName for ConnectionOptionName {
888 fn redact_value(&self) -> bool {
894 match self {
895 ConnectionOptionName::AccessKeyId
896 | ConnectionOptionName::AvailabilityZones
897 | ConnectionOptionName::AwsConnection
898 | ConnectionOptionName::AwsPrivatelink
899 | ConnectionOptionName::Broker
900 | ConnectionOptionName::Brokers
901 | ConnectionOptionName::Credential
902 | ConnectionOptionName::Database
903 | ConnectionOptionName::Endpoint
904 | ConnectionOptionName::Host
905 | ConnectionOptionName::Password
906 | ConnectionOptionName::Port
907 | ConnectionOptionName::ProgressTopic
908 | ConnectionOptionName::ProgressTopicReplicationFactor
909 | ConnectionOptionName::PublicKey1
910 | ConnectionOptionName::PublicKey2
911 | ConnectionOptionName::Region
912 | ConnectionOptionName::AssumeRoleArn
913 | ConnectionOptionName::AssumeRoleSessionName
914 | ConnectionOptionName::SaslMechanisms
915 | ConnectionOptionName::SaslPassword
916 | ConnectionOptionName::SaslUsername
917 | ConnectionOptionName::Scope
918 | ConnectionOptionName::SecurityProtocol
919 | ConnectionOptionName::SecretAccessKey
920 | ConnectionOptionName::ServiceName
921 | ConnectionOptionName::SshTunnel
922 | ConnectionOptionName::SslCertificate
923 | ConnectionOptionName::SslCertificateAuthority
924 | ConnectionOptionName::SslKey
925 | ConnectionOptionName::SslMode
926 | ConnectionOptionName::SessionToken
927 | ConnectionOptionName::CatalogType
928 | ConnectionOptionName::Url
929 | ConnectionOptionName::User
930 | ConnectionOptionName::Warehouse => false,
931 }
932 }
933}
934
935#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
936pub struct ConnectionOption<T: AstInfo> {
938 pub name: ConnectionOptionName,
939 pub value: Option<WithOptionValue<T>>,
940}
941impl_display_for_with_option!(ConnectionOption);
942impl_display_t!(ConnectionOption);
943
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    Aws,
    AwsPrivatelink,
    Kafka,
    Csr,
    Postgres,
    Ssh,
    SqlServer,
    MySql,
    IcebergCatalog,
}

impl CreateConnectionType {
    /// Returns the lower-case, hyphenated identifier for this connection
    /// type (for example `"aws-privatelink"`).
    ///
    /// This is distinct from the SQL keyword form produced when the type is
    /// displayed.
    pub fn as_str(&self) -> &'static str {
        // Arms follow the enum's declaration order.
        match *self {
            CreateConnectionType::Aws => "aws",
            CreateConnectionType::AwsPrivatelink => "aws-privatelink",
            CreateConnectionType::Kafka => "kafka",
            CreateConnectionType::Csr => "confluent-schema-registry",
            CreateConnectionType::Postgres => "postgres",
            CreateConnectionType::Ssh => "ssh-tunnel",
            CreateConnectionType::SqlServer => "sql-server",
            CreateConnectionType::MySql => "mysql",
            CreateConnectionType::IcebergCatalog => "iceberg-catalog",
        }
    }
}
972
973impl AstDisplay for CreateConnectionType {
974 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
975 match self {
976 Self::Kafka => {
977 f.write_str("KAFKA");
978 }
979 Self::Csr => {
980 f.write_str("CONFLUENT SCHEMA REGISTRY");
981 }
982 Self::Postgres => {
983 f.write_str("POSTGRES");
984 }
985 Self::Aws => {
986 f.write_str("AWS");
987 }
988 Self::AwsPrivatelink => {
989 f.write_str("AWS PRIVATELINK");
990 }
991 Self::Ssh => {
992 f.write_str("SSH TUNNEL");
993 }
994 Self::SqlServer => {
995 f.write_str("SQL SERVER");
996 }
997 Self::MySql => {
998 f.write_str("MYSQL");
999 }
1000 Self::IcebergCatalog => {
1001 f.write_str("ICEBERG CATALOG");
1002 }
1003 }
1004 }
1005}
1006impl_display!(CreateConnectionType);
1007
/// Names of the options accepted when creating a connection.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// The `VALIDATE` option.
    Validate,
}
1012
1013impl AstDisplay for CreateConnectionOptionName {
1014 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1015 f.write_str(match self {
1016 CreateConnectionOptionName::Validate => "VALIDATE",
1017 })
1018 }
1019}
1020impl_display!(CreateConnectionOptionName);
1021
1022impl WithOptionName for CreateConnectionOptionName {
1023 fn redact_value(&self) -> bool {
1029 match self {
1030 CreateConnectionOptionName::Validate => false,
1031 }
1032 }
1033}
1034
1035#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1036pub struct CreateConnectionOption<T: AstInfo> {
1038 pub name: CreateConnectionOptionName,
1039 pub value: Option<WithOptionValue<T>>,
1040}
1041impl_display_for_with_option!(CreateConnectionOption);
1042impl_display_t!(CreateConnectionOption);
1043
/// Names of the options accepted by a Kafka source.
///
/// The declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// The `GROUP ID PREFIX` option.
    GroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `START TIMESTAMP` option.
    StartTimestamp,
    /// The `START OFFSET` option.
    StartOffset,
}
1052
1053impl AstDisplay for KafkaSourceConfigOptionName {
1054 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1055 f.write_str(match self {
1056 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1057 KafkaSourceConfigOptionName::Topic => "TOPIC",
1058 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1059 "TOPIC METADATA REFRESH INTERVAL"
1060 }
1061 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1062 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1063 })
1064 }
1065}
1066impl_display!(KafkaSourceConfigOptionName);
1067
1068impl WithOptionName for KafkaSourceConfigOptionName {
1069 fn redact_value(&self) -> bool {
1075 match self {
1076 KafkaSourceConfigOptionName::GroupIdPrefix
1077 | KafkaSourceConfigOptionName::Topic
1078 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1079 | KafkaSourceConfigOptionName::StartOffset
1080 | KafkaSourceConfigOptionName::StartTimestamp => false,
1081 }
1082 }
1083}
1084
1085#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1086pub struct KafkaSourceConfigOption<T: AstInfo> {
1087 pub name: KafkaSourceConfigOptionName,
1088 pub value: Option<WithOptionValue<T>>,
1089}
1090impl_display_for_with_option!(KafkaSourceConfigOption);
1091impl_display_t!(KafkaSourceConfigOption);
1092
/// Names of the options accepted by a Kafka sink.
///
/// The declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// The `COMPRESSION TYPE` option.
    CompressionType,
    /// The `PARTITION BY` option; the only one whose value is redacted.
    PartitionBy,
    /// The `PROGRESS GROUP ID PREFIX` option.
    ProgressGroupIdPrefix,
    /// The `TOPIC` option.
    Topic,
    /// The `TRANSACTIONAL ID PREFIX` option.
    TransactionalIdPrefix,
    /// The `LEGACY IDS` option.
    LegacyIds,
    /// The `TOPIC CONFIG` option.
    TopicConfig,
    /// The `TOPIC METADATA REFRESH INTERVAL` option.
    TopicMetadataRefreshInterval,
    /// The `TOPIC PARTITION COUNT` option.
    TopicPartitionCount,
    /// The `TOPIC REPLICATION FACTOR` option.
    TopicReplicationFactor,
}
1106
1107impl AstDisplay for KafkaSinkConfigOptionName {
1108 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1109 f.write_str(match self {
1110 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1111 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1112 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1113 KafkaSinkConfigOptionName::Topic => "TOPIC",
1114 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1115 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1116 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1117 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1118 "TOPIC METADATA REFRESH INTERVAL"
1119 }
1120 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1121 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1122 })
1123 }
1124}
1125impl_display!(KafkaSinkConfigOptionName);
1126
1127impl WithOptionName for KafkaSinkConfigOptionName {
1128 fn redact_value(&self) -> bool {
1134 match self {
1135 KafkaSinkConfigOptionName::CompressionType
1136 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1137 | KafkaSinkConfigOptionName::Topic
1138 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1139 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1140 | KafkaSinkConfigOptionName::LegacyIds
1141 | KafkaSinkConfigOptionName::TopicConfig
1142 | KafkaSinkConfigOptionName::TopicPartitionCount
1143 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1144 KafkaSinkConfigOptionName::PartitionBy => true,
1145 }
1146 }
1147}
1148
1149#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1150pub struct KafkaSinkConfigOption<T: AstInfo> {
1151 pub name: KafkaSinkConfigOptionName,
1152 pub value: Option<WithOptionValue<T>>,
1153}
1154impl_display_for_with_option!(KafkaSinkConfigOption);
1155impl_display_t!(KafkaSinkConfigOption);
1156
/// Names of the options accepted by an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// The `NAMESPACE` option.
    Namespace,
    /// The `TABLE` option.
    Table,
}
1162
1163impl AstDisplay for IcebergSinkConfigOptionName {
1164 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1165 f.write_str(match self {
1166 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1167 IcebergSinkConfigOptionName::Table => "TABLE",
1168 })
1169 }
1170}
1171impl_display!(IcebergSinkConfigOptionName);
1172
1173impl WithOptionName for IcebergSinkConfigOptionName {
1174 fn redact_value(&self) -> bool {
1180 match self {
1181 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1182 }
1183 }
1184}
1185
1186#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1187pub struct IcebergSinkConfigOption<T: AstInfo> {
1188 pub name: IcebergSinkConfigOptionName,
1189 pub value: Option<WithOptionValue<T>>,
1190}
1191impl_display_for_with_option!(IcebergSinkConfigOption);
1192impl_display_t!(IcebergSinkConfigOption);
1193
/// Names of the options accepted by a Postgres source.
///
/// The declaration order of the variants drives the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// The `DETAILS` option.
    Details,
    /// The `PUBLICATION` option.
    Publication,
    /// The `TEXT COLUMNS` option.
    TextColumns,
    /// The `EXCLUDE COLUMNS` option.
    ExcludeColumns,
}
1216
1217impl AstDisplay for PgConfigOptionName {
1218 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1219 f.write_str(match self {
1220 PgConfigOptionName::Details => "DETAILS",
1221 PgConfigOptionName::Publication => "PUBLICATION",
1222 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1223 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1224 })
1225 }
1226}
1227impl_display!(PgConfigOptionName);
1228
1229impl WithOptionName for PgConfigOptionName {
1230 fn redact_value(&self) -> bool {
1236 match self {
1237 PgConfigOptionName::Details
1238 | PgConfigOptionName::Publication
1239 | PgConfigOptionName::TextColumns
1240 | PgConfigOptionName::ExcludeColumns => false,
1241 }
1242 }
1243}
1244
1245#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1246pub struct PgConfigOption<T: AstInfo> {
1248 pub name: PgConfigOptionName,
1249 pub value: Option<WithOptionValue<T>>,
1250}
1251impl_display_for_with_option!(PgConfigOption);
1252impl_display_t!(PgConfigOption);
1253
/// Names of the options that can be attached to a MySQL source connection.
///
/// The declaration order of the variants is significant: the derived
/// `PartialOrd`/`Ord` implementations follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Displayed as `DETAILS`.
    Details,
    /// Displayed as `TEXT COLUMNS`.
    TextColumns,
    /// Displayed as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1274
1275impl AstDisplay for MySqlConfigOptionName {
1276 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1277 f.write_str(match self {
1278 MySqlConfigOptionName::Details => "DETAILS",
1279 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1280 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1281 })
1282 }
1283}
1284impl_display!(MySqlConfigOptionName);
1285
1286impl WithOptionName for MySqlConfigOptionName {
1287 fn redact_value(&self) -> bool {
1293 match self {
1294 MySqlConfigOptionName::Details
1295 | MySqlConfigOptionName::TextColumns
1296 | MySqlConfigOptionName::ExcludeColumns => false,
1297 }
1298 }
1299}
1300
1301#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1302pub struct MySqlConfigOption<T: AstInfo> {
1304 pub name: MySqlConfigOptionName,
1305 pub value: Option<WithOptionValue<T>>,
1306}
1307impl_display_for_with_option!(MySqlConfigOption);
1308impl_display_t!(MySqlConfigOption);
1309
/// Names of the options that can be attached to a SQL Server source
/// connection.
///
/// The declaration order of the variants is significant: the derived
/// `PartialOrd`/`Ord` implementations follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Displayed as `DETAILS`.
    Details,
    /// Displayed as `TEXT COLUMNS`.
    TextColumns,
    /// Displayed as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1332
1333impl AstDisplay for SqlServerConfigOptionName {
1334 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1335 f.write_str(match self {
1336 SqlServerConfigOptionName::Details => "DETAILS",
1337 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1338 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1339 })
1340 }
1341}
1342impl_display!(SqlServerConfigOptionName);
1343
1344impl WithOptionName for SqlServerConfigOptionName {
1345 fn redact_value(&self) -> bool {
1351 match self {
1352 SqlServerConfigOptionName::Details
1353 | SqlServerConfigOptionName::TextColumns
1354 | SqlServerConfigOptionName::ExcludeColumns => false,
1355 }
1356 }
1357}
1358
1359#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1360pub struct SqlServerConfigOption<T: AstInfo> {
1362 pub name: SqlServerConfigOptionName,
1363 pub value: Option<WithOptionValue<T>>,
1364}
1365impl_display_for_with_option!(SqlServerConfigOption);
1366impl_display_t!(SqlServerConfigOption);
1367
1368#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1369pub enum CreateSourceConnection<T: AstInfo> {
1370 Kafka {
1371 connection: T::ItemName,
1372 options: Vec<KafkaSourceConfigOption<T>>,
1373 },
1374 Postgres {
1375 connection: T::ItemName,
1376 options: Vec<PgConfigOption<T>>,
1377 },
1378 SqlServer {
1379 connection: T::ItemName,
1380 options: Vec<SqlServerConfigOption<T>>,
1381 },
1382 MySql {
1383 connection: T::ItemName,
1384 options: Vec<MySqlConfigOption<T>>,
1385 },
1386 LoadGenerator {
1387 generator: LoadGenerator,
1388 options: Vec<LoadGeneratorOption<T>>,
1389 },
1390}
1391
1392impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1393 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1394 match self {
1395 CreateSourceConnection::Kafka {
1396 connection,
1397 options,
1398 } => {
1399 f.write_str("KAFKA CONNECTION ");
1400 f.write_node(connection);
1401 if !options.is_empty() {
1402 f.write_str(" (");
1403 f.write_node(&display::comma_separated(options));
1404 f.write_str(")");
1405 }
1406 }
1407 CreateSourceConnection::Postgres {
1408 connection,
1409 options,
1410 } => {
1411 f.write_str("POSTGRES CONNECTION ");
1412 f.write_node(connection);
1413 if !options.is_empty() {
1414 f.write_str(" (");
1415 f.write_node(&display::comma_separated(options));
1416 f.write_str(")");
1417 }
1418 }
1419 CreateSourceConnection::SqlServer {
1420 connection,
1421 options,
1422 } => {
1423 f.write_str("SQL SERVER CONNECTION ");
1424 f.write_node(connection);
1425 if !options.is_empty() {
1426 f.write_str(" (");
1427 f.write_node(&display::comma_separated(options));
1428 f.write_str(")");
1429 }
1430 }
1431 CreateSourceConnection::MySql {
1432 connection,
1433 options,
1434 } => {
1435 f.write_str("MYSQL CONNECTION ");
1436 f.write_node(connection);
1437 if !options.is_empty() {
1438 f.write_str(" (");
1439 f.write_node(&display::comma_separated(options));
1440 f.write_str(")");
1441 }
1442 }
1443 CreateSourceConnection::LoadGenerator { generator, options } => {
1444 f.write_str("LOAD GENERATOR ");
1445 f.write_node(generator);
1446 if !options.is_empty() {
1447 f.write_str(" (");
1448 f.write_node(&display::comma_separated(options));
1449 f.write_str(")");
1450 }
1451 }
1452 }
1453 }
1454}
1455impl_display_t!(CreateSourceConnection);
1456
/// The built-in load generators a source can be created from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`.
    Clock,
    /// Displayed as `COUNTER`.
    Counter,
    /// Displayed as `MARKETING`.
    Marketing,
    /// Displayed as `AUCTION`.
    Auction,
    /// Displayed as `DATUMS`.
    Datums,
    /// Displayed as `TPCH`.
    Tpch,
    /// Displayed as `KEY VALUE`.
    KeyValue,
}
1467
1468impl AstDisplay for LoadGenerator {
1469 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1470 match self {
1471 Self::Counter => f.write_str("COUNTER"),
1472 Self::Clock => f.write_str("CLOCK"),
1473 Self::Marketing => f.write_str("MARKETING"),
1474 Self::Auction => f.write_str("AUCTION"),
1475 Self::Datums => f.write_str("DATUMS"),
1476 Self::Tpch => f.write_str("TPCH"),
1477 Self::KeyValue => f.write_str("KEY VALUE"),
1478 }
1479 }
1480}
1481impl_display!(LoadGenerator);
1482
1483impl LoadGenerator {
1484 pub fn schema_name(&self) -> &'static str {
1489 match self {
1490 LoadGenerator::Counter => "counter",
1491 LoadGenerator::Clock => "clock",
1492 LoadGenerator::Marketing => "marketing",
1493 LoadGenerator::Auction => "auction",
1494 LoadGenerator::Datums => "datums",
1495 LoadGenerator::Tpch => "tpch",
1496 LoadGenerator::KeyValue => "key_value",
1497 }
1498 }
1499}
1500
/// Names of the options that can be attached to a load generator source.
///
/// The declaration order of the variants is significant: the derived
/// `PartialOrd`/`Ord` implementations follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Displayed as `SCALE FACTOR`.
    ScaleFactor,
    /// Displayed as `TICK INTERVAL`.
    TickInterval,
    /// Displayed as `AS OF`.
    AsOf,
    /// Displayed as `UP TO`.
    UpTo,
    /// Displayed as `MAX CARDINALITY`.
    MaxCardinality,
    /// Displayed as `KEYS`.
    Keys,
    /// Displayed as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Displayed as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Displayed as `VALUE SIZE`.
    ValueSize,
    /// Displayed as `SEED`.
    Seed,
    /// Displayed as `PARTITIONS`.
    Partitions,
    /// Displayed as `BATCH SIZE`.
    BatchSize,
}
1516
1517impl AstDisplay for LoadGeneratorOptionName {
1518 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1519 f.write_str(match self {
1520 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1521 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1522 LoadGeneratorOptionName::AsOf => "AS OF",
1523 LoadGeneratorOptionName::UpTo => "UP TO",
1524 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1525 LoadGeneratorOptionName::Keys => "KEYS",
1526 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1527 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1528 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1529 LoadGeneratorOptionName::Seed => "SEED",
1530 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1531 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1532 })
1533 }
1534}
1535impl_display!(LoadGeneratorOptionName);
1536
1537impl WithOptionName for LoadGeneratorOptionName {
1538 fn redact_value(&self) -> bool {
1544 match self {
1545 LoadGeneratorOptionName::ScaleFactor
1546 | LoadGeneratorOptionName::TickInterval
1547 | LoadGeneratorOptionName::AsOf
1548 | LoadGeneratorOptionName::UpTo
1549 | LoadGeneratorOptionName::MaxCardinality
1550 | LoadGeneratorOptionName::Keys
1551 | LoadGeneratorOptionName::SnapshotRounds
1552 | LoadGeneratorOptionName::TransactionalSnapshot
1553 | LoadGeneratorOptionName::ValueSize
1554 | LoadGeneratorOptionName::Partitions
1555 | LoadGeneratorOptionName::BatchSize
1556 | LoadGeneratorOptionName::Seed => false,
1557 }
1558 }
1559}
1560
1561#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1562pub struct LoadGeneratorOption<T: AstInfo> {
1564 pub name: LoadGeneratorOptionName,
1565 pub value: Option<WithOptionValue<T>>,
1566}
1567impl_display_for_with_option!(LoadGeneratorOption);
1568impl_display_t!(LoadGeneratorOption);
1569
1570#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1571pub enum CreateSinkConnection<T: AstInfo> {
1572 Kafka {
1573 connection: T::ItemName,
1574 options: Vec<KafkaSinkConfigOption<T>>,
1575 key: Option<SinkKey>,
1576 headers: Option<Ident>,
1577 },
1578 Iceberg {
1579 connection: T::ItemName,
1580 aws_connection: T::ItemName,
1581 key: Option<SinkKey>,
1582 options: Vec<IcebergSinkConfigOption<T>>,
1583 },
1584}
1585
1586impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1587 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1588 match self {
1589 CreateSinkConnection::Kafka {
1590 connection,
1591 options,
1592 key,
1593 headers,
1594 } => {
1595 f.write_str("KAFKA CONNECTION ");
1596 f.write_node(connection);
1597 if !options.is_empty() {
1598 f.write_str(" (");
1599 f.write_node(&display::comma_separated(options));
1600 f.write_str(")");
1601 }
1602 if let Some(key) = key.as_ref() {
1603 f.write_str(" ");
1604 f.write_node(key);
1605 }
1606 if let Some(headers) = headers {
1607 f.write_str(" HEADERS ");
1608 f.write_node(headers);
1609 }
1610 }
1611 CreateSinkConnection::Iceberg {
1612 connection,
1613 aws_connection,
1614 key,
1615 options,
1616 } => {
1617 f.write_str("ICEBERG CATALOG CONNECTION ");
1618 f.write_node(connection);
1619 if !options.is_empty() {
1620 f.write_str(" (");
1621 f.write_node(&display::comma_separated(options));
1622 f.write_str(")");
1623 }
1624 f.write_str(" USING AWS CONNECTION ");
1625 f.write_node(aws_connection);
1626 if let Some(key) = key.as_ref() {
1627 f.write_str(" ");
1628 f.write_node(key);
1629 }
1630 }
1631 }
1632 }
1633}
1634impl_display_t!(CreateSinkConnection);
1635
1636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1637pub struct SinkKey {
1638 pub key_columns: Vec<Ident>,
1639 pub not_enforced: bool,
1640}
1641
1642impl AstDisplay for SinkKey {
1643 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1644 f.write_str("KEY (");
1645 f.write_node(&display::comma_separated(&self.key_columns));
1646 f.write_str(")");
1647 if self.not_enforced {
1648 f.write_str(" NOT ENFORCED");
1649 }
1650 }
1651}
1652
1653#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1656pub enum TableConstraint<T: AstInfo> {
1657 Unique {
1659 name: Option<Ident>,
1660 columns: Vec<Ident>,
1661 is_primary: bool,
1663 nulls_not_distinct: bool,
1666 },
1667 ForeignKey {
1670 name: Option<Ident>,
1671 columns: Vec<Ident>,
1672 foreign_table: T::ItemName,
1673 referred_columns: Vec<Ident>,
1674 },
1675 Check {
1677 name: Option<Ident>,
1678 expr: Box<Expr<T>>,
1679 },
1680}
1681
1682impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1683 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1684 match self {
1685 TableConstraint::Unique {
1686 name,
1687 columns,
1688 is_primary,
1689 nulls_not_distinct,
1690 } => {
1691 f.write_node(&display_constraint_name(name));
1692 if *is_primary {
1693 f.write_str("PRIMARY KEY ");
1694 } else {
1695 f.write_str("UNIQUE ");
1696 if *nulls_not_distinct {
1697 f.write_str("NULLS NOT DISTINCT ");
1698 }
1699 }
1700 f.write_str("(");
1701 f.write_node(&display::comma_separated(columns));
1702 f.write_str(")");
1703 }
1704 TableConstraint::ForeignKey {
1705 name,
1706 columns,
1707 foreign_table,
1708 referred_columns,
1709 } => {
1710 f.write_node(&display_constraint_name(name));
1711 f.write_str("FOREIGN KEY (");
1712 f.write_node(&display::comma_separated(columns));
1713 f.write_str(") REFERENCES ");
1714 f.write_node(foreign_table);
1715 f.write_str("(");
1716 f.write_node(&display::comma_separated(referred_columns));
1717 f.write_str(")");
1718 }
1719 TableConstraint::Check { name, expr } => {
1720 f.write_node(&display_constraint_name(name));
1721 f.write_str("CHECK (");
1722 f.write_node(&expr);
1723 f.write_str(")");
1724 }
1725 }
1726 }
1727}
1728impl_display_t!(TableConstraint);
1729
1730#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1732pub enum KeyConstraint {
1733 PrimaryKeyNotEnforced { columns: Vec<Ident> },
1735}
1736
1737impl AstDisplay for KeyConstraint {
1738 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1739 match self {
1740 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1741 f.write_str("PRIMARY KEY ");
1742 f.write_str("(");
1743 f.write_node(&display::comma_separated(columns));
1744 f.write_str(") ");
1745 f.write_str("NOT ENFORCED");
1746 }
1747 }
1748 }
1749}
1750impl_display!(KeyConstraint);
1751
/// Names of the options carried by a [`CreateSourceOption`].
///
/// The declaration order of the variants is significant: the derived
/// `PartialOrd`/`Ord` implementations follow it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1757
1758impl AstDisplay for CreateSourceOptionName {
1759 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1760 f.write_str(match self {
1761 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1762 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1763 })
1764 }
1765}
1766impl_display!(CreateSourceOptionName);
1767
1768impl WithOptionName for CreateSourceOptionName {
1769 fn redact_value(&self) -> bool {
1775 match self {
1776 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1777 false
1778 }
1779 }
1780 }
1781}
1782
1783#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1784pub struct CreateSourceOption<T: AstInfo> {
1786 pub name: CreateSourceOptionName,
1787 pub value: Option<WithOptionValue<T>>,
1788}
1789impl_display_for_with_option!(CreateSourceOption);
1790impl_display_t!(CreateSourceOption);
1791
1792#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1794pub struct ColumnDef<T: AstInfo> {
1795 pub name: Ident,
1796 pub data_type: T::DataType,
1797 pub collation: Option<UnresolvedItemName>,
1798 pub options: Vec<ColumnOptionDef<T>>,
1799}
1800
1801impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1802 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1803 f.write_node(&self.name);
1804 f.write_str(" ");
1805 f.write_node(&self.data_type);
1806 if let Some(collation) = &self.collation {
1807 f.write_str(" COLLATE ");
1808 f.write_node(collation);
1809 }
1810 for option in &self.options {
1811 f.write_str(" ");
1812 f.write_node(option);
1813 }
1814 }
1815}
1816impl_display_t!(ColumnDef);
1817
1818#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1835pub struct ColumnOptionDef<T: AstInfo> {
1836 pub name: Option<Ident>,
1837 pub option: ColumnOption<T>,
1838}
1839
1840impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1841 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1842 f.write_node(&display_constraint_name(&self.name));
1843 f.write_node(&self.option);
1844 }
1845}
1846impl_display_t!(ColumnOptionDef);
1847
1848#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1851pub enum ColumnOption<T: AstInfo> {
1852 Null,
1854 NotNull,
1856 Default(Expr<T>),
1858 Unique { is_primary: bool },
1860 ForeignKey {
1863 foreign_table: UnresolvedItemName,
1864 referred_columns: Vec<Ident>,
1865 },
1866 Check(Expr<T>),
1868 Versioned {
1870 action: ColumnVersioned,
1871 version: Version,
1872 },
1873}
1874
1875impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1876 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1877 use ColumnOption::*;
1878 match self {
1879 Null => f.write_str("NULL"),
1880 NotNull => f.write_str("NOT NULL"),
1881 Default(expr) => {
1882 f.write_str("DEFAULT ");
1883 f.write_node(expr);
1884 }
1885 Unique { is_primary } => {
1886 if *is_primary {
1887 f.write_str("PRIMARY KEY");
1888 } else {
1889 f.write_str("UNIQUE");
1890 }
1891 }
1892 ForeignKey {
1893 foreign_table,
1894 referred_columns,
1895 } => {
1896 f.write_str("REFERENCES ");
1897 f.write_node(foreign_table);
1898 f.write_str(" (");
1899 f.write_node(&display::comma_separated(referred_columns));
1900 f.write_str(")");
1901 }
1902 Check(expr) => {
1903 f.write_str("CHECK (");
1904 f.write_node(expr);
1905 f.write_str(")");
1906 }
1907 Versioned { action, version } => {
1908 f.write_str("VERSION ");
1909 f.write_node(action);
1910 f.write_str(" ");
1911 f.write_node(version);
1912 }
1913 }
1914 }
1915}
1916impl_display_t!(ColumnOption);
1917
/// The action recorded by a `VERSION` column option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1922
1923impl AstDisplay for ColumnVersioned {
1924 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1925 where
1926 W: fmt::Write,
1927 {
1928 match self {
1929 ColumnVersioned::Added => f.write_str("ADDED"),
1931 }
1932 }
1933}
1934impl_display!(ColumnVersioned);
1935
1936fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1937 struct ConstraintName<'a>(&'a Option<Ident>);
1938 impl<'a> AstDisplay for ConstraintName<'a> {
1939 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1940 where
1941 W: fmt::Write,
1942 {
1943 if let Some(name) = self.0 {
1944 f.write_str("CONSTRAINT ");
1945 f.write_node(name);
1946 f.write_str(" ");
1947 }
1948 }
1949 }
1950 ConstraintName(name)
1951}