1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77 Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104 pub name: ContinualTaskOptionName,
105 pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111 pub schema: String,
112}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125 ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152 pub name: AvroSchemaOptionName,
153 pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252 pub identifier: DocOnIdentifier<T>,
253 pub for_schema: DocOnSchema,
254}
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub enum DocOnSchema {
257 KeyOnly,
258 ValueOnly,
259 All,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264 Column(ColumnName<T>),
265 Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306pub struct CsrConfigOption<T: AstInfo> {
308 pub name: CsrConfigOptionName<T>,
309 pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum ReaderSchemaSelectionStrategy {
335 Latest,
336 Inline(String),
337 ById(i32),
338}
339
340impl Default for ReaderSchemaSelectionStrategy {
341 fn default() -> Self {
342 Self::Latest
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387 pub key_schema: Option<String>,
388 pub value_schema: String,
389}
390
391impl AstDisplay for CsrSeedAvro {
392 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393 f.write_str("SEED");
394 if let Some(key_schema) = &self.key_schema {
395 f.write_str(" KEY SCHEMA '");
396 f.write_node(&display::escape_single_quote_string(key_schema));
397 f.write_str("'");
398 }
399 f.write_str(" VALUE SCHEMA '");
400 f.write_node(&display::escape_single_quote_string(&self.value_schema));
401 f.write_str("'");
402 }
403}
404impl_display!(CsrSeedAvro);
405
406#[derive(Debug, Clone, PartialEq, Eq, Hash)]
407pub struct CsrSeedProtobuf {
408 pub key: Option<CsrSeedProtobufSchema>,
409 pub value: CsrSeedProtobufSchema,
410}
411
412impl AstDisplay for CsrSeedProtobuf {
413 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414 f.write_str("SEED");
415 if let Some(key) = &self.key {
416 f.write_str(" KEY ");
417 f.write_node(key);
418 }
419 f.write_str(" VALUE ");
420 f.write_node(&self.value);
421 }
422}
423impl_display!(CsrSeedProtobuf);
424
425#[derive(Debug, Clone, PartialEq, Eq, Hash)]
426pub struct CsrSeedProtobufSchema {
427 pub schema: String,
429 pub message_name: String,
430}
431impl AstDisplay for CsrSeedProtobufSchema {
432 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433 f.write_str("SCHEMA '");
434 f.write_str(&display::escape_single_quote_string(&self.schema));
435 f.write_str("' MESSAGE '");
436 f.write_str(&self.message_name);
437 f.write_str("'");
438 }
439}
440impl_display!(CsrSeedProtobufSchema);
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash)]
443pub enum FormatSpecifier<T: AstInfo> {
444 Bare(Format<T>),
446 KeyValue { key: Format<T>, value: Format<T> },
448}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452 match self {
453 FormatSpecifier::Bare(format) => {
454 f.write_str("FORMAT ");
455 f.write_node(format)
456 }
457 FormatSpecifier::KeyValue { key, value } => {
458 f.write_str("KEY FORMAT ");
459 f.write_node(key);
460 f.write_str(" VALUE FORMAT ");
461 f.write_node(value);
462 }
463 }
464 }
465}
466impl_display_t!(FormatSpecifier);
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash)]
469pub enum Format<T: AstInfo> {
470 Bytes,
471 Avro(AvroSchema<T>),
472 Protobuf(ProtobufSchema<T>),
473 Regex(String),
474 Csv {
475 columns: CsvColumns,
476 delimiter: char,
477 },
478 Json {
479 array: bool,
480 },
481 Text,
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CsvColumns {
486 Count(u64),
488 Header { names: Vec<Ident> },
490}
491
492impl AstDisplay for CsvColumns {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 match self {
495 CsvColumns::Count(n) => {
496 f.write_str(n);
497 f.write_str(" COLUMNS")
498 }
499 CsvColumns::Header { names } => {
500 f.write_str("HEADER");
501 if !names.is_empty() {
502 f.write_str(" (");
503 f.write_node(&display::comma_separated(names));
504 f.write_str(")");
505 }
506 }
507 }
508 }
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Hash)]
512pub enum SourceIncludeMetadata {
513 Key {
514 alias: Option<Ident>,
515 },
516 Timestamp {
517 alias: Option<Ident>,
518 },
519 Partition {
520 alias: Option<Ident>,
521 },
522 Offset {
523 alias: Option<Ident>,
524 },
525 Headers {
526 alias: Option<Ident>,
527 },
528 Header {
529 key: String,
530 alias: Ident,
531 use_bytes: bool,
532 },
533}
534
535impl AstDisplay for SourceIncludeMetadata {
536 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538 if let Some(alias) = alias {
539 f.write_str(" AS ");
540 f.write_node(alias);
541 }
542 };
543
544 match self {
545 SourceIncludeMetadata::Key { alias } => {
546 f.write_str("KEY");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Timestamp { alias } => {
550 f.write_str("TIMESTAMP");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Partition { alias } => {
554 f.write_str("PARTITION");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Offset { alias } => {
558 f.write_str("OFFSET");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Headers { alias } => {
562 f.write_str("HEADERS");
563 print_alias(f, alias);
564 }
565 SourceIncludeMetadata::Header {
566 alias,
567 key,
568 use_bytes,
569 } => {
570 f.write_str("HEADER '");
571 f.write_str(&display::escape_single_quote_string(key));
572 f.write_str("'");
573 print_alias(f, &Some(alias.clone()));
574 if *use_bytes {
575 f.write_str(" BYTES");
576 }
577 }
578 }
579 }
580}
581impl_display!(SourceIncludeMetadata);
582
583#[derive(Debug, Clone, PartialEq, Eq, Hash)]
584pub enum SourceErrorPolicy {
585 Inline {
586 alias: Option<Ident>,
588 },
589}
590
591impl AstDisplay for SourceErrorPolicy {
592 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593 match self {
594 Self::Inline { alias } => {
595 f.write_str("INLINE");
596 if let Some(alias) = alias {
597 f.write_str(" AS ");
598 f.write_node(alias);
599 }
600 }
601 }
602 }
603}
604impl_display!(SourceErrorPolicy);
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceEnvelope {
608 None,
609 Debezium,
610 Upsert {
611 value_decode_err_policy: Vec<SourceErrorPolicy>,
612 },
613 CdcV2,
614}
615
616impl SourceEnvelope {
617 pub fn requires_all_input(&self) -> bool {
620 match self {
621 SourceEnvelope::None => false,
622 SourceEnvelope::Debezium => false,
623 SourceEnvelope::Upsert { .. } => false,
624 SourceEnvelope::CdcV2 => true,
625 }
626 }
627}
628
629impl AstDisplay for SourceEnvelope {
630 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631 match self {
632 Self::None => {
633 f.write_str("NONE");
635 }
636 Self::Debezium => {
637 f.write_str("DEBEZIUM");
638 }
639 Self::Upsert {
640 value_decode_err_policy,
641 } => {
642 if value_decode_err_policy.is_empty() {
643 f.write_str("UPSERT");
644 } else {
645 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646 f.write_node(&display::comma_separated(value_decode_err_policy));
647 f.write_str("))")
648 }
649 }
650 Self::CdcV2 => {
651 f.write_str("MATERIALIZE");
652 }
653 }
654 }
655}
656impl_display!(SourceEnvelope);
657
658#[derive(Debug, Clone, PartialEq, Eq, Hash)]
659pub enum SinkEnvelope {
660 Debezium,
661 Upsert,
662}
663
664impl AstDisplay for SinkEnvelope {
665 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666 match self {
667 Self::Upsert => {
668 f.write_str("UPSERT");
669 }
670 Self::Debezium => {
671 f.write_str("DEBEZIUM");
672 }
673 }
674 }
675}
676impl_display!(SinkEnvelope);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SubscribeOutput<T: AstInfo> {
680 Diffs,
681 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
682 EnvelopeUpsert { key_columns: Vec<Ident> },
683 EnvelopeDebezium { key_columns: Vec<Ident> },
684}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688 match self {
689 Self::Diffs => {}
690 Self::WithinTimestampOrderBy { order_by } => {
691 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692 f.write_node(&display::comma_separated(order_by));
693 }
694 Self::EnvelopeUpsert { key_columns } => {
695 f.write_str(" ENVELOPE UPSERT (KEY (");
696 f.write_node(&display::comma_separated(key_columns));
697 f.write_str("))");
698 }
699 Self::EnvelopeDebezium { key_columns } => {
700 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701 f.write_node(&display::comma_separated(key_columns));
702 f.write_str("))");
703 }
704 }
705 }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711 match self {
712 Self::Bytes => f.write_str("BYTES"),
713 Self::Avro(inner) => {
714 f.write_str("AVRO ");
715 f.write_node(inner);
716 }
717 Self::Protobuf(inner) => {
718 f.write_str("PROTOBUF ");
719 f.write_node(inner);
720 }
721 Self::Regex(regex) => {
722 f.write_str("REGEX '");
723 f.write_node(&display::escape_single_quote_string(regex));
724 f.write_str("'");
725 }
726 Self::Csv { columns, delimiter } => {
727 f.write_str("CSV WITH ");
728 f.write_node(columns);
729
730 if *delimiter != ',' {
731 f.write_str(" DELIMITED BY '");
732 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733 f.write_str("'");
734 }
735 }
736 Self::Json { array } => {
737 f.write_str("JSON");
738 if *array {
739 f.write_str(" ARRAY");
740 }
741 }
742 Self::Text => f.write_str("TEXT"),
743 }
744 }
745}
746impl_display_t!(Format);
747
748#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
752pub enum ConnectionOptionName {
753 AccessKeyId,
754 AssumeRoleArn,
755 AssumeRoleSessionName,
756 AvailabilityZones,
757 AwsConnection,
758 AwsPrivatelink,
759 Broker,
760 Brokers,
761 Credential,
762 Database,
763 Endpoint,
764 Host,
765 Password,
766 Port,
767 ProgressTopic,
768 ProgressTopicReplicationFactor,
769 PublicKey1,
770 PublicKey2,
771 Region,
772 SaslMechanisms,
773 SaslPassword,
774 SaslUsername,
775 Scope,
776 SecretAccessKey,
777 SecurityProtocol,
778 ServiceName,
779 SshTunnel,
780 SslCertificate,
781 SslCertificateAuthority,
782 SslKey,
783 SslMode,
784 SessionToken,
785 CatalogType,
786 Url,
787 User,
788 Warehouse,
789}
790
791impl AstDisplay for ConnectionOptionName {
792 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793 f.write_str(match self {
794 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798 ConnectionOptionName::Broker => "BROKER",
799 ConnectionOptionName::Brokers => "BROKERS",
800 ConnectionOptionName::Credential => "CREDENTIAL",
801 ConnectionOptionName::Database => "DATABASE",
802 ConnectionOptionName::Endpoint => "ENDPOINT",
803 ConnectionOptionName::Host => "HOST",
804 ConnectionOptionName::Password => "PASSWORD",
805 ConnectionOptionName::Port => "PORT",
806 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807 ConnectionOptionName::ProgressTopicReplicationFactor => {
808 "PROGRESS TOPIC REPLICATION FACTOR"
809 }
810 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812 ConnectionOptionName::Region => "REGION",
813 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817 ConnectionOptionName::SaslUsername => "SASL USERNAME",
818 ConnectionOptionName::Scope => "SCOPE",
819 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821 ConnectionOptionName::ServiceName => "SERVICE NAME",
822 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825 ConnectionOptionName::SslKey => "SSL KEY",
826 ConnectionOptionName::SslMode => "SSL MODE",
827 ConnectionOptionName::SessionToken => "SESSION TOKEN",
828 ConnectionOptionName::CatalogType => "CATALOG TYPE",
829 ConnectionOptionName::Url => "URL",
830 ConnectionOptionName::User => "USER",
831 ConnectionOptionName::Warehouse => "WAREHOUSE",
832 })
833 }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838 fn redact_value(&self) -> bool {
844 match self {
845 ConnectionOptionName::AccessKeyId
846 | ConnectionOptionName::AvailabilityZones
847 | ConnectionOptionName::AwsConnection
848 | ConnectionOptionName::AwsPrivatelink
849 | ConnectionOptionName::Broker
850 | ConnectionOptionName::Brokers
851 | ConnectionOptionName::Credential
852 | ConnectionOptionName::Database
853 | ConnectionOptionName::Endpoint
854 | ConnectionOptionName::Host
855 | ConnectionOptionName::Password
856 | ConnectionOptionName::Port
857 | ConnectionOptionName::ProgressTopic
858 | ConnectionOptionName::ProgressTopicReplicationFactor
859 | ConnectionOptionName::PublicKey1
860 | ConnectionOptionName::PublicKey2
861 | ConnectionOptionName::Region
862 | ConnectionOptionName::AssumeRoleArn
863 | ConnectionOptionName::AssumeRoleSessionName
864 | ConnectionOptionName::SaslMechanisms
865 | ConnectionOptionName::SaslPassword
866 | ConnectionOptionName::SaslUsername
867 | ConnectionOptionName::Scope
868 | ConnectionOptionName::SecurityProtocol
869 | ConnectionOptionName::SecretAccessKey
870 | ConnectionOptionName::ServiceName
871 | ConnectionOptionName::SshTunnel
872 | ConnectionOptionName::SslCertificate
873 | ConnectionOptionName::SslCertificateAuthority
874 | ConnectionOptionName::SslKey
875 | ConnectionOptionName::SslMode
876 | ConnectionOptionName::SessionToken
877 | ConnectionOptionName::CatalogType
878 | ConnectionOptionName::Url
879 | ConnectionOptionName::User
880 | ConnectionOptionName::Warehouse => false,
881 }
882 }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
886pub struct ConnectionOption<T: AstInfo> {
888 pub name: ConnectionOptionName,
889 pub value: Option<WithOptionValue<T>>,
890}
891impl_display_for_with_option!(ConnectionOption);
892impl_display_t!(ConnectionOption);
893
894#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
895pub enum CreateConnectionType {
896 Aws,
897 AwsPrivatelink,
898 Kafka,
899 Csr,
900 Postgres,
901 Ssh,
902 SqlServer,
903 MySql,
904 Yugabyte,
905 IcebergCatalog,
906}
907
908impl AstDisplay for CreateConnectionType {
909 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
910 match self {
911 Self::Kafka => {
912 f.write_str("KAFKA");
913 }
914 Self::Csr => {
915 f.write_str("CONFLUENT SCHEMA REGISTRY");
916 }
917 Self::Postgres => {
918 f.write_str("POSTGRES");
919 }
920 Self::Aws => {
921 f.write_str("AWS");
922 }
923 Self::AwsPrivatelink => {
924 f.write_str("AWS PRIVATELINK");
925 }
926 Self::Ssh => {
927 f.write_str("SSH TUNNEL");
928 }
929 Self::SqlServer => {
930 f.write_str("SQL SERVER");
931 }
932 Self::MySql => {
933 f.write_str("MYSQL");
934 }
935 Self::Yugabyte => {
936 f.write_str("YUGABYTE");
937 }
938 Self::IcebergCatalog => {
939 f.write_str("ICEBERG CATALOG");
940 }
941 }
942 }
943}
944impl_display!(CreateConnectionType);
945
946#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
947pub enum CreateConnectionOptionName {
948 Validate,
949}
950
951impl AstDisplay for CreateConnectionOptionName {
952 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
953 f.write_str(match self {
954 CreateConnectionOptionName::Validate => "VALIDATE",
955 })
956 }
957}
958impl_display!(CreateConnectionOptionName);
959
960impl WithOptionName for CreateConnectionOptionName {
961 fn redact_value(&self) -> bool {
967 match self {
968 CreateConnectionOptionName::Validate => false,
969 }
970 }
971}
972
973#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
974pub struct CreateConnectionOption<T: AstInfo> {
976 pub name: CreateConnectionOptionName,
977 pub value: Option<WithOptionValue<T>>,
978}
979impl_display_for_with_option!(CreateConnectionOption);
980impl_display_t!(CreateConnectionOption);
981
982#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
983pub enum KafkaSourceConfigOptionName {
984 GroupIdPrefix,
985 Topic,
986 TopicMetadataRefreshInterval,
987 StartTimestamp,
988 StartOffset,
989}
990
991impl AstDisplay for KafkaSourceConfigOptionName {
992 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
993 f.write_str(match self {
994 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
995 KafkaSourceConfigOptionName::Topic => "TOPIC",
996 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
997 "TOPIC METADATA REFRESH INTERVAL"
998 }
999 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1000 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1001 })
1002 }
1003}
1004impl_display!(KafkaSourceConfigOptionName);
1005
1006impl WithOptionName for KafkaSourceConfigOptionName {
1007 fn redact_value(&self) -> bool {
1013 match self {
1014 KafkaSourceConfigOptionName::GroupIdPrefix
1015 | KafkaSourceConfigOptionName::Topic
1016 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1017 | KafkaSourceConfigOptionName::StartOffset
1018 | KafkaSourceConfigOptionName::StartTimestamp => false,
1019 }
1020 }
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1024pub struct KafkaSourceConfigOption<T: AstInfo> {
1025 pub name: KafkaSourceConfigOptionName,
1026 pub value: Option<WithOptionValue<T>>,
1027}
1028impl_display_for_with_option!(KafkaSourceConfigOption);
1029impl_display_t!(KafkaSourceConfigOption);
1030
1031#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1032pub enum KafkaSinkConfigOptionName {
1033 CompressionType,
1034 PartitionBy,
1035 ProgressGroupIdPrefix,
1036 Topic,
1037 TransactionalIdPrefix,
1038 LegacyIds,
1039 TopicConfig,
1040 TopicMetadataRefreshInterval,
1041 TopicPartitionCount,
1042 TopicReplicationFactor,
1043}
1044
1045impl AstDisplay for KafkaSinkConfigOptionName {
1046 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1047 f.write_str(match self {
1048 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1049 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1050 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1051 KafkaSinkConfigOptionName::Topic => "TOPIC",
1052 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1053 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1054 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1055 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1056 "TOPIC METADATA REFRESH INTERVAL"
1057 }
1058 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1059 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1060 })
1061 }
1062}
1063impl_display!(KafkaSinkConfigOptionName);
1064
1065impl WithOptionName for KafkaSinkConfigOptionName {
1066 fn redact_value(&self) -> bool {
1072 match self {
1073 KafkaSinkConfigOptionName::CompressionType
1074 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1075 | KafkaSinkConfigOptionName::Topic
1076 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1077 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1078 | KafkaSinkConfigOptionName::LegacyIds
1079 | KafkaSinkConfigOptionName::TopicConfig
1080 | KafkaSinkConfigOptionName::TopicPartitionCount
1081 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1082 KafkaSinkConfigOptionName::PartitionBy => true,
1083 }
1084 }
1085}
1086
1087#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1088pub struct KafkaSinkConfigOption<T: AstInfo> {
1089 pub name: KafkaSinkConfigOptionName,
1090 pub value: Option<WithOptionValue<T>>,
1091}
1092impl_display_for_with_option!(KafkaSinkConfigOption);
1093impl_display_t!(KafkaSinkConfigOption);
1094
1095#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1096pub enum PgConfigOptionName {
1097 Details,
1100 Publication,
1102 TextColumns,
1109}
1110
1111impl AstDisplay for PgConfigOptionName {
1112 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1113 f.write_str(match self {
1114 PgConfigOptionName::Details => "DETAILS",
1115 PgConfigOptionName::Publication => "PUBLICATION",
1116 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1117 })
1118 }
1119}
1120impl_display!(PgConfigOptionName);
1121
1122impl WithOptionName for PgConfigOptionName {
1123 fn redact_value(&self) -> bool {
1129 match self {
1130 PgConfigOptionName::Details
1131 | PgConfigOptionName::Publication
1132 | PgConfigOptionName::TextColumns => false,
1133 }
1134 }
1135}
1136
1137#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1138pub struct PgConfigOption<T: AstInfo> {
1140 pub name: PgConfigOptionName,
1141 pub value: Option<WithOptionValue<T>>,
1142}
1143impl_display_for_with_option!(PgConfigOption);
1144impl_display_t!(PgConfigOption);
1145
1146#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1147pub enum MySqlConfigOptionName {
1148 Details,
1151 TextColumns,
1158 ExcludeColumns,
1165}
1166
1167impl AstDisplay for MySqlConfigOptionName {
1168 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1169 f.write_str(match self {
1170 MySqlConfigOptionName::Details => "DETAILS",
1171 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1172 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1173 })
1174 }
1175}
1176impl_display!(MySqlConfigOptionName);
1177
1178impl WithOptionName for MySqlConfigOptionName {
1179 fn redact_value(&self) -> bool {
1185 match self {
1186 MySqlConfigOptionName::Details
1187 | MySqlConfigOptionName::TextColumns
1188 | MySqlConfigOptionName::ExcludeColumns => false,
1189 }
1190 }
1191}
1192
1193#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1194pub struct MySqlConfigOption<T: AstInfo> {
1196 pub name: MySqlConfigOptionName,
1197 pub value: Option<WithOptionValue<T>>,
1198}
1199impl_display_for_with_option!(MySqlConfigOption);
1200impl_display_t!(MySqlConfigOption);
1201
1202#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1203pub enum SqlServerConfigOptionName {
1204 Details,
1207 TextColumns,
1215 ExcludeColumns,
1223}
1224
1225impl AstDisplay for SqlServerConfigOptionName {
1226 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1227 f.write_str(match self {
1228 SqlServerConfigOptionName::Details => "DETAILS",
1229 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1230 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1231 })
1232 }
1233}
1234impl_display!(SqlServerConfigOptionName);
1235
1236impl WithOptionName for SqlServerConfigOptionName {
1237 fn redact_value(&self) -> bool {
1243 match self {
1244 SqlServerConfigOptionName::Details
1245 | SqlServerConfigOptionName::TextColumns
1246 | SqlServerConfigOptionName::ExcludeColumns => false,
1247 }
1248 }
1249}
1250
1251#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1252pub struct SqlServerConfigOption<T: AstInfo> {
1254 pub name: SqlServerConfigOptionName,
1255 pub value: Option<WithOptionValue<T>>,
1256}
1257impl_display_for_with_option!(SqlServerConfigOption);
1258impl_display_t!(SqlServerConfigOption);
1259
1260#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1261pub enum CreateSourceConnection<T: AstInfo> {
1262 Kafka {
1263 connection: T::ItemName,
1264 options: Vec<KafkaSourceConfigOption<T>>,
1265 },
1266 Postgres {
1267 connection: T::ItemName,
1268 options: Vec<PgConfigOption<T>>,
1269 },
1270 Yugabyte {
1271 connection: T::ItemName,
1272 options: Vec<PgConfigOption<T>>,
1273 },
1274 SqlServer {
1275 connection: T::ItemName,
1276 options: Vec<SqlServerConfigOption<T>>,
1277 },
1278 MySql {
1279 connection: T::ItemName,
1280 options: Vec<MySqlConfigOption<T>>,
1281 },
1282 LoadGenerator {
1283 generator: LoadGenerator,
1284 options: Vec<LoadGeneratorOption<T>>,
1285 },
1286}
1287
1288impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1289 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1290 match self {
1291 CreateSourceConnection::Kafka {
1292 connection,
1293 options,
1294 } => {
1295 f.write_str("KAFKA CONNECTION ");
1296 f.write_node(connection);
1297 if !options.is_empty() {
1298 f.write_str(" (");
1299 f.write_node(&display::comma_separated(options));
1300 f.write_str(")");
1301 }
1302 }
1303 CreateSourceConnection::Postgres {
1304 connection,
1305 options,
1306 } => {
1307 f.write_str("POSTGRES CONNECTION ");
1308 f.write_node(connection);
1309 if !options.is_empty() {
1310 f.write_str(" (");
1311 f.write_node(&display::comma_separated(options));
1312 f.write_str(")");
1313 }
1314 }
1315 CreateSourceConnection::Yugabyte {
1316 connection,
1317 options,
1318 } => {
1319 f.write_str("YUGABYTE CONNECTION ");
1320 f.write_node(connection);
1321 if !options.is_empty() {
1322 f.write_str(" (");
1323 f.write_node(&display::comma_separated(options));
1324 f.write_str(")");
1325 }
1326 }
1327 CreateSourceConnection::SqlServer {
1328 connection,
1329 options,
1330 } => {
1331 f.write_str("SQL SERVER CONNECTION ");
1332 f.write_node(connection);
1333 if !options.is_empty() {
1334 f.write_str(" (");
1335 f.write_node(&display::comma_separated(options));
1336 f.write_str(")");
1337 }
1338 }
1339 CreateSourceConnection::MySql {
1340 connection,
1341 options,
1342 } => {
1343 f.write_str("MYSQL CONNECTION ");
1344 f.write_node(connection);
1345 if !options.is_empty() {
1346 f.write_str(" (");
1347 f.write_node(&display::comma_separated(options));
1348 f.write_str(")");
1349 }
1350 }
1351 CreateSourceConnection::LoadGenerator { generator, options } => {
1352 f.write_str("LOAD GENERATOR ");
1353 f.write_node(generator);
1354 if !options.is_empty() {
1355 f.write_str(" (");
1356 f.write_node(&display::comma_separated(options));
1357 f.write_str(")");
1358 }
1359 }
1360 }
1361 }
1362}
1363impl_display_t!(CreateSourceConnection);
1364
1365#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1366pub enum LoadGenerator {
1367 Clock,
1368 Counter,
1369 Marketing,
1370 Auction,
1371 Datums,
1372 Tpch,
1373 KeyValue,
1374}
1375
1376impl AstDisplay for LoadGenerator {
1377 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1378 match self {
1379 Self::Counter => f.write_str("COUNTER"),
1380 Self::Clock => f.write_str("CLOCK"),
1381 Self::Marketing => f.write_str("MARKETING"),
1382 Self::Auction => f.write_str("AUCTION"),
1383 Self::Datums => f.write_str("DATUMS"),
1384 Self::Tpch => f.write_str("TPCH"),
1385 Self::KeyValue => f.write_str("KEY VALUE"),
1386 }
1387 }
1388}
1389impl_display!(LoadGenerator);
1390
1391impl LoadGenerator {
1392 pub fn schema_name(&self) -> &'static str {
1397 match self {
1398 LoadGenerator::Counter => "counter",
1399 LoadGenerator::Clock => "clock",
1400 LoadGenerator::Marketing => "marketing",
1401 LoadGenerator::Auction => "auction",
1402 LoadGenerator::Datums => "datums",
1403 LoadGenerator::Tpch => "tpch",
1404 LoadGenerator::KeyValue => "key_value",
1405 }
1406 }
1407}
1408
1409#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1410pub enum LoadGeneratorOptionName {
1411 ScaleFactor,
1412 TickInterval,
1413 AsOf,
1414 UpTo,
1415 MaxCardinality,
1416 Keys,
1417 SnapshotRounds,
1418 TransactionalSnapshot,
1419 ValueSize,
1420 Seed,
1421 Partitions,
1422 BatchSize,
1423}
1424
1425impl AstDisplay for LoadGeneratorOptionName {
1426 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1427 f.write_str(match self {
1428 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1429 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1430 LoadGeneratorOptionName::AsOf => "AS OF",
1431 LoadGeneratorOptionName::UpTo => "UP TO",
1432 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1433 LoadGeneratorOptionName::Keys => "KEYS",
1434 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1435 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1436 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1437 LoadGeneratorOptionName::Seed => "SEED",
1438 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1439 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1440 })
1441 }
1442}
1443impl_display!(LoadGeneratorOptionName);
1444
1445impl WithOptionName for LoadGeneratorOptionName {
1446 fn redact_value(&self) -> bool {
1452 match self {
1453 LoadGeneratorOptionName::ScaleFactor
1454 | LoadGeneratorOptionName::TickInterval
1455 | LoadGeneratorOptionName::AsOf
1456 | LoadGeneratorOptionName::UpTo
1457 | LoadGeneratorOptionName::MaxCardinality
1458 | LoadGeneratorOptionName::Keys
1459 | LoadGeneratorOptionName::SnapshotRounds
1460 | LoadGeneratorOptionName::TransactionalSnapshot
1461 | LoadGeneratorOptionName::ValueSize
1462 | LoadGeneratorOptionName::Partitions
1463 | LoadGeneratorOptionName::BatchSize
1464 | LoadGeneratorOptionName::Seed => false,
1465 }
1466 }
1467}
1468
1469#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1470pub struct LoadGeneratorOption<T: AstInfo> {
1472 pub name: LoadGeneratorOptionName,
1473 pub value: Option<WithOptionValue<T>>,
1474}
1475impl_display_for_with_option!(LoadGeneratorOption);
1476impl_display_t!(LoadGeneratorOption);
1477
1478#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1479pub enum CreateSinkConnection<T: AstInfo> {
1480 Kafka {
1481 connection: T::ItemName,
1482 options: Vec<KafkaSinkConfigOption<T>>,
1483 key: Option<KafkaSinkKey>,
1484 headers: Option<Ident>,
1485 },
1486}
1487
1488impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1489 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1490 match self {
1491 CreateSinkConnection::Kafka {
1492 connection,
1493 options,
1494 key,
1495 headers,
1496 } => {
1497 f.write_str("KAFKA CONNECTION ");
1498 f.write_node(connection);
1499 if !options.is_empty() {
1500 f.write_str(" (");
1501 f.write_node(&display::comma_separated(options));
1502 f.write_str(")");
1503 }
1504 if let Some(key) = key.as_ref() {
1505 f.write_node(key);
1506 }
1507 if let Some(headers) = headers {
1508 f.write_str(" HEADERS ");
1509 f.write_node(headers);
1510 }
1511 }
1512 }
1513 }
1514}
1515impl_display_t!(CreateSinkConnection);
1516
1517#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1518pub struct KafkaSinkKey {
1519 pub key_columns: Vec<Ident>,
1520 pub not_enforced: bool,
1521}
1522
1523impl AstDisplay for KafkaSinkKey {
1524 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1525 f.write_str(" KEY (");
1526 f.write_node(&display::comma_separated(&self.key_columns));
1527 f.write_str(")");
1528 if self.not_enforced {
1529 f.write_str(" NOT ENFORCED");
1530 }
1531 }
1532}
1533
1534#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1537pub enum TableConstraint<T: AstInfo> {
1538 Unique {
1540 name: Option<Ident>,
1541 columns: Vec<Ident>,
1542 is_primary: bool,
1544 nulls_not_distinct: bool,
1547 },
1548 ForeignKey {
1551 name: Option<Ident>,
1552 columns: Vec<Ident>,
1553 foreign_table: T::ItemName,
1554 referred_columns: Vec<Ident>,
1555 },
1556 Check {
1558 name: Option<Ident>,
1559 expr: Box<Expr<T>>,
1560 },
1561}
1562
1563impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1564 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1565 match self {
1566 TableConstraint::Unique {
1567 name,
1568 columns,
1569 is_primary,
1570 nulls_not_distinct,
1571 } => {
1572 f.write_node(&display_constraint_name(name));
1573 if *is_primary {
1574 f.write_str("PRIMARY KEY ");
1575 } else {
1576 f.write_str("UNIQUE ");
1577 if *nulls_not_distinct {
1578 f.write_str("NULLS NOT DISTINCT ");
1579 }
1580 }
1581 f.write_str("(");
1582 f.write_node(&display::comma_separated(columns));
1583 f.write_str(")");
1584 }
1585 TableConstraint::ForeignKey {
1586 name,
1587 columns,
1588 foreign_table,
1589 referred_columns,
1590 } => {
1591 f.write_node(&display_constraint_name(name));
1592 f.write_str("FOREIGN KEY (");
1593 f.write_node(&display::comma_separated(columns));
1594 f.write_str(") REFERENCES ");
1595 f.write_node(foreign_table);
1596 f.write_str("(");
1597 f.write_node(&display::comma_separated(referred_columns));
1598 f.write_str(")");
1599 }
1600 TableConstraint::Check { name, expr } => {
1601 f.write_node(&display_constraint_name(name));
1602 f.write_str("CHECK (");
1603 f.write_node(&expr);
1604 f.write_str(")");
1605 }
1606 }
1607 }
1608}
1609impl_display_t!(TableConstraint);
1610
1611#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1613pub enum KeyConstraint {
1614 PrimaryKeyNotEnforced { columns: Vec<Ident> },
1616}
1617
1618impl AstDisplay for KeyConstraint {
1619 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1620 match self {
1621 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1622 f.write_str("PRIMARY KEY ");
1623 f.write_str("(");
1624 f.write_node(&display::comma_separated(columns));
1625 f.write_str(") ");
1626 f.write_str("NOT ENFORCED");
1627 }
1628 }
1629 }
1630}
1631impl_display!(KeyConstraint);
1632
1633#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1634pub enum CreateSourceOptionName {
1635 TimestampInterval,
1636 RetainHistory,
1637}
1638
1639impl AstDisplay for CreateSourceOptionName {
1640 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1641 f.write_str(match self {
1642 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1643 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1644 })
1645 }
1646}
1647impl_display!(CreateSourceOptionName);
1648
1649impl WithOptionName for CreateSourceOptionName {
1650 fn redact_value(&self) -> bool {
1656 match self {
1657 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1658 false
1659 }
1660 }
1661 }
1662}
1663
1664#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1665pub struct CreateSourceOption<T: AstInfo> {
1667 pub name: CreateSourceOptionName,
1668 pub value: Option<WithOptionValue<T>>,
1669}
1670impl_display_for_with_option!(CreateSourceOption);
1671impl_display_t!(CreateSourceOption);
1672
1673#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1675pub struct ColumnDef<T: AstInfo> {
1676 pub name: Ident,
1677 pub data_type: T::DataType,
1678 pub collation: Option<UnresolvedItemName>,
1679 pub options: Vec<ColumnOptionDef<T>>,
1680}
1681
1682impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1683 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1684 f.write_node(&self.name);
1685 f.write_str(" ");
1686 f.write_node(&self.data_type);
1687 if let Some(collation) = &self.collation {
1688 f.write_str(" COLLATE ");
1689 f.write_node(collation);
1690 }
1691 for option in &self.options {
1692 f.write_str(" ");
1693 f.write_node(option);
1694 }
1695 }
1696}
1697impl_display_t!(ColumnDef);
1698
1699#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1716pub struct ColumnOptionDef<T: AstInfo> {
1717 pub name: Option<Ident>,
1718 pub option: ColumnOption<T>,
1719}
1720
1721impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1722 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1723 f.write_node(&display_constraint_name(&self.name));
1724 f.write_node(&self.option);
1725 }
1726}
1727impl_display_t!(ColumnOptionDef);
1728
1729#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1732pub enum ColumnOption<T: AstInfo> {
1733 Null,
1735 NotNull,
1737 Default(Expr<T>),
1739 Unique { is_primary: bool },
1741 ForeignKey {
1744 foreign_table: UnresolvedItemName,
1745 referred_columns: Vec<Ident>,
1746 },
1747 Check(Expr<T>),
1749 Versioned {
1751 action: ColumnVersioned,
1752 version: Version,
1753 },
1754}
1755
1756impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1757 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1758 use ColumnOption::*;
1759 match self {
1760 Null => f.write_str("NULL"),
1761 NotNull => f.write_str("NOT NULL"),
1762 Default(expr) => {
1763 f.write_str("DEFAULT ");
1764 f.write_node(expr);
1765 }
1766 Unique { is_primary } => {
1767 if *is_primary {
1768 f.write_str("PRIMARY KEY");
1769 } else {
1770 f.write_str("UNIQUE");
1771 }
1772 }
1773 ForeignKey {
1774 foreign_table,
1775 referred_columns,
1776 } => {
1777 f.write_str("REFERENCES ");
1778 f.write_node(foreign_table);
1779 f.write_str(" (");
1780 f.write_node(&display::comma_separated(referred_columns));
1781 f.write_str(")");
1782 }
1783 Check(expr) => {
1784 f.write_str("CHECK (");
1785 f.write_node(expr);
1786 f.write_str(")");
1787 }
1788 Versioned { action, version } => {
1789 f.write_str("VERSION ");
1790 f.write_node(action);
1791 f.write_str(" ");
1792 f.write_node(version);
1793 }
1794 }
1795 }
1796}
1797impl_display_t!(ColumnOption);
1798
1799#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1800pub enum ColumnVersioned {
1801 Added,
1802}
1803
1804impl AstDisplay for ColumnVersioned {
1805 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1806 where
1807 W: fmt::Write,
1808 {
1809 match self {
1810 ColumnVersioned::Added => f.write_str("ADDED"),
1812 }
1813 }
1814}
1815impl_display!(ColumnVersioned);
1816
1817fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1818 struct ConstraintName<'a>(&'a Option<Ident>);
1819 impl<'a> AstDisplay for ConstraintName<'a> {
1820 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1821 where
1822 W: fmt::Write,
1823 {
1824 if let Some(name) = self.0 {
1825 f.write_str("CONSTRAINT ");
1826 f.write_node(name);
1827 f.write_str(" ");
1828 }
1829 }
1830 }
1831 ConstraintName(name)
1832}