1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Option names used by [`MaterializedViewOption`].
///
/// The SQL spelling of each variant is produced by this type's `AstDisplay` impl.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// Rendered as `ASSERT NOT NULL`.
    AssertNotNull,
    /// Rendered as `PARTITION BY`.
    PartitionBy,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
    /// Rendered as `REFRESH`.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
/// A single materialized-view option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared with-option macro.
impl_display_for_with_option!(MaterializedViewOption);
74
/// Option names used by [`ContinualTaskOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// Rendered as `SNAPSHOT`.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
/// A single continual-task option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared with-option macro.
impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema given as a string literal.
///
/// Rendered as `SCHEMA '<schema>'`, with single quotes in the text escaped.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The raw (unescaped) schema text.
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
/// Option names used by [`AvroSchemaOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// Rendered as `CONFLUENT WIRE FORMAT`.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
/// A single option on an inline Avro schema: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared with-option macro.
impl_display_for_with_option!(AvroSchemaOption);
156
/// Where the schema for an Avro format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// The schema is obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection; its own display output is written as-is.
        csr_connection: CsrConnectionAvro<T>,
    },
    /// The schema is written inline; rendered as `USING SCHEMA '...'`.
    InlineSchema {
        /// The inline schema text.
        schema: Schema,
        /// Options rendered as a parenthesized, comma-separated list when non-empty.
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
/// Where the schema for a Protobuf format comes from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// The schema is obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection; its own display output is written as-is.
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// The schema is written inline; rendered as `MESSAGE '<name>' USING SCHEMA '...'`.
    InlineSchema {
        /// The message name, written inside single quotes with quotes escaped.
        message_name: String,
        /// The inline schema text.
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
/// Option names used by [`CsrConfigOption`].
///
/// The SQL spelling of each variant is produced by this type's `AstDisplay` impl.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// Rendered as `AVRO KEY FULLNAME`.
    AvroKeyFullname,
    /// Rendered as `AVRO VALUE FULLNAME`.
    AvroValueFullname,
    /// Rendered as `NULL DEFAULTS`.
    NullDefaults,
    /// A `[KEY|VALUE] DOC ON ...` option; rendered by the [`AvroDocOn`] payload.
    AvroDocOn(AvroDocOn<T>),
    /// Rendered as `KEY COMPATIBILITY LEVEL`.
    KeyCompatibilityLevel,
    /// Rendered as `VALUE COMPATIBILITY LEVEL`.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
/// The payload of a `DOC ON` option: what is documented and for which schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the documentation is attached to.
    pub identifier: DocOnIdentifier<T>,
    /// Selects the `KEY ` / `VALUE ` prefix, or no prefix, when rendered.
    pub for_schema: DocOnSchema,
}
/// Which schema a `DOC ON` option applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Rendered with a leading `KEY `.
    KeyOnly,
    /// Rendered with a leading `VALUE `.
    ValueOnly,
    /// Rendered with no prefix.
    All,
}
261
/// The object a `DOC ON` option documents.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; rendered as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; rendered as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
/// A single schema-registry configuration option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A reference to a schema-registry connection plus its options.
///
/// Rendered as `CONNECTION <connection>` followed by ` (<options>)` when
/// `options` is non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The connection item being referenced.
    pub connection: T::ItemName,
    /// Options rendered as a parenthesized, comma-separated list.
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is selected.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy (see the `Default` impl).
    Latest,
    /// Carries a string payload.
    // NOTE(review): presumably the schema text itself — confirm against the parser.
    Inline(String),
    /// Carries a numeric id.
    // NOTE(review): presumably a schema-registry schema id — confirm against the parser.
    ById(i32),
}

impl Default for ReaderSchemaSelectionStrategy {
    /// Defaults to [`ReaderSchemaSelectionStrategy::Latest`].
    fn default() -> Self {
        ReaderSchemaSelectionStrategy::Latest
    }
}
345
/// A schema-registry connection used for an Avro format.
///
/// Rendered as `USING CONFLUENT SCHEMA REGISTRY <connection>[ <seed>]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The underlying registry connection.
    pub connection: CsrConnection<T>,
    /// Reader-schema strategy for the key. Not written by the `AstDisplay` impl.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader-schema strategy for the value. Not written by the `AstDisplay` impl.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional seed schemas, written after the connection when present.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A schema-registry connection used for a Protobuf format.
///
/// Rendered as `USING CONFLUENT SCHEMA REGISTRY <connection>[ <seed>]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The underlying registry connection.
    pub connection: CsrConnection<T>,
    /// Optional seed schemas, written after the connection when present.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// Seed schemas for an Avro schema-registry connection.
