1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77 Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104 pub name: ContinualTaskOptionName,
105 pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111 pub schema: String,
112}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125 ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152 pub name: AvroSchemaOptionName,
153 pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252 pub identifier: DocOnIdentifier<T>,
253 pub for_schema: DocOnSchema,
254}
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub enum DocOnSchema {
257 KeyOnly,
258 ValueOnly,
259 All,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264 Column(ColumnName<T>),
265 Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306pub struct CsrConfigOption<T: AstInfo> {
308 pub name: CsrConfigOptionName<T>,
309 pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum ReaderSchemaSelectionStrategy {
335 Latest,
336 Inline(String),
337 ById(i32),
338}
339
340impl Default for ReaderSchemaSelectionStrategy {
341 fn default() -> Self {
342 Self::Latest
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387 pub key_schema: Option<String>,
388 pub value_schema: String,
389}
390
391impl AstDisplay for CsrSeedAvro {
392 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393 f.write_str("SEED");
394 if let Some(key_schema) = &self.key_schema {
395 f.write_str(" KEY SCHEMA '");
396 f.write_node(&display::escape_single_quote_string(key_schema));
397 f.write_str("'");
398 }
399 f.write_str(" VALUE SCHEMA '");
400 f.write_node(&display::escape_single_quote_string(&self.value_schema));
401 f.write_str("'");
402 }
403}
404impl_display!(CsrSeedAvro);
405
406#[derive(Debug, Clone, PartialEq, Eq, Hash)]
407pub struct CsrSeedProtobuf {
408 pub key: Option<CsrSeedProtobufSchema>,
409 pub value: CsrSeedProtobufSchema,
410}
411
412impl AstDisplay for CsrSeedProtobuf {
413 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414 f.write_str("SEED");
415 if let Some(key) = &self.key {
416 f.write_str(" KEY ");
417 f.write_node(key);
418 }
419 f.write_str(" VALUE ");
420 f.write_node(&self.value);
421 }
422}
423impl_display!(CsrSeedProtobuf);
424
425#[derive(Debug, Clone, PartialEq, Eq, Hash)]
426pub struct CsrSeedProtobufSchema {
427 pub schema: String,
429 pub message_name: String,
430}
431impl AstDisplay for CsrSeedProtobufSchema {
432 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433 f.write_str("SCHEMA '");
434 f.write_str(&display::escape_single_quote_string(&self.schema));
435 f.write_str("' MESSAGE '");
436 f.write_str(&self.message_name);
437 f.write_str("'");
438 }
439}
440impl_display!(CsrSeedProtobufSchema);
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash)]
443pub enum FormatSpecifier<T: AstInfo> {
444 Bare(Format<T>),
446 KeyValue { key: Format<T>, value: Format<T> },
448}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452 match self {
453 FormatSpecifier::Bare(format) => {
454 f.write_str("FORMAT ");
455 f.write_node(format)
456 }
457 FormatSpecifier::KeyValue { key, value } => {
458 f.write_str("KEY FORMAT ");
459 f.write_node(key);
460 f.write_str(" VALUE FORMAT ");
461 f.write_node(value);
462 }
463 }
464 }
465}
466impl_display_t!(FormatSpecifier);
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash)]
469pub enum Format<T: AstInfo> {
470 Bytes,
471 Avro(AvroSchema<T>),
472 Protobuf(ProtobufSchema<T>),
473 Regex(String),
474 Csv {
475 columns: CsvColumns,
476 delimiter: char,
477 },
478 Json {
479 array: bool,
480 },
481 Text,
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CsvColumns {
486 Count(u64),
488 Header { names: Vec<Ident> },
490}
491
492impl AstDisplay for CsvColumns {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 match self {
495 CsvColumns::Count(n) => {
496 f.write_str(n);
497 f.write_str(" COLUMNS")
498 }
499 CsvColumns::Header { names } => {
500 f.write_str("HEADER");
501 if !names.is_empty() {
502 f.write_str(" (");
503 f.write_node(&display::comma_separated(names));
504 f.write_str(")");
505 }
506 }
507 }
508 }
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Hash)]
512pub enum SourceIncludeMetadata {
513 Key {
514 alias: Option<Ident>,
515 },
516 Timestamp {
517 alias: Option<Ident>,
518 },
519 Partition {
520 alias: Option<Ident>,
521 },
522 Offset {
523 alias: Option<Ident>,
524 },
525 Headers {
526 alias: Option<Ident>,
527 },
528 Header {
529 key: String,
530 alias: Ident,
531 use_bytes: bool,
532 },
533}
534
535impl AstDisplay for SourceIncludeMetadata {
536 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538 if let Some(alias) = alias {
539 f.write_str(" AS ");
540 f.write_node(alias);
541 }
542 };
543
544 match self {
545 SourceIncludeMetadata::Key { alias } => {
546 f.write_str("KEY");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Timestamp { alias } => {
550 f.write_str("TIMESTAMP");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Partition { alias } => {
554 f.write_str("PARTITION");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Offset { alias } => {
558 f.write_str("OFFSET");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Headers { alias } => {
562 f.write_str("HEADERS");
563 print_alias(f, alias);
564 }
565 SourceIncludeMetadata::Header {
566 alias,
567 key,
568 use_bytes,
569 } => {
570 f.write_str("HEADER '");
571 f.write_str(&display::escape_single_quote_string(key));
572 f.write_str("'");
573 print_alias(f, &Some(alias.clone()));
574 if *use_bytes {
575 f.write_str(" BYTES");
576 }
577 }
578 }
579 }
580}
581impl_display!(SourceIncludeMetadata);
582
583#[derive(Debug, Clone, PartialEq, Eq, Hash)]
584pub enum SourceErrorPolicy {
585 Inline {
586 alias: Option<Ident>,
588 },
589}
590
591impl AstDisplay for SourceErrorPolicy {
592 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593 match self {
594 Self::Inline { alias } => {
595 f.write_str("INLINE");
596 if let Some(alias) = alias {
597 f.write_str(" AS ");
598 f.write_node(alias);
599 }
600 }
601 }
602 }
603}
604impl_display!(SourceErrorPolicy);
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceEnvelope {
608 None,
609 Debezium,
610 Upsert {
611 value_decode_err_policy: Vec<SourceErrorPolicy>,
612 },
613 CdcV2,
614}
615
616impl SourceEnvelope {
617 pub fn requires_all_input(&self) -> bool {
620 match self {
621 SourceEnvelope::None => false,
622 SourceEnvelope::Debezium => false,
623 SourceEnvelope::Upsert { .. } => false,
624 SourceEnvelope::CdcV2 => true,
625 }
626 }
627}
628
629impl AstDisplay for SourceEnvelope {
630 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631 match self {
632 Self::None => {
633 f.write_str("NONE");
635 }
636 Self::Debezium => {
637 f.write_str("DEBEZIUM");
638 }
639 Self::Upsert {
640 value_decode_err_policy,
641 } => {
642 if value_decode_err_policy.is_empty() {
643 f.write_str("UPSERT");
644 } else {
645 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646 f.write_node(&display::comma_separated(value_decode_err_policy));
647 f.write_str("))")
648 }
649 }
650 Self::CdcV2 => {
651 f.write_str("MATERIALIZE");
652 }
653 }
654 }
655}
656impl_display!(SourceEnvelope);
657
658#[derive(Debug, Clone, PartialEq, Eq, Hash)]
659pub enum SinkEnvelope {
660 Debezium,
661 Upsert,
662}
663
664impl AstDisplay for SinkEnvelope {
665 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666 match self {
667 Self::Upsert => {
668 f.write_str("UPSERT");
669 }
670 Self::Debezium => {
671 f.write_str("DEBEZIUM");
672 }
673 }
674 }
675}
676impl_display!(SinkEnvelope);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SubscribeOutput<T: AstInfo> {
680 Diffs,
681 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
682 EnvelopeUpsert { key_columns: Vec<Ident> },
683 EnvelopeDebezium { key_columns: Vec<Ident> },
684}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688 match self {
689 Self::Diffs => {}
690 Self::WithinTimestampOrderBy { order_by } => {
691 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692 f.write_node(&display::comma_separated(order_by));
693 }
694 Self::EnvelopeUpsert { key_columns } => {
695 f.write_str(" ENVELOPE UPSERT (KEY (");
696 f.write_node(&display::comma_separated(key_columns));
697 f.write_str("))");
698 }
699 Self::EnvelopeDebezium { key_columns } => {
700 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701 f.write_node(&display::comma_separated(key_columns));
702 f.write_str("))");
703 }
704 }
705 }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711 match self {
712 Self::Bytes => f.write_str("BYTES"),
713 Self::Avro(inner) => {
714 f.write_str("AVRO ");
715 f.write_node(inner);
716 }
717 Self::Protobuf(inner) => {
718 f.write_str("PROTOBUF ");
719 f.write_node(inner);
720 }
721 Self::Regex(regex) => {
722 f.write_str("REGEX '");
723 f.write_node(&display::escape_single_quote_string(regex));
724 f.write_str("'");
725 }
726 Self::Csv { columns, delimiter } => {
727 f.write_str("CSV WITH ");
728 f.write_node(columns);
729
730 if *delimiter != ',' {
731 f.write_str(" DELIMITED BY '");
732 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733 f.write_str("'");
734 }
735 }
736 Self::Json { array } => {
737 f.write_str("JSON");
738 if *array {
739 f.write_str(" ARRAY");
740 }
741 }
742 Self::Text => f.write_str("TEXT"),
743 }
744 }
745}
746impl_display_t!(Format);
747
748#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
752pub enum ConnectionOptionName {
753 AccessKeyId,
754 AssumeRoleArn,
755 AssumeRoleSessionName,
756 AvailabilityZones,
757 AwsConnection,
758 AwsPrivatelink,
759 Broker,
760 Brokers,
761 Database,
762 Endpoint,
763 Host,
764 Password,
765 Port,
766 ProgressTopic,
767 ProgressTopicReplicationFactor,
768 PublicKey1,
769 PublicKey2,
770 Region,
771 SaslMechanisms,
772 SaslPassword,
773 SaslUsername,
774 SecretAccessKey,
775 SecurityProtocol,
776 ServiceName,
777 SshTunnel,
778 SslCertificate,
779 SslCertificateAuthority,
780 SslKey,
781 SslMode,
782 SessionToken,
783 Url,
784 User,
785}
786
787impl AstDisplay for ConnectionOptionName {
788 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
789 f.write_str(match self {
790 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
791 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
792 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
793 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
794 ConnectionOptionName::Broker => "BROKER",
795 ConnectionOptionName::Brokers => "BROKERS",
796 ConnectionOptionName::Database => "DATABASE",
797 ConnectionOptionName::Endpoint => "ENDPOINT",
798 ConnectionOptionName::Host => "HOST",
799 ConnectionOptionName::Password => "PASSWORD",
800 ConnectionOptionName::Port => "PORT",
801 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
802 ConnectionOptionName::ProgressTopicReplicationFactor => {
803 "PROGRESS TOPIC REPLICATION FACTOR"
804 }
805 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
806 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
807 ConnectionOptionName::Region => "REGION",
808 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
809 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
810 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
811 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
812 ConnectionOptionName::SaslUsername => "SASL USERNAME",
813 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
814 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
815 ConnectionOptionName::ServiceName => "SERVICE NAME",
816 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
817 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
818 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
819 ConnectionOptionName::SslKey => "SSL KEY",
820 ConnectionOptionName::SslMode => "SSL MODE",
821 ConnectionOptionName::SessionToken => "SESSION TOKEN",
822 ConnectionOptionName::Url => "URL",
823 ConnectionOptionName::User => "USER",
824 })
825 }
826}
827impl_display!(ConnectionOptionName);
828
829impl WithOptionName for ConnectionOptionName {
830 fn redact_value(&self) -> bool {
836 match self {
837 ConnectionOptionName::AccessKeyId
838 | ConnectionOptionName::AvailabilityZones
839 | ConnectionOptionName::AwsConnection
840 | ConnectionOptionName::AwsPrivatelink
841 | ConnectionOptionName::Broker
842 | ConnectionOptionName::Brokers
843 | ConnectionOptionName::Database
844 | ConnectionOptionName::Endpoint
845 | ConnectionOptionName::Host
846 | ConnectionOptionName::Password
847 | ConnectionOptionName::Port
848 | ConnectionOptionName::ProgressTopic
849 | ConnectionOptionName::ProgressTopicReplicationFactor
850 | ConnectionOptionName::PublicKey1
851 | ConnectionOptionName::PublicKey2
852 | ConnectionOptionName::Region
853 | ConnectionOptionName::AssumeRoleArn
854 | ConnectionOptionName::AssumeRoleSessionName
855 | ConnectionOptionName::SaslMechanisms
856 | ConnectionOptionName::SaslPassword
857 | ConnectionOptionName::SaslUsername
858 | ConnectionOptionName::SecurityProtocol
859 | ConnectionOptionName::SecretAccessKey
860 | ConnectionOptionName::ServiceName
861 | ConnectionOptionName::SshTunnel
862 | ConnectionOptionName::SslCertificate
863 | ConnectionOptionName::SslCertificateAuthority
864 | ConnectionOptionName::SslKey
865 | ConnectionOptionName::SslMode
866 | ConnectionOptionName::SessionToken
867 | ConnectionOptionName::Url
868 | ConnectionOptionName::User => false,
869 }
870 }
871}
872
873#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
874pub struct ConnectionOption<T: AstInfo> {
876 pub name: ConnectionOptionName,
877 pub value: Option<WithOptionValue<T>>,
878}
879impl_display_for_with_option!(ConnectionOption);
880impl_display_t!(ConnectionOption);
881
882#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
883pub enum CreateConnectionType {
884 Aws,
885 AwsPrivatelink,
886 Kafka,
887 Csr,
888 Postgres,
889 Ssh,
890 SqlServer,
891 MySql,
892 Yugabyte,
893}
894
895impl AstDisplay for CreateConnectionType {
896 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
897 match self {
898 Self::Kafka => {
899 f.write_str("KAFKA");
900 }
901 Self::Csr => {
902 f.write_str("CONFLUENT SCHEMA REGISTRY");
903 }
904 Self::Postgres => {
905 f.write_str("POSTGRES");
906 }
907 Self::Aws => {
908 f.write_str("AWS");
909 }
910 Self::AwsPrivatelink => {
911 f.write_str("AWS PRIVATELINK");
912 }
913 Self::Ssh => {
914 f.write_str("SSH TUNNEL");
915 }
916 Self::SqlServer => {
917 f.write_str("SQL SERVER");
918 }
919 Self::MySql => {
920 f.write_str("MYSQL");
921 }
922 Self::Yugabyte => {
923 f.write_str("YUGABYTE");
924 }
925 }
926 }
927}
928impl_display!(CreateConnectionType);
929
930#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
931pub enum CreateConnectionOptionName {
932 Validate,
933}
934
935impl AstDisplay for CreateConnectionOptionName {
936 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
937 f.write_str(match self {
938 CreateConnectionOptionName::Validate => "VALIDATE",
939 })
940 }
941}
942impl_display!(CreateConnectionOptionName);
943
944impl WithOptionName for CreateConnectionOptionName {
945 fn redact_value(&self) -> bool {
951 match self {
952 CreateConnectionOptionName::Validate => false,
953 }
954 }
955}
956
957#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
958pub struct CreateConnectionOption<T: AstInfo> {
960 pub name: CreateConnectionOptionName,
961 pub value: Option<WithOptionValue<T>>,
962}
963impl_display_for_with_option!(CreateConnectionOption);
964impl_display_t!(CreateConnectionOption);
965
966#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
967pub enum KafkaSourceConfigOptionName {
968 GroupIdPrefix,
969 Topic,
970 TopicMetadataRefreshInterval,
971 StartTimestamp,
972 StartOffset,
973}
974
975impl AstDisplay for KafkaSourceConfigOptionName {
976 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
977 f.write_str(match self {
978 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
979 KafkaSourceConfigOptionName::Topic => "TOPIC",
980 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
981 "TOPIC METADATA REFRESH INTERVAL"
982 }
983 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
984 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
985 })
986 }
987}
988impl_display!(KafkaSourceConfigOptionName);
989
990impl WithOptionName for KafkaSourceConfigOptionName {
991 fn redact_value(&self) -> bool {
997 match self {
998 KafkaSourceConfigOptionName::GroupIdPrefix
999 | KafkaSourceConfigOptionName::Topic
1000 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1001 | KafkaSourceConfigOptionName::StartOffset
1002 | KafkaSourceConfigOptionName::StartTimestamp => false,
1003 }
1004 }
1005}
1006
1007#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1008pub struct KafkaSourceConfigOption<T: AstInfo> {
1009 pub name: KafkaSourceConfigOptionName,
1010 pub value: Option<WithOptionValue<T>>,
1011}
1012impl_display_for_with_option!(KafkaSourceConfigOption);
1013impl_display_t!(KafkaSourceConfigOption);
1014
1015#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1016pub enum KafkaSinkConfigOptionName {
1017 CompressionType,
1018 PartitionBy,
1019 ProgressGroupIdPrefix,
1020 Topic,
1021 TransactionalIdPrefix,
1022 LegacyIds,
1023 TopicConfig,
1024 TopicMetadataRefreshInterval,
1025 TopicPartitionCount,
1026 TopicReplicationFactor,
1027}
1028
1029impl AstDisplay for KafkaSinkConfigOptionName {
1030 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1031 f.write_str(match self {
1032 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1033 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1034 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1035 KafkaSinkConfigOptionName::Topic => "TOPIC",
1036 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1037 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1038 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1039 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1040 "TOPIC METADATA REFRESH INTERVAL"
1041 }
1042 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1043 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1044 })
1045 }
1046}
1047impl_display!(KafkaSinkConfigOptionName);
1048
1049impl WithOptionName for KafkaSinkConfigOptionName {
1050 fn redact_value(&self) -> bool {
1056 match self {
1057 KafkaSinkConfigOptionName::CompressionType
1058 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1059 | KafkaSinkConfigOptionName::Topic
1060 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1061 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1062 | KafkaSinkConfigOptionName::LegacyIds
1063 | KafkaSinkConfigOptionName::TopicConfig
1064 | KafkaSinkConfigOptionName::TopicPartitionCount
1065 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1066 KafkaSinkConfigOptionName::PartitionBy => true,
1067 }
1068 }
1069}
1070
1071#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1072pub struct KafkaSinkConfigOption<T: AstInfo> {
1073 pub name: KafkaSinkConfigOptionName,
1074 pub value: Option<WithOptionValue<T>>,
1075}
1076impl_display_for_with_option!(KafkaSinkConfigOption);
1077impl_display_t!(KafkaSinkConfigOption);
1078
1079#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1080pub enum PgConfigOptionName {
1081 Details,
1084 Publication,
1086 TextColumns,
1093}
1094
1095impl AstDisplay for PgConfigOptionName {
1096 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1097 f.write_str(match self {
1098 PgConfigOptionName::Details => "DETAILS",
1099 PgConfigOptionName::Publication => "PUBLICATION",
1100 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1101 })
1102 }
1103}
1104impl_display!(PgConfigOptionName);
1105
1106impl WithOptionName for PgConfigOptionName {
1107 fn redact_value(&self) -> bool {
1113 match self {
1114 PgConfigOptionName::Details
1115 | PgConfigOptionName::Publication
1116 | PgConfigOptionName::TextColumns => false,
1117 }
1118 }
1119}
1120
1121#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1122pub struct PgConfigOption<T: AstInfo> {
1124 pub name: PgConfigOptionName,
1125 pub value: Option<WithOptionValue<T>>,
1126}
1127impl_display_for_with_option!(PgConfigOption);
1128impl_display_t!(PgConfigOption);
1129
1130#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1131pub enum MySqlConfigOptionName {
1132 Details,
1135 TextColumns,
1142 ExcludeColumns,
1149}
1150
1151impl AstDisplay for MySqlConfigOptionName {
1152 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1153 f.write_str(match self {
1154 MySqlConfigOptionName::Details => "DETAILS",
1155 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1156 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1157 })
1158 }
1159}
1160impl_display!(MySqlConfigOptionName);
1161
1162impl WithOptionName for MySqlConfigOptionName {
1163 fn redact_value(&self) -> bool {
1169 match self {
1170 MySqlConfigOptionName::Details
1171 | MySqlConfigOptionName::TextColumns
1172 | MySqlConfigOptionName::ExcludeColumns => false,
1173 }
1174 }
1175}
1176
1177#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1178pub struct MySqlConfigOption<T: AstInfo> {
1180 pub name: MySqlConfigOptionName,
1181 pub value: Option<WithOptionValue<T>>,
1182}
1183impl_display_for_with_option!(MySqlConfigOption);
1184impl_display_t!(MySqlConfigOption);
1185
1186#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1187pub enum SqlServerConfigOptionName {
1188 Details,
1191 TextColumns,
1199 ExcludeColumns,
1207}
1208
1209impl AstDisplay for SqlServerConfigOptionName {
1210 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1211 f.write_str(match self {
1212 SqlServerConfigOptionName::Details => "DETAILS",
1213 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1214 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1215 })
1216 }
1217}
1218impl_display!(SqlServerConfigOptionName);
1219
1220impl WithOptionName for SqlServerConfigOptionName {
1221 fn redact_value(&self) -> bool {
1227 match self {
1228 SqlServerConfigOptionName::Details
1229 | SqlServerConfigOptionName::TextColumns
1230 | SqlServerConfigOptionName::ExcludeColumns => false,
1231 }
1232 }
1233}
1234
1235#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1236pub struct SqlServerConfigOption<T: AstInfo> {
1238 pub name: SqlServerConfigOptionName,
1239 pub value: Option<WithOptionValue<T>>,
1240}
1241impl_display_for_with_option!(SqlServerConfigOption);
1242impl_display_t!(SqlServerConfigOption);
1243
1244#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1245pub enum CreateSourceConnection<T: AstInfo> {
1246 Kafka {
1247 connection: T::ItemName,
1248 options: Vec<KafkaSourceConfigOption<T>>,
1249 },
1250 Postgres {
1251 connection: T::ItemName,
1252 options: Vec<PgConfigOption<T>>,
1253 },
1254 Yugabyte {
1255 connection: T::ItemName,
1256 options: Vec<PgConfigOption<T>>,
1257 },
1258 SqlServer {
1259 connection: T::ItemName,
1260 options: Vec<SqlServerConfigOption<T>>,
1261 },
1262 MySql {
1263 connection: T::ItemName,
1264 options: Vec<MySqlConfigOption<T>>,
1265 },
1266 LoadGenerator {
1267 generator: LoadGenerator,
1268 options: Vec<LoadGeneratorOption<T>>,
1269 },
1270}
1271
1272impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1273 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1274 match self {
1275 CreateSourceConnection::Kafka {
1276 connection,
1277 options,
1278 } => {
1279 f.write_str("KAFKA CONNECTION ");
1280 f.write_node(connection);
1281 if !options.is_empty() {
1282 f.write_str(" (");
1283 f.write_node(&display::comma_separated(options));
1284 f.write_str(")");
1285 }
1286 }
1287 CreateSourceConnection::Postgres {
1288 connection,
1289 options,
1290 } => {
1291 f.write_str("POSTGRES CONNECTION ");
1292 f.write_node(connection);
1293 if !options.is_empty() {
1294 f.write_str(" (");
1295 f.write_node(&display::comma_separated(options));
1296 f.write_str(")");
1297 }
1298 }
1299 CreateSourceConnection::Yugabyte {
1300 connection,
1301 options,
1302 } => {
1303 f.write_str("YUGABYTE CONNECTION ");
1304 f.write_node(connection);
1305 if !options.is_empty() {
1306 f.write_str(" (");
1307 f.write_node(&display::comma_separated(options));
1308 f.write_str(")");
1309 }
1310 }
1311 CreateSourceConnection::SqlServer {
1312 connection,
1313 options,
1314 } => {
1315 f.write_str("SQL SERVER CONNECTION ");
1316 f.write_node(connection);
1317 if !options.is_empty() {
1318 f.write_str(" (");
1319 f.write_node(&display::comma_separated(options));
1320 f.write_str(")");
1321 }
1322 }
1323 CreateSourceConnection::MySql {
1324 connection,
1325 options,
1326 } => {
1327 f.write_str("MYSQL CONNECTION ");
1328 f.write_node(connection);
1329 if !options.is_empty() {
1330 f.write_str(" (");
1331 f.write_node(&display::comma_separated(options));
1332 f.write_str(")");
1333 }
1334 }
1335 CreateSourceConnection::LoadGenerator { generator, options } => {
1336 f.write_str("LOAD GENERATOR ");
1337 f.write_node(generator);
1338 if !options.is_empty() {
1339 f.write_str(" (");
1340 f.write_node(&display::comma_separated(options));
1341 f.write_str(")");
1342 }
1343 }
1344 }
1345 }
1346}
1347impl_display_t!(CreateSourceConnection);
1348
1349#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1350pub enum LoadGenerator {
1351 Clock,
1352 Counter,
1353 Marketing,
1354 Auction,
1355 Datums,
1356 Tpch,
1357 KeyValue,
1358}
1359
1360impl AstDisplay for LoadGenerator {
1361 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1362 match self {
1363 Self::Counter => f.write_str("COUNTER"),
1364 Self::Clock => f.write_str("CLOCK"),
1365 Self::Marketing => f.write_str("MARKETING"),
1366 Self::Auction => f.write_str("AUCTION"),
1367 Self::Datums => f.write_str("DATUMS"),
1368 Self::Tpch => f.write_str("TPCH"),
1369 Self::KeyValue => f.write_str("KEY VALUE"),
1370 }
1371 }
1372}
1373impl_display!(LoadGenerator);
1374
1375impl LoadGenerator {
1376 pub fn schema_name(&self) -> &'static str {
1381 match self {
1382 LoadGenerator::Counter => "counter",
1383 LoadGenerator::Clock => "clock",
1384 LoadGenerator::Marketing => "marketing",
1385 LoadGenerator::Auction => "auction",
1386 LoadGenerator::Datums => "datums",
1387 LoadGenerator::Tpch => "tpch",
1388 LoadGenerator::KeyValue => "key_value",
1389 }
1390 }
1391}
1392
1393#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1394pub enum LoadGeneratorOptionName {
1395 ScaleFactor,
1396 TickInterval,
1397 AsOf,
1398 UpTo,
1399 MaxCardinality,
1400 Keys,
1401 SnapshotRounds,
1402 TransactionalSnapshot,
1403 ValueSize,
1404 Seed,
1405 Partitions,
1406 BatchSize,
1407}
1408
1409impl AstDisplay for LoadGeneratorOptionName {
1410 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1411 f.write_str(match self {
1412 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1413 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1414 LoadGeneratorOptionName::AsOf => "AS OF",
1415 LoadGeneratorOptionName::UpTo => "UP TO",
1416 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1417 LoadGeneratorOptionName::Keys => "KEYS",
1418 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1419 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1420 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1421 LoadGeneratorOptionName::Seed => "SEED",
1422 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1423 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1424 })
1425 }
1426}
1427impl_display!(LoadGeneratorOptionName);
1428
1429impl WithOptionName for LoadGeneratorOptionName {
1430 fn redact_value(&self) -> bool {
1436 match self {
1437 LoadGeneratorOptionName::ScaleFactor
1438 | LoadGeneratorOptionName::TickInterval
1439 | LoadGeneratorOptionName::AsOf
1440 | LoadGeneratorOptionName::UpTo
1441 | LoadGeneratorOptionName::MaxCardinality
1442 | LoadGeneratorOptionName::Keys
1443 | LoadGeneratorOptionName::SnapshotRounds
1444 | LoadGeneratorOptionName::TransactionalSnapshot
1445 | LoadGeneratorOptionName::ValueSize
1446 | LoadGeneratorOptionName::Partitions
1447 | LoadGeneratorOptionName::BatchSize
1448 | LoadGeneratorOptionName::Seed => false,
1449 }
1450 }
1451}
1452
1453#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1454pub struct LoadGeneratorOption<T: AstInfo> {
1456 pub name: LoadGeneratorOptionName,
1457 pub value: Option<WithOptionValue<T>>,
1458}
1459impl_display_for_with_option!(LoadGeneratorOption);
1460impl_display_t!(LoadGeneratorOption);
1461
1462#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1463pub enum CreateSinkConnection<T: AstInfo> {
1464 Kafka {
1465 connection: T::ItemName,
1466 options: Vec<KafkaSinkConfigOption<T>>,
1467 key: Option<KafkaSinkKey>,
1468 headers: Option<Ident>,
1469 },
1470}
1471
1472impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1473 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1474 match self {
1475 CreateSinkConnection::Kafka {
1476 connection,
1477 options,
1478 key,
1479 headers,
1480 } => {
1481 f.write_str("KAFKA CONNECTION ");
1482 f.write_node(connection);
1483 if !options.is_empty() {
1484 f.write_str(" (");
1485 f.write_node(&display::comma_separated(options));
1486 f.write_str(")");
1487 }
1488 if let Some(key) = key.as_ref() {
1489 f.write_node(key);
1490 }
1491 if let Some(headers) = headers {
1492 f.write_str(" HEADERS ");
1493 f.write_node(headers);
1494 }
1495 }
1496 }
1497 }
1498}
1499impl_display_t!(CreateSinkConnection);
1500
1501#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1502pub struct KafkaSinkKey {
1503 pub key_columns: Vec<Ident>,
1504 pub not_enforced: bool,
1505}
1506
1507impl AstDisplay for KafkaSinkKey {
1508 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1509 f.write_str(" KEY (");
1510 f.write_node(&display::comma_separated(&self.key_columns));
1511 f.write_str(")");
1512 if self.not_enforced {
1513 f.write_str(" NOT ENFORCED");
1514 }
1515 }
1516}
1517
1518#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1521pub enum TableConstraint<T: AstInfo> {
1522 Unique {
1524 name: Option<Ident>,
1525 columns: Vec<Ident>,
1526 is_primary: bool,
1528 nulls_not_distinct: bool,
1531 },
1532 ForeignKey {
1535 name: Option<Ident>,
1536 columns: Vec<Ident>,
1537 foreign_table: T::ItemName,
1538 referred_columns: Vec<Ident>,
1539 },
1540 Check {
1542 name: Option<Ident>,
1543 expr: Box<Expr<T>>,
1544 },
1545}
1546
1547impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1548 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1549 match self {
1550 TableConstraint::Unique {
1551 name,
1552 columns,
1553 is_primary,
1554 nulls_not_distinct,
1555 } => {
1556 f.write_node(&display_constraint_name(name));
1557 if *is_primary {
1558 f.write_str("PRIMARY KEY ");
1559 } else {
1560 f.write_str("UNIQUE ");
1561 if *nulls_not_distinct {
1562 f.write_str("NULLS NOT DISTINCT ");
1563 }
1564 }
1565 f.write_str("(");
1566 f.write_node(&display::comma_separated(columns));
1567 f.write_str(")");
1568 }
1569 TableConstraint::ForeignKey {
1570 name,
1571 columns,
1572 foreign_table,
1573 referred_columns,
1574 } => {
1575 f.write_node(&display_constraint_name(name));
1576 f.write_str("FOREIGN KEY (");
1577 f.write_node(&display::comma_separated(columns));
1578 f.write_str(") REFERENCES ");
1579 f.write_node(foreign_table);
1580 f.write_str("(");
1581 f.write_node(&display::comma_separated(referred_columns));
1582 f.write_str(")");
1583 }
1584 TableConstraint::Check { name, expr } => {
1585 f.write_node(&display_constraint_name(name));
1586 f.write_str("CHECK (");
1587 f.write_node(&expr);
1588 f.write_str(")");
1589 }
1590 }
1591 }
1592}
1593impl_display_t!(TableConstraint);
1594
1595#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1597pub enum KeyConstraint {
1598 PrimaryKeyNotEnforced { columns: Vec<Ident> },
1600}
1601
1602impl AstDisplay for KeyConstraint {
1603 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1604 match self {
1605 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1606 f.write_str("PRIMARY KEY ");
1607 f.write_str("(");
1608 f.write_node(&display::comma_separated(columns));
1609 f.write_str(") ");
1610 f.write_str("NOT ENFORCED");
1611 }
1612 }
1613 }
1614}
1615impl_display!(KeyConstraint);
1616
1617#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1618pub enum CreateSourceOptionName {
1619 TimestampInterval,
1620 RetainHistory,
1621}
1622
1623impl AstDisplay for CreateSourceOptionName {
1624 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1625 f.write_str(match self {
1626 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1627 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1628 })
1629 }
1630}
1631impl_display!(CreateSourceOptionName);
1632
1633impl WithOptionName for CreateSourceOptionName {
1634 fn redact_value(&self) -> bool {
1640 match self {
1641 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1642 false
1643 }
1644 }
1645 }
1646}
1647
1648#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1649pub struct CreateSourceOption<T: AstInfo> {
1651 pub name: CreateSourceOptionName,
1652 pub value: Option<WithOptionValue<T>>,
1653}
1654impl_display_for_with_option!(CreateSourceOption);
1655impl_display_t!(CreateSourceOption);
1656
1657#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1659pub struct ColumnDef<T: AstInfo> {
1660 pub name: Ident,
1661 pub data_type: T::DataType,
1662 pub collation: Option<UnresolvedItemName>,
1663 pub options: Vec<ColumnOptionDef<T>>,
1664}
1665
1666impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1667 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1668 f.write_node(&self.name);
1669 f.write_str(" ");
1670 f.write_node(&self.data_type);
1671 if let Some(collation) = &self.collation {
1672 f.write_str(" COLLATE ");
1673 f.write_node(collation);
1674 }
1675 for option in &self.options {
1676 f.write_str(" ");
1677 f.write_node(option);
1678 }
1679 }
1680}
1681impl_display_t!(ColumnDef);
1682
1683#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1700pub struct ColumnOptionDef<T: AstInfo> {
1701 pub name: Option<Ident>,
1702 pub option: ColumnOption<T>,
1703}
1704
1705impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1706 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1707 f.write_node(&display_constraint_name(&self.name));
1708 f.write_node(&self.option);
1709 }
1710}
1711impl_display_t!(ColumnOptionDef);
1712
1713#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1716pub enum ColumnOption<T: AstInfo> {
1717 Null,
1719 NotNull,
1721 Default(Expr<T>),
1723 Unique { is_primary: bool },
1725 ForeignKey {
1728 foreign_table: UnresolvedItemName,
1729 referred_columns: Vec<Ident>,
1730 },
1731 Check(Expr<T>),
1733 Versioned {
1735 action: ColumnVersioned,
1736 version: Version,
1737 },
1738}
1739
1740impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1741 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1742 use ColumnOption::*;
1743 match self {
1744 Null => f.write_str("NULL"),
1745 NotNull => f.write_str("NOT NULL"),
1746 Default(expr) => {
1747 f.write_str("DEFAULT ");
1748 f.write_node(expr);
1749 }
1750 Unique { is_primary } => {
1751 if *is_primary {
1752 f.write_str("PRIMARY KEY");
1753 } else {
1754 f.write_str("UNIQUE");
1755 }
1756 }
1757 ForeignKey {
1758 foreign_table,
1759 referred_columns,
1760 } => {
1761 f.write_str("REFERENCES ");
1762 f.write_node(foreign_table);
1763 f.write_str(" (");
1764 f.write_node(&display::comma_separated(referred_columns));
1765 f.write_str(")");
1766 }
1767 Check(expr) => {
1768 f.write_str("CHECK (");
1769 f.write_node(expr);
1770 f.write_str(")");
1771 }
1772 Versioned { action, version } => {
1773 f.write_str("VERSION ");
1774 f.write_node(action);
1775 f.write_str(" ");
1776 f.write_node(version);
1777 }
1778 }
1779 }
1780}
1781impl_display_t!(ColumnOption);
1782
1783#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1784pub enum ColumnVersioned {
1785 Added,
1786}
1787
1788impl AstDisplay for ColumnVersioned {
1789 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1790 where
1791 W: fmt::Write,
1792 {
1793 match self {
1794 ColumnVersioned::Added => f.write_str("ADDED"),
1796 }
1797 }
1798}
1799impl_display!(ColumnVersioned);
1800
1801fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1802 struct ConstraintName<'a>(&'a Option<Ident>);
1803 impl<'a> AstDisplay for ConstraintName<'a> {
1804 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1805 where
1806 W: fmt::Write,
1807 {
1808 if let Some(name) = self.0 {
1809 f.write_str("CONSTRAINT ");
1810 f.write_node(name);
1811 f.write_str(" ");
1812 }
1813 }
1814 }
1815 ConstraintName(name)
1816}