1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77 Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104 pub name: ContinualTaskOptionName,
105 pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111 pub schema: String,
112}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125 ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152 pub name: AvroSchemaOptionName,
153 pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252 pub identifier: DocOnIdentifier<T>,
253 pub for_schema: DocOnSchema,
254}
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub enum DocOnSchema {
257 KeyOnly,
258 ValueOnly,
259 All,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264 Column(ColumnName<T>),
265 Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306pub struct CsrConfigOption<T: AstInfo> {
308 pub name: CsrConfigOptionName<T>,
309 pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum ReaderSchemaSelectionStrategy {
335 Latest,
336 Inline(String),
337 ById(i32),
338}
339
340impl Default for ReaderSchemaSelectionStrategy {
341 fn default() -> Self {
342 Self::Latest
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387 pub key_schema: Option<String>,
388 pub value_schema: String,
389}
390
391impl AstDisplay for CsrSeedAvro {
392 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393 f.write_str("SEED");
394 if let Some(key_schema) = &self.key_schema {
395 f.write_str(" KEY SCHEMA '");
396 f.write_node(&display::escape_single_quote_string(key_schema));
397 f.write_str("'");
398 }
399 f.write_str(" VALUE SCHEMA '");
400 f.write_node(&display::escape_single_quote_string(&self.value_schema));
401 f.write_str("'");
402 }
403}
404impl_display!(CsrSeedAvro);
405
406#[derive(Debug, Clone, PartialEq, Eq, Hash)]
407pub struct CsrSeedProtobuf {
408 pub key: Option<CsrSeedProtobufSchema>,
409 pub value: CsrSeedProtobufSchema,
410}
411
412impl AstDisplay for CsrSeedProtobuf {
413 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414 f.write_str("SEED");
415 if let Some(key) = &self.key {
416 f.write_str(" KEY ");
417 f.write_node(key);
418 }
419 f.write_str(" VALUE ");
420 f.write_node(&self.value);
421 }
422}
423impl_display!(CsrSeedProtobuf);
424
425#[derive(Debug, Clone, PartialEq, Eq, Hash)]
426pub struct CsrSeedProtobufSchema {
427 pub schema: String,
429 pub message_name: String,
430}
431impl AstDisplay for CsrSeedProtobufSchema {
432 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433 f.write_str("SCHEMA '");
434 f.write_str(&display::escape_single_quote_string(&self.schema));
435 f.write_str("' MESSAGE '");
436 f.write_str(&self.message_name);
437 f.write_str("'");
438 }
439}
440impl_display!(CsrSeedProtobufSchema);
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash)]
443pub enum FormatSpecifier<T: AstInfo> {
444 Bare(Format<T>),
446 KeyValue { key: Format<T>, value: Format<T> },
448}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452 match self {
453 FormatSpecifier::Bare(format) => {
454 f.write_str("FORMAT ");
455 f.write_node(format)
456 }
457 FormatSpecifier::KeyValue { key, value } => {
458 f.write_str("KEY FORMAT ");
459 f.write_node(key);
460 f.write_str(" VALUE FORMAT ");
461 f.write_node(value);
462 }
463 }
464 }
465}
466impl_display_t!(FormatSpecifier);
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash)]
469pub enum Format<T: AstInfo> {
470 Bytes,
471 Avro(AvroSchema<T>),
472 Protobuf(ProtobufSchema<T>),
473 Regex(String),
474 Csv {
475 columns: CsvColumns,
476 delimiter: char,
477 },
478 Json {
479 array: bool,
480 },
481 Text,
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CsvColumns {
486 Count(u64),
488 Header { names: Vec<Ident> },
490}
491
492impl AstDisplay for CsvColumns {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 match self {
495 CsvColumns::Count(n) => {
496 f.write_str(n);
497 f.write_str(" COLUMNS")
498 }
499 CsvColumns::Header { names } => {
500 f.write_str("HEADER");
501 if !names.is_empty() {
502 f.write_str(" (");
503 f.write_node(&display::comma_separated(names));
504 f.write_str(")");
505 }
506 }
507 }
508 }
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Hash)]
512pub enum SourceIncludeMetadata {
513 Key {
514 alias: Option<Ident>,
515 },
516 Timestamp {
517 alias: Option<Ident>,
518 },
519 Partition {
520 alias: Option<Ident>,
521 },
522 Offset {
523 alias: Option<Ident>,
524 },
525 Headers {
526 alias: Option<Ident>,
527 },
528 Header {
529 key: String,
530 alias: Ident,
531 use_bytes: bool,
532 },
533}
534
535impl AstDisplay for SourceIncludeMetadata {
536 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538 if let Some(alias) = alias {
539 f.write_str(" AS ");
540 f.write_node(alias);
541 }
542 };
543
544 match self {
545 SourceIncludeMetadata::Key { alias } => {
546 f.write_str("KEY");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Timestamp { alias } => {
550 f.write_str("TIMESTAMP");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Partition { alias } => {
554 f.write_str("PARTITION");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Offset { alias } => {
558 f.write_str("OFFSET");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Headers { alias } => {
562 f.write_str("HEADERS");
563 print_alias(f, alias);
564 }
565 SourceIncludeMetadata::Header {
566 alias,
567 key,
568 use_bytes,
569 } => {
570 f.write_str("HEADER '");
571 f.write_str(&display::escape_single_quote_string(key));
572 f.write_str("'");
573 print_alias(f, &Some(alias.clone()));
574 if *use_bytes {
575 f.write_str(" BYTES");
576 }
577 }
578 }
579 }
580}
581impl_display!(SourceIncludeMetadata);
582
583#[derive(Debug, Clone, PartialEq, Eq, Hash)]
584pub enum SourceErrorPolicy {
585 Inline {
586 alias: Option<Ident>,
588 },
589}
590
591impl AstDisplay for SourceErrorPolicy {
592 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593 match self {
594 Self::Inline { alias } => {
595 f.write_str("INLINE");
596 if let Some(alias) = alias {
597 f.write_str(" AS ");
598 f.write_node(alias);
599 }
600 }
601 }
602 }
603}
604impl_display!(SourceErrorPolicy);
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceEnvelope {
608 None,
609 Debezium,
610 Upsert {
611 value_decode_err_policy: Vec<SourceErrorPolicy>,
612 },
613 CdcV2,
614}
615
616impl SourceEnvelope {
617 pub fn requires_all_input(&self) -> bool {
620 match self {
621 SourceEnvelope::None => false,
622 SourceEnvelope::Debezium => false,
623 SourceEnvelope::Upsert { .. } => false,
624 SourceEnvelope::CdcV2 => true,
625 }
626 }
627}
628
629impl AstDisplay for SourceEnvelope {
630 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631 match self {
632 Self::None => {
633 f.write_str("NONE");
635 }
636 Self::Debezium => {
637 f.write_str("DEBEZIUM");
638 }
639 Self::Upsert {
640 value_decode_err_policy,
641 } => {
642 if value_decode_err_policy.is_empty() {
643 f.write_str("UPSERT");
644 } else {
645 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646 f.write_node(&display::comma_separated(value_decode_err_policy));
647 f.write_str("))")
648 }
649 }
650 Self::CdcV2 => {
651 f.write_str("MATERIALIZE");
652 }
653 }
654 }
655}
656impl_display!(SourceEnvelope);
657
658#[derive(Debug, Clone, PartialEq, Eq, Hash)]
659pub enum SinkEnvelope {
660 Debezium,
661 Upsert,
662}
663
664impl AstDisplay for SinkEnvelope {
665 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666 match self {
667 Self::Upsert => {
668 f.write_str("UPSERT");
669 }
670 Self::Debezium => {
671 f.write_str("DEBEZIUM");
672 }
673 }
674 }
675}
676impl_display!(SinkEnvelope);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SubscribeOutput<T: AstInfo> {
680 Diffs,
681 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
682 EnvelopeUpsert { key_columns: Vec<Ident> },
683 EnvelopeDebezium { key_columns: Vec<Ident> },
684}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688 match self {
689 Self::Diffs => {}
690 Self::WithinTimestampOrderBy { order_by } => {
691 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692 f.write_node(&display::comma_separated(order_by));
693 }
694 Self::EnvelopeUpsert { key_columns } => {
695 f.write_str(" ENVELOPE UPSERT (KEY (");
696 f.write_node(&display::comma_separated(key_columns));
697 f.write_str("))");
698 }
699 Self::EnvelopeDebezium { key_columns } => {
700 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701 f.write_node(&display::comma_separated(key_columns));
702 f.write_str("))");
703 }
704 }
705 }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711 match self {
712 Self::Bytes => f.write_str("BYTES"),
713 Self::Avro(inner) => {
714 f.write_str("AVRO ");
715 f.write_node(inner);
716 }
717 Self::Protobuf(inner) => {
718 f.write_str("PROTOBUF ");
719 f.write_node(inner);
720 }
721 Self::Regex(regex) => {
722 f.write_str("REGEX '");
723 f.write_node(&display::escape_single_quote_string(regex));
724 f.write_str("'");
725 }
726 Self::Csv { columns, delimiter } => {
727 f.write_str("CSV WITH ");
728 f.write_node(columns);
729
730 if *delimiter != ',' {
731 f.write_str(" DELIMITED BY '");
732 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733 f.write_str("'");
734 }
735 }
736 Self::Json { array } => {
737 f.write_str("JSON");
738 if *array {
739 f.write_str(" ARRAY");
740 }
741 }
742 Self::Text => f.write_str("TEXT"),
743 }
744 }
745}
746impl_display_t!(Format);
747
748#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
752pub enum ConnectionOptionName {
753 AccessKeyId,
754 AssumeRoleArn,
755 AssumeRoleSessionName,
756 AvailabilityZones,
757 AwsConnection,
758 AwsPrivatelink,
759 Broker,
760 Brokers,
761 Credential,
762 Database,
763 Endpoint,
764 Host,
765 Password,
766 Port,
767 ProgressTopic,
768 ProgressTopicReplicationFactor,
769 PublicKey1,
770 PublicKey2,
771 Region,
772 SaslMechanisms,
773 SaslPassword,
774 SaslUsername,
775 Scope,
776 SecretAccessKey,
777 SecurityProtocol,
778 ServiceName,
779 SshTunnel,
780 SslCertificate,
781 SslCertificateAuthority,
782 SslKey,
783 SslMode,
784 SessionToken,
785 CatalogType,
786 Url,
787 User,
788 Warehouse,
789}
790
791impl AstDisplay for ConnectionOptionName {
792 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793 f.write_str(match self {
794 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798 ConnectionOptionName::Broker => "BROKER",
799 ConnectionOptionName::Brokers => "BROKERS",
800 ConnectionOptionName::Credential => "CREDENTIAL",
801 ConnectionOptionName::Database => "DATABASE",
802 ConnectionOptionName::Endpoint => "ENDPOINT",
803 ConnectionOptionName::Host => "HOST",
804 ConnectionOptionName::Password => "PASSWORD",
805 ConnectionOptionName::Port => "PORT",
806 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807 ConnectionOptionName::ProgressTopicReplicationFactor => {
808 "PROGRESS TOPIC REPLICATION FACTOR"
809 }
810 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812 ConnectionOptionName::Region => "REGION",
813 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817 ConnectionOptionName::SaslUsername => "SASL USERNAME",
818 ConnectionOptionName::Scope => "SCOPE",
819 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821 ConnectionOptionName::ServiceName => "SERVICE NAME",
822 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825 ConnectionOptionName::SslKey => "SSL KEY",
826 ConnectionOptionName::SslMode => "SSL MODE",
827 ConnectionOptionName::SessionToken => "SESSION TOKEN",
828 ConnectionOptionName::CatalogType => "CATALOG TYPE",
829 ConnectionOptionName::Url => "URL",
830 ConnectionOptionName::User => "USER",
831 ConnectionOptionName::Warehouse => "WAREHOUSE",
832 })
833 }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838 fn redact_value(&self) -> bool {
844 match self {
845 ConnectionOptionName::AccessKeyId
846 | ConnectionOptionName::AvailabilityZones
847 | ConnectionOptionName::AwsConnection
848 | ConnectionOptionName::AwsPrivatelink
849 | ConnectionOptionName::Broker
850 | ConnectionOptionName::Brokers
851 | ConnectionOptionName::Credential
852 | ConnectionOptionName::Database
853 | ConnectionOptionName::Endpoint
854 | ConnectionOptionName::Host
855 | ConnectionOptionName::Password
856 | ConnectionOptionName::Port
857 | ConnectionOptionName::ProgressTopic
858 | ConnectionOptionName::ProgressTopicReplicationFactor
859 | ConnectionOptionName::PublicKey1
860 | ConnectionOptionName::PublicKey2
861 | ConnectionOptionName::Region
862 | ConnectionOptionName::AssumeRoleArn
863 | ConnectionOptionName::AssumeRoleSessionName
864 | ConnectionOptionName::SaslMechanisms
865 | ConnectionOptionName::SaslPassword
866 | ConnectionOptionName::SaslUsername
867 | ConnectionOptionName::Scope
868 | ConnectionOptionName::SecurityProtocol
869 | ConnectionOptionName::SecretAccessKey
870 | ConnectionOptionName::ServiceName
871 | ConnectionOptionName::SshTunnel
872 | ConnectionOptionName::SslCertificate
873 | ConnectionOptionName::SslCertificateAuthority
874 | ConnectionOptionName::SslKey
875 | ConnectionOptionName::SslMode
876 | ConnectionOptionName::SessionToken
877 | ConnectionOptionName::CatalogType
878 | ConnectionOptionName::Url
879 | ConnectionOptionName::User
880 | ConnectionOptionName::Warehouse => false,
881 }
882 }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
886pub struct ConnectionOption<T: AstInfo> {
888 pub name: ConnectionOptionName,
889 pub value: Option<WithOptionValue<T>>,
890}
891impl_display_for_with_option!(ConnectionOption);
892impl_display_t!(ConnectionOption);
893
894#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
895pub enum CreateConnectionType {
896 Aws,
897 AwsPrivatelink,
898 Kafka,
899 Csr,
900 Postgres,
901 Ssh,
902 SqlServer,
903 MySql,
904 IcebergCatalog,
905}
906
907impl AstDisplay for CreateConnectionType {
908 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
909 match self {
910 Self::Kafka => {
911 f.write_str("KAFKA");
912 }
913 Self::Csr => {
914 f.write_str("CONFLUENT SCHEMA REGISTRY");
915 }
916 Self::Postgres => {
917 f.write_str("POSTGRES");
918 }
919 Self::Aws => {
920 f.write_str("AWS");
921 }
922 Self::AwsPrivatelink => {
923 f.write_str("AWS PRIVATELINK");
924 }
925 Self::Ssh => {
926 f.write_str("SSH TUNNEL");
927 }
928 Self::SqlServer => {
929 f.write_str("SQL SERVER");
930 }
931 Self::MySql => {
932 f.write_str("MYSQL");
933 }
934 Self::IcebergCatalog => {
935 f.write_str("ICEBERG CATALOG");
936 }
937 }
938 }
939}
940impl_display!(CreateConnectionType);
941
942#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
943pub enum CreateConnectionOptionName {
944 Validate,
945}
946
947impl AstDisplay for CreateConnectionOptionName {
948 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
949 f.write_str(match self {
950 CreateConnectionOptionName::Validate => "VALIDATE",
951 })
952 }
953}
954impl_display!(CreateConnectionOptionName);
955
956impl WithOptionName for CreateConnectionOptionName {
957 fn redact_value(&self) -> bool {
963 match self {
964 CreateConnectionOptionName::Validate => false,
965 }
966 }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
970pub struct CreateConnectionOption<T: AstInfo> {
972 pub name: CreateConnectionOptionName,
973 pub value: Option<WithOptionValue<T>>,
974}
975impl_display_for_with_option!(CreateConnectionOption);
976impl_display_t!(CreateConnectionOption);
977
978#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
979pub enum KafkaSourceConfigOptionName {
980 GroupIdPrefix,
981 Topic,
982 TopicMetadataRefreshInterval,
983 StartTimestamp,
984 StartOffset,
985}
986
987impl AstDisplay for KafkaSourceConfigOptionName {
988 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
989 f.write_str(match self {
990 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
991 KafkaSourceConfigOptionName::Topic => "TOPIC",
992 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
993 "TOPIC METADATA REFRESH INTERVAL"
994 }
995 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
996 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
997 })
998 }
999}
1000impl_display!(KafkaSourceConfigOptionName);
1001
1002impl WithOptionName for KafkaSourceConfigOptionName {
1003 fn redact_value(&self) -> bool {
1009 match self {
1010 KafkaSourceConfigOptionName::GroupIdPrefix
1011 | KafkaSourceConfigOptionName::Topic
1012 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1013 | KafkaSourceConfigOptionName::StartOffset
1014 | KafkaSourceConfigOptionName::StartTimestamp => false,
1015 }
1016 }
1017}
1018
1019#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1020pub struct KafkaSourceConfigOption<T: AstInfo> {
1021 pub name: KafkaSourceConfigOptionName,
1022 pub value: Option<WithOptionValue<T>>,
1023}
1024impl_display_for_with_option!(KafkaSourceConfigOption);
1025impl_display_t!(KafkaSourceConfigOption);
1026
1027#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1028pub enum KafkaSinkConfigOptionName {
1029 CompressionType,
1030 PartitionBy,
1031 ProgressGroupIdPrefix,
1032 Topic,
1033 TransactionalIdPrefix,
1034 LegacyIds,
1035 TopicConfig,
1036 TopicMetadataRefreshInterval,
1037 TopicPartitionCount,
1038 TopicReplicationFactor,
1039}
1040
1041impl AstDisplay for KafkaSinkConfigOptionName {
1042 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1043 f.write_str(match self {
1044 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1045 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1046 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1047 KafkaSinkConfigOptionName::Topic => "TOPIC",
1048 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1049 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1050 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1051 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1052 "TOPIC METADATA REFRESH INTERVAL"
1053 }
1054 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1055 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1056 })
1057 }
1058}
1059impl_display!(KafkaSinkConfigOptionName);
1060
1061impl WithOptionName for KafkaSinkConfigOptionName {
1062 fn redact_value(&self) -> bool {
1068 match self {
1069 KafkaSinkConfigOptionName::CompressionType
1070 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1071 | KafkaSinkConfigOptionName::Topic
1072 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1073 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1074 | KafkaSinkConfigOptionName::LegacyIds
1075 | KafkaSinkConfigOptionName::TopicConfig
1076 | KafkaSinkConfigOptionName::TopicPartitionCount
1077 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1078 KafkaSinkConfigOptionName::PartitionBy => true,
1079 }
1080 }
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1084pub struct KafkaSinkConfigOption<T: AstInfo> {
1085 pub name: KafkaSinkConfigOptionName,
1086 pub value: Option<WithOptionValue<T>>,
1087}
1088impl_display_for_with_option!(KafkaSinkConfigOption);
1089impl_display_t!(KafkaSinkConfigOption);
1090
1091#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1092pub enum PgConfigOptionName {
1093 Details,
1096 Publication,
1098 TextColumns,
1105}
1106
1107impl AstDisplay for PgConfigOptionName {
1108 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1109 f.write_str(match self {
1110 PgConfigOptionName::Details => "DETAILS",
1111 PgConfigOptionName::Publication => "PUBLICATION",
1112 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1113 })
1114 }
1115}
1116impl_display!(PgConfigOptionName);
1117
1118impl WithOptionName for PgConfigOptionName {
1119 fn redact_value(&self) -> bool {
1125 match self {
1126 PgConfigOptionName::Details
1127 | PgConfigOptionName::Publication
1128 | PgConfigOptionName::TextColumns => false,
1129 }
1130 }
1131}
1132
1133#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1134pub struct PgConfigOption<T: AstInfo> {
1136 pub name: PgConfigOptionName,
1137 pub value: Option<WithOptionValue<T>>,
1138}
1139impl_display_for_with_option!(PgConfigOption);
1140impl_display_t!(PgConfigOption);
1141
1142#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1143pub enum MySqlConfigOptionName {
1144 Details,
1147 TextColumns,
1154 ExcludeColumns,
1161}
1162
1163impl AstDisplay for MySqlConfigOptionName {
1164 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1165 f.write_str(match self {
1166 MySqlConfigOptionName::Details => "DETAILS",
1167 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1168 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1169 })
1170 }
1171}
1172impl_display!(MySqlConfigOptionName);
1173
1174impl WithOptionName for MySqlConfigOptionName {
1175 fn redact_value(&self) -> bool {
1181 match self {
1182 MySqlConfigOptionName::Details
1183 | MySqlConfigOptionName::TextColumns
1184 | MySqlConfigOptionName::ExcludeColumns => false,
1185 }
1186 }
1187}
1188
1189#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1190pub struct MySqlConfigOption<T: AstInfo> {
1192 pub name: MySqlConfigOptionName,
1193 pub value: Option<WithOptionValue<T>>,
1194}
1195impl_display_for_with_option!(MySqlConfigOption);
1196impl_display_t!(MySqlConfigOption);
1197
1198#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1199pub enum SqlServerConfigOptionName {
1200 Details,
1203 TextColumns,
1211 ExcludeColumns,
1219}
1220
1221impl AstDisplay for SqlServerConfigOptionName {
1222 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1223 f.write_str(match self {
1224 SqlServerConfigOptionName::Details => "DETAILS",
1225 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1226 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1227 })
1228 }
1229}
1230impl_display!(SqlServerConfigOptionName);
1231
1232impl WithOptionName for SqlServerConfigOptionName {
1233 fn redact_value(&self) -> bool {
1239 match self {
1240 SqlServerConfigOptionName::Details
1241 | SqlServerConfigOptionName::TextColumns
1242 | SqlServerConfigOptionName::ExcludeColumns => false,
1243 }
1244 }
1245}
1246
1247#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1248pub struct SqlServerConfigOption<T: AstInfo> {
1250 pub name: SqlServerConfigOptionName,
1251 pub value: Option<WithOptionValue<T>>,
1252}
1253impl_display_for_with_option!(SqlServerConfigOption);
1254impl_display_t!(SqlServerConfigOption);
1255
1256#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1257pub enum CreateSourceConnection<T: AstInfo> {
1258 Kafka {
1259 connection: T::ItemName,
1260 options: Vec<KafkaSourceConfigOption<T>>,
1261 },
1262 Postgres {
1263 connection: T::ItemName,
1264 options: Vec<PgConfigOption<T>>,
1265 },
1266 SqlServer {
1267 connection: T::ItemName,
1268 options: Vec<SqlServerConfigOption<T>>,
1269 },
1270 MySql {
1271 connection: T::ItemName,
1272 options: Vec<MySqlConfigOption<T>>,
1273 },
1274 LoadGenerator {
1275 generator: LoadGenerator,
1276 options: Vec<LoadGeneratorOption<T>>,
1277 },
1278}
1279
1280impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1281 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1282 match self {
1283 CreateSourceConnection::Kafka {
1284 connection,
1285 options,
1286 } => {
1287 f.write_str("KAFKA CONNECTION ");
1288 f.write_node(connection);
1289 if !options.is_empty() {
1290 f.write_str(" (");
1291 f.write_node(&display::comma_separated(options));
1292 f.write_str(")");
1293 }
1294 }
1295 CreateSourceConnection::Postgres {
1296 connection,
1297 options,
1298 } => {
1299 f.write_str("POSTGRES CONNECTION ");
1300 f.write_node(connection);
1301 if !options.is_empty() {
1302 f.write_str(" (");
1303 f.write_node(&display::comma_separated(options));
1304 f.write_str(")");
1305 }
1306 }
1307 CreateSourceConnection::SqlServer {
1308 connection,
1309 options,
1310 } => {
1311 f.write_str("SQL SERVER CONNECTION ");
1312 f.write_node(connection);
1313 if !options.is_empty() {
1314 f.write_str(" (");
1315 f.write_node(&display::comma_separated(options));
1316 f.write_str(")");
1317 }
1318 }
1319 CreateSourceConnection::MySql {
1320 connection,
1321 options,
1322 } => {
1323 f.write_str("MYSQL CONNECTION ");
1324 f.write_node(connection);
1325 if !options.is_empty() {
1326 f.write_str(" (");
1327 f.write_node(&display::comma_separated(options));
1328 f.write_str(")");
1329 }
1330 }
1331 CreateSourceConnection::LoadGenerator { generator, options } => {
1332 f.write_str("LOAD GENERATOR ");
1333 f.write_node(generator);
1334 if !options.is_empty() {
1335 f.write_str(" (");
1336 f.write_node(&display::comma_separated(options));
1337 f.write_str(")");
1338 }
1339 }
1340 }
1341 }
1342}
1343impl_display_t!(CreateSourceConnection);
1344
1345#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1346pub enum LoadGenerator {
1347 Clock,
1348 Counter,
1349 Marketing,
1350 Auction,
1351 Datums,
1352 Tpch,
1353 KeyValue,
1354}
1355
1356impl AstDisplay for LoadGenerator {
1357 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1358 match self {
1359 Self::Counter => f.write_str("COUNTER"),
1360 Self::Clock => f.write_str("CLOCK"),
1361 Self::Marketing => f.write_str("MARKETING"),
1362 Self::Auction => f.write_str("AUCTION"),
1363 Self::Datums => f.write_str("DATUMS"),
1364 Self::Tpch => f.write_str("TPCH"),
1365 Self::KeyValue => f.write_str("KEY VALUE"),
1366 }
1367 }
1368}
1369impl_display!(LoadGenerator);
1370
1371impl LoadGenerator {
1372 pub fn schema_name(&self) -> &'static str {
1377 match self {
1378 LoadGenerator::Counter => "counter",
1379 LoadGenerator::Clock => "clock",
1380 LoadGenerator::Marketing => "marketing",
1381 LoadGenerator::Auction => "auction",
1382 LoadGenerator::Datums => "datums",
1383 LoadGenerator::Tpch => "tpch",
1384 LoadGenerator::KeyValue => "key_value",
1385 }
1386 }
1387}
1388
1389#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1390pub enum LoadGeneratorOptionName {
1391 ScaleFactor,
1392 TickInterval,
1393 AsOf,
1394 UpTo,
1395 MaxCardinality,
1396 Keys,
1397 SnapshotRounds,
1398 TransactionalSnapshot,
1399 ValueSize,
1400 Seed,
1401 Partitions,
1402 BatchSize,
1403}
1404
1405impl AstDisplay for LoadGeneratorOptionName {
1406 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1407 f.write_str(match self {
1408 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1409 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1410 LoadGeneratorOptionName::AsOf => "AS OF",
1411 LoadGeneratorOptionName::UpTo => "UP TO",
1412 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1413 LoadGeneratorOptionName::Keys => "KEYS",
1414 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1415 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1416 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1417 LoadGeneratorOptionName::Seed => "SEED",
1418 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1419 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1420 })
1421 }
1422}
1423impl_display!(LoadGeneratorOptionName);
1424
1425impl WithOptionName for LoadGeneratorOptionName {
1426 fn redact_value(&self) -> bool {
1432 match self {
1433 LoadGeneratorOptionName::ScaleFactor
1434 | LoadGeneratorOptionName::TickInterval
1435 | LoadGeneratorOptionName::AsOf
1436 | LoadGeneratorOptionName::UpTo
1437 | LoadGeneratorOptionName::MaxCardinality
1438 | LoadGeneratorOptionName::Keys
1439 | LoadGeneratorOptionName::SnapshotRounds
1440 | LoadGeneratorOptionName::TransactionalSnapshot
1441 | LoadGeneratorOptionName::ValueSize
1442 | LoadGeneratorOptionName::Partitions
1443 | LoadGeneratorOptionName::BatchSize
1444 | LoadGeneratorOptionName::Seed => false,
1445 }
1446 }
1447}
1448
1449#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1450pub struct LoadGeneratorOption<T: AstInfo> {
1452 pub name: LoadGeneratorOptionName,
1453 pub value: Option<WithOptionValue<T>>,
1454}
1455impl_display_for_with_option!(LoadGeneratorOption);
1456impl_display_t!(LoadGeneratorOption);
1457
1458#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1459pub enum CreateSinkConnection<T: AstInfo> {
1460 Kafka {
1461 connection: T::ItemName,
1462 options: Vec<KafkaSinkConfigOption<T>>,
1463 key: Option<KafkaSinkKey>,
1464 headers: Option<Ident>,
1465 },
1466}
1467
1468impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1469 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1470 match self {
1471 CreateSinkConnection::Kafka {
1472 connection,
1473 options,
1474 key,
1475 headers,
1476 } => {
1477 f.write_str("KAFKA CONNECTION ");
1478 f.write_node(connection);
1479 if !options.is_empty() {
1480 f.write_str(" (");
1481 f.write_node(&display::comma_separated(options));
1482 f.write_str(")");
1483 }
1484 if let Some(key) = key.as_ref() {
1485 f.write_node(key);
1486 }
1487 if let Some(headers) = headers {
1488 f.write_str(" HEADERS ");
1489 f.write_node(headers);
1490 }
1491 }
1492 }
1493 }
1494}
1495impl_display_t!(CreateSinkConnection);
1496
1497#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1498pub struct KafkaSinkKey {
1499 pub key_columns: Vec<Ident>,
1500 pub not_enforced: bool,
1501}
1502
1503impl AstDisplay for KafkaSinkKey {
1504 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1505 f.write_str(" KEY (");
1506 f.write_node(&display::comma_separated(&self.key_columns));
1507 f.write_str(")");
1508 if self.not_enforced {
1509 f.write_str(" NOT ENFORCED");
1510 }
1511 }
1512}
1513
1514#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1517pub enum TableConstraint<T: AstInfo> {
1518 Unique {
1520 name: Option<Ident>,
1521 columns: Vec<Ident>,
1522 is_primary: bool,
1524 nulls_not_distinct: bool,
1527 },
1528 ForeignKey {
1531 name: Option<Ident>,
1532 columns: Vec<Ident>,
1533 foreign_table: T::ItemName,
1534 referred_columns: Vec<Ident>,
1535 },
1536 Check {
1538 name: Option<Ident>,
1539 expr: Box<Expr<T>>,
1540 },
1541}
1542
1543impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1544 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1545 match self {
1546 TableConstraint::Unique {
1547 name,
1548 columns,
1549 is_primary,
1550 nulls_not_distinct,
1551 } => {
1552 f.write_node(&display_constraint_name(name));
1553 if *is_primary {
1554 f.write_str("PRIMARY KEY ");
1555 } else {
1556 f.write_str("UNIQUE ");
1557 if *nulls_not_distinct {
1558 f.write_str("NULLS NOT DISTINCT ");
1559 }
1560 }
1561 f.write_str("(");
1562 f.write_node(&display::comma_separated(columns));
1563 f.write_str(")");
1564 }
1565 TableConstraint::ForeignKey {
1566 name,
1567 columns,
1568 foreign_table,
1569 referred_columns,
1570 } => {
1571 f.write_node(&display_constraint_name(name));
1572 f.write_str("FOREIGN KEY (");
1573 f.write_node(&display::comma_separated(columns));
1574 f.write_str(") REFERENCES ");
1575 f.write_node(foreign_table);
1576 f.write_str("(");
1577 f.write_node(&display::comma_separated(referred_columns));
1578 f.write_str(")");
1579 }
1580 TableConstraint::Check { name, expr } => {
1581 f.write_node(&display_constraint_name(name));
1582 f.write_str("CHECK (");
1583 f.write_node(&expr);
1584 f.write_str(")");
1585 }
1586 }
1587 }
1588}
1589impl_display_t!(TableConstraint);
1590
1591#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1593pub enum KeyConstraint {
1594 PrimaryKeyNotEnforced { columns: Vec<Ident> },
1596}
1597
1598impl AstDisplay for KeyConstraint {
1599 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1600 match self {
1601 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1602 f.write_str("PRIMARY KEY ");
1603 f.write_str("(");
1604 f.write_node(&display::comma_separated(columns));
1605 f.write_str(") ");
1606 f.write_str("NOT ENFORCED");
1607 }
1608 }
1609 }
1610}
1611impl_display!(KeyConstraint);
1612
1613#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1614pub enum CreateSourceOptionName {
1615 TimestampInterval,
1616 RetainHistory,
1617}
1618
1619impl AstDisplay for CreateSourceOptionName {
1620 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1621 f.write_str(match self {
1622 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1623 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1624 })
1625 }
1626}
1627impl_display!(CreateSourceOptionName);
1628
1629impl WithOptionName for CreateSourceOptionName {
1630 fn redact_value(&self) -> bool {
1636 match self {
1637 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1638 false
1639 }
1640 }
1641 }
1642}
1643
1644#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1645pub struct CreateSourceOption<T: AstInfo> {
1647 pub name: CreateSourceOptionName,
1648 pub value: Option<WithOptionValue<T>>,
1649}
1650impl_display_for_with_option!(CreateSourceOption);
1651impl_display_t!(CreateSourceOption);
1652
1653#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1655pub struct ColumnDef<T: AstInfo> {
1656 pub name: Ident,
1657 pub data_type: T::DataType,
1658 pub collation: Option<UnresolvedItemName>,
1659 pub options: Vec<ColumnOptionDef<T>>,
1660}
1661
1662impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1663 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1664 f.write_node(&self.name);
1665 f.write_str(" ");
1666 f.write_node(&self.data_type);
1667 if let Some(collation) = &self.collation {
1668 f.write_str(" COLLATE ");
1669 f.write_node(collation);
1670 }
1671 for option in &self.options {
1672 f.write_str(" ");
1673 f.write_node(option);
1674 }
1675 }
1676}
1677impl_display_t!(ColumnDef);
1678
1679#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1696pub struct ColumnOptionDef<T: AstInfo> {
1697 pub name: Option<Ident>,
1698 pub option: ColumnOption<T>,
1699}
1700
1701impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1702 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1703 f.write_node(&display_constraint_name(&self.name));
1704 f.write_node(&self.option);
1705 }
1706}
1707impl_display_t!(ColumnOptionDef);
1708
1709#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1712pub enum ColumnOption<T: AstInfo> {
1713 Null,
1715 NotNull,
1717 Default(Expr<T>),
1719 Unique { is_primary: bool },
1721 ForeignKey {
1724 foreign_table: UnresolvedItemName,
1725 referred_columns: Vec<Ident>,
1726 },
1727 Check(Expr<T>),
1729 Versioned {
1731 action: ColumnVersioned,
1732 version: Version,
1733 },
1734}
1735
1736impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1737 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1738 use ColumnOption::*;
1739 match self {
1740 Null => f.write_str("NULL"),
1741 NotNull => f.write_str("NOT NULL"),
1742 Default(expr) => {
1743 f.write_str("DEFAULT ");
1744 f.write_node(expr);
1745 }
1746 Unique { is_primary } => {
1747 if *is_primary {
1748 f.write_str("PRIMARY KEY");
1749 } else {
1750 f.write_str("UNIQUE");
1751 }
1752 }
1753 ForeignKey {
1754 foreign_table,
1755 referred_columns,
1756 } => {
1757 f.write_str("REFERENCES ");
1758 f.write_node(foreign_table);
1759 f.write_str(" (");
1760 f.write_node(&display::comma_separated(referred_columns));
1761 f.write_str(")");
1762 }
1763 Check(expr) => {
1764 f.write_str("CHECK (");
1765 f.write_node(expr);
1766 f.write_str(")");
1767 }
1768 Versioned { action, version } => {
1769 f.write_str("VERSION ");
1770 f.write_node(action);
1771 f.write_str(" ");
1772 f.write_node(version);
1773 }
1774 }
1775 }
1776}
1777impl_display_t!(ColumnOption);
1778
1779#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1780pub enum ColumnVersioned {
1781 Added,
1782}
1783
1784impl AstDisplay for ColumnVersioned {
1785 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1786 where
1787 W: fmt::Write,
1788 {
1789 match self {
1790 ColumnVersioned::Added => f.write_str("ADDED"),
1792 }
1793 }
1794}
1795impl_display!(ColumnVersioned);
1796
1797fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1798 struct ConstraintName<'a>(&'a Option<Ident>);
1799 impl<'a> AstDisplay for ConstraintName<'a> {
1800 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1801 where
1802 W: fmt::Write,
1803 {
1804 if let Some(name) = self.0 {
1805 f.write_str("CONSTRAINT ");
1806 f.write_node(name);
1807 f.write_str(" ");
1808 }
1809 }
1810 }
1811 ConstraintName(name)
1812}