///
/// Rendered as
/// `SEED [KEY SCHEMA '...' [KEY REFERENCES (...)]] VALUE SCHEMA '...' [VALUE REFERENCES (...)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// Optional key schema text.
    pub key_schema: Option<String>,
    /// Value schema text; always rendered.
    pub value_schema: String,
    /// Schemas listed after `KEY REFERENCES`. Only rendered when `key_schema`
    /// is set and this list is non-empty.
    pub key_reference_schemas: Vec<String>,
    /// Schemas listed after `VALUE REFERENCES`. Only rendered when non-empty.
    pub value_reference_schemas: Vec<String>,
}
396
397impl AstDisplay for CsrSeedAvro {
398 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
399 f.write_str("SEED");
400 if let Some(key_schema) = &self.key_schema {
401 f.write_str(" KEY SCHEMA '");
402 f.write_node(&display::escape_single_quote_string(key_schema));
403 f.write_str("'");
404 if !self.key_reference_schemas.is_empty() {
405 f.write_str(" KEY REFERENCES (");
406 for (i, schema) in self.key_reference_schemas.iter().enumerate() {
407 if i > 0 {
408 f.write_str(", ");
409 }
410 f.write_str("'");
411 f.write_node(&display::escape_single_quote_string(schema));
412 f.write_str("'");
413 }
414 f.write_str(")");
415 }
416 }
417 f.write_str(" VALUE SCHEMA '");
418 f.write_node(&display::escape_single_quote_string(&self.value_schema));
419 f.write_str("'");
420 if !self.value_reference_schemas.is_empty() {
421 f.write_str(" VALUE REFERENCES (");
422 for (i, schema) in self.value_reference_schemas.iter().enumerate() {
423 if i > 0 {
424 f.write_str(", ");
425 }
426 f.write_str("'");
427 f.write_node(&display::escape_single_quote_string(schema));
428 f.write_str("'");
429 }
430 f.write_str(")");
431 }
432 }
433}
434impl_display!(CsrSeedAvro);
435
/// Seed schemas for a Protobuf schema-registry connection.
///
/// Rendered as `SEED [KEY <key>] VALUE <value>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// Optional key schema.
    pub key: Option<CsrSeedProtobufSchema>,
    /// Value schema; always rendered.
    pub value: CsrSeedProtobufSchema,
}
441
442impl AstDisplay for CsrSeedProtobuf {
443 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
444 f.write_str("SEED");
445 if let Some(key) = &self.key {
446 f.write_str(" KEY ");
447 f.write_node(key);
448 }
449 f.write_str(" VALUE ");
450 f.write_node(&self.value);
451 }
452}
453impl_display!(CsrSeedProtobuf);
454
/// One seed schema for Protobuf: the schema text and the message it describes.
///
/// Rendered as `SCHEMA '<schema>' MESSAGE '<message_name>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    /// The schema text, written inside single quotes.
    pub schema: String,
    /// The message name, written inside single quotes.
    pub message_name: String,
}
461impl AstDisplay for CsrSeedProtobufSchema {
462 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
463 f.write_str("SCHEMA '");
464 f.write_str(&display::escape_single_quote_string(&self.schema));
465 f.write_str("' MESSAGE '");
466 f.write_str(&self.message_name);
467 f.write_str("'");
468 }
469}
470impl_display!(CsrSeedProtobufSchema);
471
/// A format clause: one format for everything, or separate key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// A single format; rendered as `FORMAT <format>`.
    Bare(Format<T>),
    /// Separate formats; rendered as `KEY FORMAT <key> VALUE FORMAT <value>`.
    KeyValue { key: Format<T>, value: Format<T> },
}
479
480impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
481 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
482 match self {
483 FormatSpecifier::Bare(format) => {
484 f.write_str("FORMAT ");
485 f.write_node(format)
486 }
487 FormatSpecifier::KeyValue { key, value } => {
488 f.write_str("KEY FORMAT ");
489 f.write_node(key);
490 f.write_str(" VALUE FORMAT ");
491 f.write_node(value);
492 }
493 }
494 }
495}
496impl_display_t!(FormatSpecifier);
497
/// A data format. The SQL spelling is produced by this type's `AstDisplay` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Rendered as `BYTES`.
    Bytes,
    /// Rendered as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Rendered as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Rendered as `REGEX '<pattern>'` with single quotes escaped.
    Regex(String),
    /// Rendered as `CSV WITH <columns>`, plus ` DELIMITED BY '<c>'` when the
    /// delimiter is not `,`.
    Csv {
        /// How the columns are specified.
        columns: CsvColumns,
        /// The field delimiter character.
        delimiter: char,
    },
    /// Rendered as `JSON`, or `JSON ARRAY` when `array` is true.
    Json {
        /// Whether the ` ARRAY` suffix is written.
        array: bool,
    },
    /// Rendered as `TEXT`.
    Text,
}
513
/// How the columns of a CSV format are specified.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// A column count; rendered as `<n> COLUMNS`.
    Count(u64),
    /// A header row; rendered as `HEADER`, followed by ` (<names>)` when
    /// `names` is non-empty.
    Header { names: Vec<Ident> },
}
521
522impl AstDisplay for CsvColumns {
523 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
524 match self {
525 CsvColumns::Count(n) => {
526 f.write_str(n);
527 f.write_str(" COLUMNS")
528 }
529 CsvColumns::Header { names } => {
530 f.write_str("HEADER");
531 if !names.is_empty() {
532 f.write_str(" (");
533 f.write_node(&display::comma_separated(names));
534 f.write_str(")");
535 }
536 }
537 }
538 }
539}
540
/// A metadata item that a source can include, with its output alias.
///
/// Each variant renders as its keyword, followed by ` AS <alias>` when an
/// alias is present.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Rendered as `KEY`.
    Key {
        alias: Option<Ident>,
    },
    /// Rendered as `TIMESTAMP`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Rendered as `PARTITION`.
    Partition {
        alias: Option<Ident>,
    },
    /// Rendered as `OFFSET`.
    Offset {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADERS`.
    Headers {
        alias: Option<Ident>,
    },
    /// A single header; rendered as `HEADER '<key>' AS <alias>[ BYTES]`.
    Header {
        /// The header key, written inside single quotes with quotes escaped.
        key: String,
        /// The alias; mandatory for this variant.
        alias: Ident,
        /// Whether the ` BYTES` suffix is written.
        use_bytes: bool,
    },
}
564
565impl AstDisplay for SourceIncludeMetadata {
566 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
567 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
568 if let Some(alias) = alias {
569 f.write_str(" AS ");
570 f.write_node(alias);
571 }
572 };
573
574 match self {
575 SourceIncludeMetadata::Key { alias } => {
576 f.write_str("KEY");
577 print_alias(f, alias);
578 }
579 SourceIncludeMetadata::Timestamp { alias } => {
580 f.write_str("TIMESTAMP");
581 print_alias(f, alias);
582 }
583 SourceIncludeMetadata::Partition { alias } => {
584 f.write_str("PARTITION");
585 print_alias(f, alias);
586 }
587 SourceIncludeMetadata::Offset { alias } => {
588 f.write_str("OFFSET");
589 print_alias(f, alias);
590 }
591 SourceIncludeMetadata::Headers { alias } => {
592 f.write_str("HEADERS");
593 print_alias(f, alias);
594 }
595 SourceIncludeMetadata::Header {
596 alias,
597 key,
598 use_bytes,
599 } => {
600 f.write_str("HEADER '");
601 f.write_str(&display::escape_single_quote_string(key));
602 f.write_str("'");
603 print_alias(f, &Some(alias.clone()));
604 if *use_bytes {
605 f.write_str(" BYTES");
606 }
607 }
608 }
609 }
610}
611impl_display!(SourceIncludeMetadata);
612
/// An error-handling policy entry, as listed in `SourceEnvelope::Upsert`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Rendered as `INLINE`, followed by ` AS <alias>` when an alias is set.
    Inline {
        /// Optional alias.
        alias: Option<Ident>,
    },
}
620
621impl AstDisplay for SourceErrorPolicy {
622 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
623 match self {
624 Self::Inline { alias } => {
625 f.write_str("INLINE");
626 if let Some(alias) = alias {
627 f.write_str(" AS ");
628 f.write_node(alias);
629 }
630 }
631 }
632 }
633}
634impl_display!(SourceErrorPolicy);
635
/// The envelope of a source. The SQL spelling is produced by the `AstDisplay` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Rendered as `NONE`.
    None,
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`, or `UPSERT (VALUE DECODING ERRORS = (...))` when
    /// any policies are listed.
    Upsert {
        /// Policies rendered inside `VALUE DECODING ERRORS = (...)`.
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Rendered as `MATERIALIZE`; the only envelope for which
    /// `requires_all_input` returns `true`.
    CdcV2,
}
645
646impl SourceEnvelope {
647 pub fn requires_all_input(&self) -> bool {
650 match self {
651 SourceEnvelope::None => false,
652 SourceEnvelope::Debezium => false,
653 SourceEnvelope::Upsert { .. } => false,
654 SourceEnvelope::CdcV2 => true,
655 }
656 }
657}
658
659impl AstDisplay for SourceEnvelope {
660 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
661 match self {
662 Self::None => {
663 f.write_str("NONE");
665 }
666 Self::Debezium => {
667 f.write_str("DEBEZIUM");
668 }
669 Self::Upsert {
670 value_decode_err_policy,
671 } => {
672 if value_decode_err_policy.is_empty() {
673 f.write_str("UPSERT");
674 } else {
675 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
676 f.write_node(&display::comma_separated(value_decode_err_policy));
677 f.write_str("))")
678 }
679 }
680 Self::CdcV2 => {
681 f.write_str("MATERIALIZE");
682 }
683 }
684 }
685}
686impl_display!(SourceEnvelope);
687
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`.
    Upsert,
}
693
694impl AstDisplay for SinkEnvelope {
695 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
696 match self {
697 Self::Upsert => {
698 f.write_str("UPSERT");
699 }
700 Self::Debezium => {
701 f.write_str("DEBEZIUM");
702 }
703 }
704 }
705}
706impl_display!(SinkEnvelope);
707
/// The write mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Rendered as `UPSERT`.
    Upsert,
    /// Rendered as `APPEND`.
    Append,
}
713
714impl AstDisplay for IcebergSinkMode {
715 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
716 match self {
717 Self::Upsert => {
718 f.write_str("UPSERT");
719 }
720 Self::Append => {
721 f.write_str("APPEND");
722 }
723 }
724 }
725}
726impl_display!(IcebergSinkMode);
727
/// The output shape of a subscribe. Every non-empty rendering starts with a
/// leading space (see the `AstDisplay` impl).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Renders as nothing.
    Diffs,
    /// Rendered as ` WITHIN TIMESTAMP ORDER BY <order_by>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Rendered as ` ENVELOPE UPSERT (KEY (<key_columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Rendered as ` ENVELOPE DEBEZIUM (KEY (<key_columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
735
736impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
737 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
738 match self {
739 Self::Diffs => {}
740 Self::WithinTimestampOrderBy { order_by } => {
741 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
742 f.write_node(&display::comma_separated(order_by));
743 }
744 Self::EnvelopeUpsert { key_columns } => {
745 f.write_str(" ENVELOPE UPSERT (KEY (");
746 f.write_node(&display::comma_separated(key_columns));
747 f.write_str("))");
748 }
749 Self::EnvelopeDebezium { key_columns } => {
750 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
751 f.write_node(&display::comma_separated(key_columns));
752 f.write_str("))");
753 }
754 }
755 }
756}
757impl_display_t!(SubscribeOutput);
758
759impl<T: AstInfo> AstDisplay for Format<T> {
760 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
761 match self {
762 Self::Bytes => f.write_str("BYTES"),
763 Self::Avro(inner) => {
764 f.write_str("AVRO ");
765 f.write_node(inner);
766 }
767 Self::Protobuf(inner) => {
768 f.write_str("PROTOBUF ");
769 f.write_node(inner);
770 }
771 Self::Regex(regex) => {
772 f.write_str("REGEX '");
773 f.write_node(&display::escape_single_quote_string(regex));
774 f.write_str("'");
775 }
776 Self::Csv { columns, delimiter } => {
777 f.write_str("CSV WITH ");
778 f.write_node(columns);
779
780 if *delimiter != ',' {
781 f.write_str(" DELIMITED BY '");
782 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
783 f.write_str("'");
784 }
785 }
786 Self::Json { array } => {
787 f.write_str("JSON");
788 if *array {
789 f.write_str(" ARRAY");
790 }
791 }
792 Self::Text => f.write_str("TEXT"),
793 }
794 }
795}
796impl_display_t!(Format);
797
/// Option names used by [`ConnectionOption`].
///
/// The SQL spelling of each variant is produced by this type's `AstDisplay`
/// impl (e.g. `AccessKeyId` is `ACCESS KEY ID`).
///
/// `PartialOrd`/`Ord` are derived, so ordering follows declaration order:
/// reordering variants changes how values compare.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
840
841impl AstDisplay for ConnectionOptionName {
842 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
843 f.write_str(match self {
844 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
845 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
846 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
847 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
848 ConnectionOptionName::Broker => "BROKER",
849 ConnectionOptionName::Brokers => "BROKERS",
850 ConnectionOptionName::Credential => "CREDENTIAL",
851 ConnectionOptionName::Database => "DATABASE",
852 ConnectionOptionName::Endpoint => "ENDPOINT",
853 ConnectionOptionName::Host => "HOST",
854 ConnectionOptionName::Password => "PASSWORD",
855 ConnectionOptionName::Port => "PORT",
856 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
857 ConnectionOptionName::ProgressTopicReplicationFactor => {
858 "PROGRESS TOPIC REPLICATION FACTOR"
859 }
860 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
861 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
862 ConnectionOptionName::Region => "REGION",
863 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
864 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
865 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
866 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
867 ConnectionOptionName::SaslUsername => "SASL USERNAME",
868 ConnectionOptionName::Scope => "SCOPE",
869 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
870 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
871 ConnectionOptionName::ServiceName => "SERVICE NAME",
872 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
873 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
874 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
875 ConnectionOptionName::SslKey => "SSL KEY",
876 ConnectionOptionName::SslMode => "SSL MODE",
877 ConnectionOptionName::SessionToken => "SESSION TOKEN",
878 ConnectionOptionName::CatalogType => "CATALOG TYPE",
879 ConnectionOptionName::Url => "URL",
880 ConnectionOptionName::User => "USER",
881 ConnectionOptionName::Warehouse => "WAREHOUSE",
882 })
883 }
884}
885impl_display!(ConnectionOptionName);
886
887impl WithOptionName for ConnectionOptionName {
888 fn redact_value(&self) -> bool {
894 match self {
895 ConnectionOptionName::AccessKeyId
896 | ConnectionOptionName::AvailabilityZones
897 | ConnectionOptionName::AwsConnection
898 | ConnectionOptionName::AwsPrivatelink
899 | ConnectionOptionName::Broker
900 | ConnectionOptionName::Brokers
901 | ConnectionOptionName::Credential
902 | ConnectionOptionName::Database
903 | ConnectionOptionName::Endpoint
904 | ConnectionOptionName::Host
905 | ConnectionOptionName::Password
906 | ConnectionOptionName::Port
907 | ConnectionOptionName::ProgressTopic
908 | ConnectionOptionName::ProgressTopicReplicationFactor
909 | ConnectionOptionName::PublicKey1
910 | ConnectionOptionName::PublicKey2
911 | ConnectionOptionName::Region
912 | ConnectionOptionName::AssumeRoleArn
913 | ConnectionOptionName::AssumeRoleSessionName
914 | ConnectionOptionName::SaslMechanisms
915 | ConnectionOptionName::SaslPassword
916 | ConnectionOptionName::SaslUsername
917 | ConnectionOptionName::Scope
918 | ConnectionOptionName::SecurityProtocol
919 | ConnectionOptionName::SecretAccessKey
920 | ConnectionOptionName::ServiceName
921 | ConnectionOptionName::SshTunnel
922 | ConnectionOptionName::SslCertificate
923 | ConnectionOptionName::SslCertificateAuthority
924 | ConnectionOptionName::SslKey
925 | ConnectionOptionName::SslMode
926 | ConnectionOptionName::SessionToken
927 | ConnectionOptionName::CatalogType
928 | ConnectionOptionName::Url
929 | ConnectionOptionName::User
930 | ConnectionOptionName::Warehouse => false,
931 }
932 }
933}
934
/// A single connection option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
943
/// The kind of connection being created. The SQL spelling is produced by the
/// `AstDisplay` impl.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Rendered as `AWS`.
    Aws,
    /// Rendered as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Rendered as `KAFKA`.
    Kafka,
    /// Rendered as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Rendered as `POSTGRES`.
    Postgres,
    /// Rendered as `SSH TUNNEL`.
    Ssh,
    /// Rendered as `SQL SERVER`.
    SqlServer,
    /// Rendered as `MYSQL`.
    MySql,
    /// Rendered as `ICEBERG CATALOG`.
    IcebergCatalog,
}
956
957impl AstDisplay for CreateConnectionType {
958 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
959 match self {
960 Self::Kafka => {
961 f.write_str("KAFKA");
962 }
963 Self::Csr => {
964 f.write_str("CONFLUENT SCHEMA REGISTRY");
965 }
966 Self::Postgres => {
967 f.write_str("POSTGRES");
968 }
969 Self::Aws => {
970 f.write_str("AWS");
971 }
972 Self::AwsPrivatelink => {
973 f.write_str("AWS PRIVATELINK");
974 }
975 Self::Ssh => {
976 f.write_str("SSH TUNNEL");
977 }
978 Self::SqlServer => {
979 f.write_str("SQL SERVER");
980 }
981 Self::MySql => {
982 f.write_str("MYSQL");
983 }
984 Self::IcebergCatalog => {
985 f.write_str("ICEBERG CATALOG");
986 }
987 }
988 }
989}
990impl_display!(CreateConnectionType);
991
/// Option names used by [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// Rendered as `VALIDATE`.
    Validate,
}
996
997impl AstDisplay for CreateConnectionOptionName {
998 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
999 f.write_str(match self {
1000 CreateConnectionOptionName::Validate => "VALIDATE",
1001 })
1002 }
1003}
1004impl_display!(CreateConnectionOptionName);
1005
1006impl WithOptionName for CreateConnectionOptionName {
1007 fn redact_value(&self) -> bool {
1013 match self {
1014 CreateConnectionOptionName::Validate => false,
1015 }
1016 }
1017}
1018
/// A single create-connection option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateConnectionOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
1027
/// Option names used by [`KafkaSourceConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// Rendered as `GROUP ID PREFIX`.
    GroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `START TIMESTAMP`.
    StartTimestamp,
    /// Rendered as `START OFFSET`.
    StartOffset,
}
1036
1037impl AstDisplay for KafkaSourceConfigOptionName {
1038 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1039 f.write_str(match self {
1040 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1041 KafkaSourceConfigOptionName::Topic => "TOPIC",
1042 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1043 "TOPIC METADATA REFRESH INTERVAL"
1044 }
1045 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1046 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1047 })
1048 }
1049}
1050impl_display!(KafkaSourceConfigOptionName);
1051
1052impl WithOptionName for KafkaSourceConfigOptionName {
1053 fn redact_value(&self) -> bool {
1059 match self {
1060 KafkaSourceConfigOptionName::GroupIdPrefix
1061 | KafkaSourceConfigOptionName::Topic
1062 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1063 | KafkaSourceConfigOptionName::StartOffset
1064 | KafkaSourceConfigOptionName::StartTimestamp => false,
1065 }
1066 }
1067}
1068
/// A single Kafka source option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSourceConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1076
/// Option names used by [`KafkaSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// Rendered as `COMPRESSION TYPE`.
    CompressionType,
    /// Rendered as `PARTITION BY`. The only option here whose value is
    /// redacted (`redact_value` returns `true`).
    PartitionBy,
    /// Rendered as `PROGRESS GROUP ID PREFIX`.
    ProgressGroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TRANSACTIONAL ID PREFIX`.
    TransactionalIdPrefix,
    /// Rendered as `LEGACY IDS`.
    LegacyIds,
    /// Rendered as `TOPIC CONFIG`.
    TopicConfig,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `TOPIC PARTITION COUNT`.
    TopicPartitionCount,
    /// Rendered as `TOPIC REPLICATION FACTOR`.
    TopicReplicationFactor,
}
1090
1091impl AstDisplay for KafkaSinkConfigOptionName {
1092 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1093 f.write_str(match self {
1094 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1095 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1096 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1097 KafkaSinkConfigOptionName::Topic => "TOPIC",
1098 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1099 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1100 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1101 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1102 "TOPIC METADATA REFRESH INTERVAL"
1103 }
1104 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1105 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1106 })
1107 }
1108}
1109impl_display!(KafkaSinkConfigOptionName);
1110
1111impl WithOptionName for KafkaSinkConfigOptionName {
1112 fn redact_value(&self) -> bool {
1118 match self {
1119 KafkaSinkConfigOptionName::CompressionType
1120 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1121 | KafkaSinkConfigOptionName::Topic
1122 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1123 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1124 | KafkaSinkConfigOptionName::LegacyIds
1125 | KafkaSinkConfigOptionName::TopicConfig
1126 | KafkaSinkConfigOptionName::TopicPartitionCount
1127 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1128 KafkaSinkConfigOptionName::PartitionBy => true,
1129 }
1130 }
1131}
1132
/// A single Kafka sink option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSinkConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1140
/// Option names used by [`IcebergSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// Rendered as `NAMESPACE`.
    Namespace,
    /// Rendered as `TABLE`.
    Table,
}
1146
1147impl AstDisplay for IcebergSinkConfigOptionName {
1148 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1149 f.write_str(match self {
1150 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1151 IcebergSinkConfigOptionName::Table => "TABLE",
1152 })
1153 }
1154}
1155impl_display!(IcebergSinkConfigOptionName);
1156
1157impl WithOptionName for IcebergSinkConfigOptionName {
1158 fn redact_value(&self) -> bool {
1164 match self {
1165 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1166 }
1167 }
1168}
1169
/// A single Iceberg sink option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IcebergSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: IcebergSinkConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(IcebergSinkConfigOption);
impl_display_t!(IcebergSinkConfigOption);
1177
/// Option names used by [`PgConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `PUBLICATION`.
    Publication,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1200
1201impl AstDisplay for PgConfigOptionName {
1202 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1203 f.write_str(match self {
1204 PgConfigOptionName::Details => "DETAILS",
1205 PgConfigOptionName::Publication => "PUBLICATION",
1206 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1207 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1208 })
1209 }
1210}
1211impl_display!(PgConfigOptionName);
1212
1213impl WithOptionName for PgConfigOptionName {
1214 fn redact_value(&self) -> bool {
1220 match self {
1221 PgConfigOptionName::Details
1222 | PgConfigOptionName::Publication
1223 | PgConfigOptionName::TextColumns
1224 | PgConfigOptionName::ExcludeColumns => false,
1225 }
1226 }
1227}
1228
/// A single Postgres configuration option: a name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: PgConfigOptionName,
    /// The value given for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by the shared macros.
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1237
/// Option names used by [`MySqlConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1258
1259impl AstDisplay for MySqlConfigOptionName {
1260 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1261 f.write_str(match self {
1262 MySqlConfigOptionName::Details => "DETAILS",
1263 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1264 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1265 })
1266 }
1267}
1268impl_display!(MySqlConfigOptionName);
1269
1270impl WithOptionName for MySqlConfigOptionName {
1271 fn redact_value(&self) -> bool {
1277 match self {
1278 MySqlConfigOptionName::Details
1279 | MySqlConfigOptionName::TextColumns
1280 | MySqlConfigOptionName::ExcludeColumns => false,
1281 }
1282 }
1283}
1284
/// One option in the option list of a `MYSQL CONNECTION` source clause:
/// an option name plus its optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: MySqlConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined elsewhere in the crate; presumably they
// generate the `AstDisplay` and `std::fmt::Display` impls for this type -- confirm.
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1293
/// Names of the options that can appear in the parenthesized option list of a
/// `SQL SERVER CONNECTION` source clause (see
/// [`CreateSourceConnection::SqlServer`]).
///
/// Variant order matters: the derived `PartialOrd`/`Ord` compare by
/// declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Displayed as `DETAILS`.
    Details,
    /// Displayed as `TEXT COLUMNS`.
    TextColumns,
    /// Displayed as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1316
1317impl AstDisplay for SqlServerConfigOptionName {
1318 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1319 f.write_str(match self {
1320 SqlServerConfigOptionName::Details => "DETAILS",
1321 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1322 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1323 })
1324 }
1325}
1326impl_display!(SqlServerConfigOptionName);
1327
1328impl WithOptionName for SqlServerConfigOptionName {
1329 fn redact_value(&self) -> bool {
1335 match self {
1336 SqlServerConfigOptionName::Details
1337 | SqlServerConfigOptionName::TextColumns
1338 | SqlServerConfigOptionName::ExcludeColumns => false,
1339 }
1340 }
1341}
1342
/// One option in the option list of a `SQL SERVER CONNECTION` source clause:
/// an option name plus its optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: SqlServerConfigOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined elsewhere in the crate; presumably they
// generate the `AstDisplay` and `std::fmt::Display` impls for this type -- confirm.
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1351
/// The connection clause of a source: which kind of upstream system (or load
/// generator) it targets, plus the options attached to it.
///
/// The `AstDisplay` impl prints `<KIND> CONNECTION <connection>` (or
/// `LOAD GENERATOR <generator>`), followed by ` (<options>)` only when the
/// option list is non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Displayed with the prefix `KAFKA CONNECTION `.
    Kafka {
        /// Name of the connection item, printed right after the prefix.
        connection: T::ItemName,
        /// Kafka-specific options; printed comma-separated in parentheses.
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Displayed with the prefix `POSTGRES CONNECTION `.
    Postgres {
        /// Name of the connection item, printed right after the prefix.
        connection: T::ItemName,
        /// Postgres-specific options; printed comma-separated in parentheses.
        options: Vec<PgConfigOption<T>>,
    },
    /// Displayed with the prefix `SQL SERVER CONNECTION `.
    SqlServer {
        /// Name of the connection item, printed right after the prefix.
        connection: T::ItemName,
        /// SQL Server-specific options; printed comma-separated in parentheses.
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Displayed with the prefix `MYSQL CONNECTION `.
    MySql {
        /// Name of the connection item, printed right after the prefix.
        connection: T::ItemName,
        /// MySQL-specific options; printed comma-separated in parentheses.
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Displayed with the prefix `LOAD GENERATOR `.
    LoadGenerator {
        /// Which load generator to use, printed right after the prefix.
        generator: LoadGenerator,
        /// Load-generator options; printed comma-separated in parentheses.
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1375
1376impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1377 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1378 match self {
1379 CreateSourceConnection::Kafka {
1380 connection,
1381 options,
1382 } => {
1383 f.write_str("KAFKA CONNECTION ");
1384 f.write_node(connection);
1385 if !options.is_empty() {
1386 f.write_str(" (");
1387 f.write_node(&display::comma_separated(options));
1388 f.write_str(")");
1389 }
1390 }
1391 CreateSourceConnection::Postgres {
1392 connection,
1393 options,
1394 } => {
1395 f.write_str("POSTGRES CONNECTION ");
1396 f.write_node(connection);
1397 if !options.is_empty() {
1398 f.write_str(" (");
1399 f.write_node(&display::comma_separated(options));
1400 f.write_str(")");
1401 }
1402 }
1403 CreateSourceConnection::SqlServer {
1404 connection,
1405 options,
1406 } => {
1407 f.write_str("SQL SERVER CONNECTION ");
1408 f.write_node(connection);
1409 if !options.is_empty() {
1410 f.write_str(" (");
1411 f.write_node(&display::comma_separated(options));
1412 f.write_str(")");
1413 }
1414 }
1415 CreateSourceConnection::MySql {
1416 connection,
1417 options,
1418 } => {
1419 f.write_str("MYSQL CONNECTION ");
1420 f.write_node(connection);
1421 if !options.is_empty() {
1422 f.write_str(" (");
1423 f.write_node(&display::comma_separated(options));
1424 f.write_str(")");
1425 }
1426 }
1427 CreateSourceConnection::LoadGenerator { generator, options } => {
1428 f.write_str("LOAD GENERATOR ");
1429 f.write_node(generator);
1430 if !options.is_empty() {
1431 f.write_str(" (");
1432 f.write_node(&display::comma_separated(options));
1433 f.write_str(")");
1434 }
1435 }
1436 }
1437 }
1438}
1439impl_display_t!(CreateSourceConnection);
1440
/// The kinds of load generator that can follow `LOAD GENERATOR` in a source's
/// connection clause (see [`CreateSourceConnection::LoadGenerator`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`; schema name `clock`.
    Clock,
    /// Displayed as `COUNTER`; schema name `counter`.
    Counter,
    /// Displayed as `MARKETING`; schema name `marketing`.
    Marketing,
    /// Displayed as `AUCTION`; schema name `auction`.
    Auction,
    /// Displayed as `DATUMS`; schema name `datums`.
    Datums,
    /// Displayed as `TPCH`; schema name `tpch`.
    Tpch,
    /// Displayed as `KEY VALUE`; schema name `key_value`.
    KeyValue,
}
1451
1452impl AstDisplay for LoadGenerator {
1453 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1454 match self {
1455 Self::Counter => f.write_str("COUNTER"),
1456 Self::Clock => f.write_str("CLOCK"),
1457 Self::Marketing => f.write_str("MARKETING"),
1458 Self::Auction => f.write_str("AUCTION"),
1459 Self::Datums => f.write_str("DATUMS"),
1460 Self::Tpch => f.write_str("TPCH"),
1461 Self::KeyValue => f.write_str("KEY VALUE"),
1462 }
1463 }
1464}
1465impl_display!(LoadGenerator);
1466
1467impl LoadGenerator {
1468 pub fn schema_name(&self) -> &'static str {
1473 match self {
1474 LoadGenerator::Counter => "counter",
1475 LoadGenerator::Clock => "clock",
1476 LoadGenerator::Marketing => "marketing",
1477 LoadGenerator::Auction => "auction",
1478 LoadGenerator::Datums => "datums",
1479 LoadGenerator::Tpch => "tpch",
1480 LoadGenerator::KeyValue => "key_value",
1481 }
1482 }
1483}
1484
/// Names of the options that can appear in the parenthesized option list of a
/// `LOAD GENERATOR` source clause (see
/// [`CreateSourceConnection::LoadGenerator`]).
///
/// Variant order matters: the derived `PartialOrd`/`Ord` compare by
/// declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Displayed as `SCALE FACTOR`.
    ScaleFactor,
    /// Displayed as `TICK INTERVAL`.
    TickInterval,
    /// Displayed as `AS OF`.
    AsOf,
    /// Displayed as `UP TO`.
    UpTo,
    /// Displayed as `MAX CARDINALITY`.
    MaxCardinality,
    /// Displayed as `KEYS`.
    Keys,
    /// Displayed as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Displayed as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Displayed as `VALUE SIZE`.
    ValueSize,
    /// Displayed as `SEED`.
    Seed,
    /// Displayed as `PARTITIONS`.
    Partitions,
    /// Displayed as `BATCH SIZE`.
    BatchSize,
}
1500
1501impl AstDisplay for LoadGeneratorOptionName {
1502 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1503 f.write_str(match self {
1504 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1505 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1506 LoadGeneratorOptionName::AsOf => "AS OF",
1507 LoadGeneratorOptionName::UpTo => "UP TO",
1508 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1509 LoadGeneratorOptionName::Keys => "KEYS",
1510 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1511 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1512 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1513 LoadGeneratorOptionName::Seed => "SEED",
1514 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1515 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1516 })
1517 }
1518}
1519impl_display!(LoadGeneratorOptionName);
1520
1521impl WithOptionName for LoadGeneratorOptionName {
1522 fn redact_value(&self) -> bool {
1528 match self {
1529 LoadGeneratorOptionName::ScaleFactor
1530 | LoadGeneratorOptionName::TickInterval
1531 | LoadGeneratorOptionName::AsOf
1532 | LoadGeneratorOptionName::UpTo
1533 | LoadGeneratorOptionName::MaxCardinality
1534 | LoadGeneratorOptionName::Keys
1535 | LoadGeneratorOptionName::SnapshotRounds
1536 | LoadGeneratorOptionName::TransactionalSnapshot
1537 | LoadGeneratorOptionName::ValueSize
1538 | LoadGeneratorOptionName::Partitions
1539 | LoadGeneratorOptionName::BatchSize
1540 | LoadGeneratorOptionName::Seed => false,
1541 }
1542 }
1543}
1544
/// One option in the option list of a `LOAD GENERATOR` source clause:
/// an option name plus its optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option this is.
    pub name: LoadGeneratorOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined elsewhere in the crate; presumably they
// generate the `AstDisplay` and `std::fmt::Display` impls for this type -- confirm.
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1553
/// The connection clause of a sink: which kind of external system it targets
/// and the settings attached to it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Displayed as
    /// `KAFKA CONNECTION <connection>[ (<options>)][ <key>][ HEADERS <headers>]`.
    Kafka {
        /// Name of the connection item.
        connection: T::ItemName,
        /// Kafka sink options; the parenthesized list is omitted when empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Optional identifier printed after ` HEADERS `.
        headers: Option<Ident>,
    },
    /// Displayed as `ICEBERG CATALOG CONNECTION <connection>[ (<options>)]
    /// USING AWS CONNECTION <aws_connection>[ <key>]`.
    Iceberg {
        /// Name of the Iceberg catalog connection item.
        connection: T::ItemName,
        /// Name of the AWS connection item, printed after ` USING AWS CONNECTION `.
        aws_connection: T::ItemName,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Iceberg sink options; the parenthesized list is omitted when empty.
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1569
1570impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1571 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1572 match self {
1573 CreateSinkConnection::Kafka {
1574 connection,
1575 options,
1576 key,
1577 headers,
1578 } => {
1579 f.write_str("KAFKA CONNECTION ");
1580 f.write_node(connection);
1581 if !options.is_empty() {
1582 f.write_str(" (");
1583 f.write_node(&display::comma_separated(options));
1584 f.write_str(")");
1585 }
1586 if let Some(key) = key.as_ref() {
1587 f.write_str(" ");
1588 f.write_node(key);
1589 }
1590 if let Some(headers) = headers {
1591 f.write_str(" HEADERS ");
1592 f.write_node(headers);
1593 }
1594 }
1595 CreateSinkConnection::Iceberg {
1596 connection,
1597 aws_connection,
1598 key,
1599 options,
1600 } => {
1601 f.write_str("ICEBERG CATALOG CONNECTION ");
1602 f.write_node(connection);
1603 if !options.is_empty() {
1604 f.write_str(" (");
1605 f.write_node(&display::comma_separated(options));
1606 f.write_str(")");
1607 }
1608 f.write_str(" USING AWS CONNECTION ");
1609 f.write_node(aws_connection);
1610 if let Some(key) = key.as_ref() {
1611 f.write_str(" ");
1612 f.write_node(key);
1613 }
1614 }
1615 }
1616 }
1617}
1618impl_display_t!(CreateSinkConnection);
1619
/// The `KEY (...)` clause of a sink connection.
///
/// Displayed as `KEY (<key_columns>)`, followed by ` NOT ENFORCED` when
/// `not_enforced` is set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// The key columns, printed comma-separated inside the parentheses.
    pub key_columns: Vec<Ident>,
    /// Whether the ` NOT ENFORCED` suffix is present.
    pub not_enforced: bool,
}
1625
1626impl AstDisplay for SinkKey {
1627 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1628 f.write_str("KEY (");
1629 f.write_node(&display::comma_separated(&self.key_columns));
1630 f.write_str(")");
1631 if self.not_enforced {
1632 f.write_str(" NOT ENFORCED");
1633 }
1634 }
1635}
1636
/// A constraint declared on a table as a whole, as opposed to the
/// column-level [`ColumnOption`].
///
/// Every variant is displayed with an optional leading `CONSTRAINT <name> `.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Displayed as `PRIMARY KEY (<columns>)` when `is_primary` is set,
    /// otherwise as `UNIQUE [NULLS NOT DISTINCT ](<columns>)`.
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The constrained columns.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` rather than a plain `UNIQUE` constraint.
        is_primary: bool,
        /// Whether `NULLS NOT DISTINCT` is printed; only consulted by the
        /// display code when `is_primary` is false.
        nulls_not_distinct: bool,
    },
    /// Displayed as
    /// `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The referencing columns of this table.
        columns: Vec<Ident>,
        /// The referenced table.
        foreign_table: T::ItemName,
        /// The referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The expression printed inside the parentheses.
        expr: Box<Expr<T>>,
    },
}
1665
1666impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1667 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1668 match self {
1669 TableConstraint::Unique {
1670 name,
1671 columns,
1672 is_primary,
1673 nulls_not_distinct,
1674 } => {
1675 f.write_node(&display_constraint_name(name));
1676 if *is_primary {
1677 f.write_str("PRIMARY KEY ");
1678 } else {
1679 f.write_str("UNIQUE ");
1680 if *nulls_not_distinct {
1681 f.write_str("NULLS NOT DISTINCT ");
1682 }
1683 }
1684 f.write_str("(");
1685 f.write_node(&display::comma_separated(columns));
1686 f.write_str(")");
1687 }
1688 TableConstraint::ForeignKey {
1689 name,
1690 columns,
1691 foreign_table,
1692 referred_columns,
1693 } => {
1694 f.write_node(&display_constraint_name(name));
1695 f.write_str("FOREIGN KEY (");
1696 f.write_node(&display::comma_separated(columns));
1697 f.write_str(") REFERENCES ");
1698 f.write_node(foreign_table);
1699 f.write_str("(");
1700 f.write_node(&display::comma_separated(referred_columns));
1701 f.write_str(")");
1702 }
1703 TableConstraint::Check { name, expr } => {
1704 f.write_node(&display_constraint_name(name));
1705 f.write_str("CHECK (");
1706 f.write_node(&expr);
1707 f.write_str(")");
1708 }
1709 }
1710 }
1711}
1712impl_display_t!(TableConstraint);
1713
/// A key constraint; the only form currently modelled is a primary key that
/// is declared but not enforced.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Displayed as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1720
1721impl AstDisplay for KeyConstraint {
1722 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1723 match self {
1724 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1725 f.write_str("PRIMARY KEY ");
1726 f.write_str("(");
1727 f.write_node(&display::comma_separated(columns));
1728 f.write_str(") ");
1729 f.write_str("NOT ENFORCED");
1730 }
1731 }
1732 }
1733}
1734impl_display!(KeyConstraint);
1735
/// Names of the options carried by a [`CreateSourceOption`].
///
/// Variant order matters: the derived `PartialOrd`/`Ord` compare by
/// declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1741
1742impl AstDisplay for CreateSourceOptionName {
1743 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1744 f.write_str(match self {
1745 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1746 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1747 })
1748 }
1749}
1750impl_display!(CreateSourceOptionName);
1751
1752impl WithOptionName for CreateSourceOptionName {
1753 fn redact_value(&self) -> bool {
1759 match self {
1760 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1761 false
1762 }
1763 }
1764 }
1765}
1766
/// A single source-level option: a [`CreateSourceOptionName`] plus its
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateSourceOptionName,
    /// The value attached to the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): both macros are defined elsewhere in the crate; presumably they
// generate the `AstDisplay` and `std::fmt::Display` impls for this type -- confirm.
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1775
/// The definition of a single column.
///
/// Displayed as `<name> <data_type>[ COLLATE <collation>][ <option>]...`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// Optional collation, printed after ` COLLATE `.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, each printed in order and preceded by a single space.
    pub options: Vec<ColumnOptionDef<T>>,
}
1784
1785impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1786 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1787 f.write_node(&self.name);
1788 f.write_str(" ");
1789 f.write_node(&self.data_type);
1790 if let Some(collation) = &self.collation {
1791 f.write_str(" COLLATE ");
1792 f.write_node(collation);
1793 }
1794 for option in &self.options {
1795 f.write_str(" ");
1796 f.write_node(option);
1797 }
1798 }
1799}
1800impl_display_t!(ColumnDef);
1801
/// A [`ColumnOption`] together with an optional constraint name.
///
/// Displayed as `[CONSTRAINT <name> ]<option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name, printed as a `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1823
1824impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1825 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1826 f.write_node(&display_constraint_name(&self.name));
1827 f.write_node(&self.option);
1828 }
1829}
1830impl_display_t!(ColumnOptionDef);
1831
/// An option attached to a single column definition (see [`ColumnOptionDef`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Displayed as `NULL`.
    Null,
    /// Displayed as `NOT NULL`.
    NotNull,
    /// Displayed as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Displayed as `PRIMARY KEY` when `is_primary` is set, otherwise `UNIQUE`.
    Unique { is_primary: bool },
    /// Displayed as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The referenced table.
        foreign_table: UnresolvedItemName,
        /// The referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Displayed as `VERSION <action> <version>`.
    Versioned {
        /// What happened to the column at `version`.
        action: ColumnVersioned,
        /// The version printed after the action.
        version: Version,
    },
}
1858
1859impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1860 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1861 use ColumnOption::*;
1862 match self {
1863 Null => f.write_str("NULL"),
1864 NotNull => f.write_str("NOT NULL"),
1865 Default(expr) => {
1866 f.write_str("DEFAULT ");
1867 f.write_node(expr);
1868 }
1869 Unique { is_primary } => {
1870 if *is_primary {
1871 f.write_str("PRIMARY KEY");
1872 } else {
1873 f.write_str("UNIQUE");
1874 }
1875 }
1876 ForeignKey {
1877 foreign_table,
1878 referred_columns,
1879 } => {
1880 f.write_str("REFERENCES ");
1881 f.write_node(foreign_table);
1882 f.write_str(" (");
1883 f.write_node(&display::comma_separated(referred_columns));
1884 f.write_str(")");
1885 }
1886 Check(expr) => {
1887 f.write_str("CHECK (");
1888 f.write_node(expr);
1889 f.write_str(")");
1890 }
1891 Versioned { action, version } => {
1892 f.write_str("VERSION ");
1893 f.write_node(action);
1894 f.write_str(" ");
1895 f.write_node(version);
1896 }
1897 }
1898 }
1899}
1900impl_display_t!(ColumnOption);
1901
/// The action recorded by a [`ColumnOption::Versioned`] option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1906
1907impl AstDisplay for ColumnVersioned {
1908 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1909 where
1910 W: fmt::Write,
1911 {
1912 match self {
1913 ColumnVersioned::Added => f.write_str("ADDED"),
1915 }
1916 }
1917}
1918impl_display!(ColumnVersioned);
1919
1920fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1921 struct ConstraintName<'a>(&'a Option<Ident>);
1922 impl<'a> AstDisplay for ConstraintName<'a> {
1923 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1924 where
1925 W: fmt::Write,
1926 {
1927 if let Some(name) = self.0 {
1928 f.write_str("CONSTRAINT ");
1929 f.write_node(name);
1930 f.write_str(" ");
1931 }
1932 }
1933 }
1934 ConstraintName(name)
1935}