1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub enum MaterializedViewOptionName {
33 AssertNotNull,
35 PartitionBy,
36 RetainHistory,
37 Refresh,
39}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
69pub struct MaterializedViewOption<T: AstInfo> {
70 pub name: MaterializedViewOptionName,
71 pub value: Option<WithOptionValue<T>>,
72}
73impl_display_for_with_option!(MaterializedViewOption);
74
75#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
76pub enum ContinualTaskOptionName {
77 Snapshot,
79}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
103pub struct ContinualTaskOption<T: AstInfo> {
104 pub name: ContinualTaskOptionName,
105 pub value: Option<WithOptionValue<T>>,
106}
107impl_display_for_with_option!(ContinualTaskOption);
108
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct Schema {
111 pub schema: String,
112}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124pub enum AvroSchemaOptionName {
125 ConfluentWireFormat,
127}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
151pub struct AvroSchemaOption<T: AstInfo> {
152 pub name: AvroSchemaOptionName,
153 pub value: Option<WithOptionValue<T>>,
154}
155impl_display_for_with_option!(AvroSchemaOption);
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum AvroSchema<T: AstInfo> {
159 Csr {
160 csr_connection: CsrConnectionAvro<T>,
161 },
162 InlineSchema {
163 schema: Schema,
164 with_options: Vec<AvroSchemaOption<T>>,
165 },
166}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash)]
192pub enum ProtobufSchema<T: AstInfo> {
193 Csr {
194 csr_connection: CsrConnectionProtobuf<T>,
195 },
196 InlineSchema {
197 message_name: String,
198 schema: Schema,
199 },
200}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
222#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
223pub enum CsrConfigOptionName<T: AstInfo> {
224 AvroKeyFullname,
225 AvroValueFullname,
226 NullDefaults,
227 AvroDocOn(AvroDocOn<T>),
228 KeyCompatibilityLevel,
229 ValueCompatibilityLevel,
230}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
251pub struct AvroDocOn<T: AstInfo> {
252 pub identifier: DocOnIdentifier<T>,
253 pub for_schema: DocOnSchema,
254}
/// Selects which schema a `DOC ON` option applies to.
///
/// Variant order is significant: the derived ordering follows it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Displayed with a leading `KEY `.
    KeyOnly,
    /// Displayed with a leading `VALUE `.
    ValueOnly,
    /// Displayed with no prefix.
    All,
}
261
262#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
263pub enum DocOnIdentifier<T: AstInfo> {
264 Column(ColumnName<T>),
265 Type(T::ItemName),
266}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
305#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
306pub struct CsrConfigOption<T: AstInfo> {
308 pub name: CsrConfigOptionName<T>,
309 pub value: Option<WithOptionValue<T>>,
310}
311impl_display_for_with_option!(CsrConfigOption);
312impl_display_t!(CsrConfigOption);
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
315pub struct CsrConnection<T: AstInfo> {
316 pub connection: T::ItemName,
317 pub options: Vec<CsrConfigOption<T>>,
318}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
/// How a reader schema is chosen.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`]; it is
/// supplied by `#[derive(Default)]` rather than a hand-written impl.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a string payload (presumably the schema text — confirm
    /// against the parser).
    Inline(String),
    /// Carries an integer payload (presumably a schema id — confirm against
    /// the parser).
    ById(i32),
}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct CsrConnectionAvro<T: AstInfo> {
348 pub connection: CsrConnection<T>,
349 pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
350 pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
351 pub seed: Option<CsrSeedAvro>,
352}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
366#[derive(Debug, Clone, PartialEq, Eq, Hash)]
367pub struct CsrConnectionProtobuf<T: AstInfo> {
368 pub connection: CsrConnection<T>,
369 pub seed: Option<CsrSeedProtobuf>,
370}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
385#[derive(Debug, Clone, PartialEq, Eq, Hash)]
386pub struct CsrSeedAvro {
387 pub key_schema: Option<String>,
388 pub value_schema: String,
389}
390
391impl AstDisplay for CsrSeedAvro {
392 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393 f.write_str("SEED");
394 if let Some(key_schema) = &self.key_schema {
395 f.write_str(" KEY SCHEMA '");
396 f.write_node(&display::escape_single_quote_string(key_schema));
397 f.write_str("'");
398 }
399 f.write_str(" VALUE SCHEMA '");
400 f.write_node(&display::escape_single_quote_string(&self.value_schema));
401 f.write_str("'");
402 }
403}
404impl_display!(CsrSeedAvro);
405
406#[derive(Debug, Clone, PartialEq, Eq, Hash)]
407pub struct CsrSeedProtobuf {
408 pub key: Option<CsrSeedProtobufSchema>,
409 pub value: CsrSeedProtobufSchema,
410}
411
412impl AstDisplay for CsrSeedProtobuf {
413 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414 f.write_str("SEED");
415 if let Some(key) = &self.key {
416 f.write_str(" KEY ");
417 f.write_node(key);
418 }
419 f.write_str(" VALUE ");
420 f.write_node(&self.value);
421 }
422}
423impl_display!(CsrSeedProtobuf);
424
425#[derive(Debug, Clone, PartialEq, Eq, Hash)]
426pub struct CsrSeedProtobufSchema {
427 pub schema: String,
429 pub message_name: String,
430}
431impl AstDisplay for CsrSeedProtobufSchema {
432 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433 f.write_str("SCHEMA '");
434 f.write_str(&display::escape_single_quote_string(&self.schema));
435 f.write_str("' MESSAGE '");
436 f.write_str(&self.message_name);
437 f.write_str("'");
438 }
439}
440impl_display!(CsrSeedProtobufSchema);
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash)]
443pub enum FormatSpecifier<T: AstInfo> {
444 Bare(Format<T>),
446 KeyValue { key: Format<T>, value: Format<T> },
448}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452 match self {
453 FormatSpecifier::Bare(format) => {
454 f.write_str("FORMAT ");
455 f.write_node(format)
456 }
457 FormatSpecifier::KeyValue { key, value } => {
458 f.write_str("KEY FORMAT ");
459 f.write_node(key);
460 f.write_str(" VALUE FORMAT ");
461 f.write_node(value);
462 }
463 }
464 }
465}
466impl_display_t!(FormatSpecifier);
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash)]
469pub enum Format<T: AstInfo> {
470 Bytes,
471 Avro(AvroSchema<T>),
472 Protobuf(ProtobufSchema<T>),
473 Regex(String),
474 Csv {
475 columns: CsvColumns,
476 delimiter: char,
477 },
478 Json {
479 array: bool,
480 },
481 Text,
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CsvColumns {
486 Count(u64),
488 Header { names: Vec<Ident> },
490}
491
492impl AstDisplay for CsvColumns {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 match self {
495 CsvColumns::Count(n) => {
496 f.write_str(n);
497 f.write_str(" COLUMNS")
498 }
499 CsvColumns::Header { names } => {
500 f.write_str("HEADER");
501 if !names.is_empty() {
502 f.write_str(" (");
503 f.write_node(&display::comma_separated(names));
504 f.write_str(")");
505 }
506 }
507 }
508 }
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Hash)]
512pub enum SourceIncludeMetadata {
513 Key {
514 alias: Option<Ident>,
515 },
516 Timestamp {
517 alias: Option<Ident>,
518 },
519 Partition {
520 alias: Option<Ident>,
521 },
522 Offset {
523 alias: Option<Ident>,
524 },
525 Headers {
526 alias: Option<Ident>,
527 },
528 Header {
529 key: String,
530 alias: Ident,
531 use_bytes: bool,
532 },
533}
534
535impl AstDisplay for SourceIncludeMetadata {
536 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538 if let Some(alias) = alias {
539 f.write_str(" AS ");
540 f.write_node(alias);
541 }
542 };
543
544 match self {
545 SourceIncludeMetadata::Key { alias } => {
546 f.write_str("KEY");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Timestamp { alias } => {
550 f.write_str("TIMESTAMP");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Partition { alias } => {
554 f.write_str("PARTITION");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Offset { alias } => {
558 f.write_str("OFFSET");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Headers { alias } => {
562 f.write_str("HEADERS");
563 print_alias(f, alias);
564 }
565 SourceIncludeMetadata::Header {
566 alias,
567 key,
568 use_bytes,
569 } => {
570 f.write_str("HEADER '");
571 f.write_str(&display::escape_single_quote_string(key));
572 f.write_str("'");
573 print_alias(f, &Some(alias.clone()));
574 if *use_bytes {
575 f.write_str(" BYTES");
576 }
577 }
578 }
579 }
580}
581impl_display!(SourceIncludeMetadata);
582
583#[derive(Debug, Clone, PartialEq, Eq, Hash)]
584pub enum SourceErrorPolicy {
585 Inline {
586 alias: Option<Ident>,
588 },
589}
590
591impl AstDisplay for SourceErrorPolicy {
592 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593 match self {
594 Self::Inline { alias } => {
595 f.write_str("INLINE");
596 if let Some(alias) = alias {
597 f.write_str(" AS ");
598 f.write_node(alias);
599 }
600 }
601 }
602 }
603}
604impl_display!(SourceErrorPolicy);
605
606#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub enum SourceEnvelope {
608 None,
609 Debezium,
610 Upsert {
611 value_decode_err_policy: Vec<SourceErrorPolicy>,
612 },
613 CdcV2,
614}
615
616impl SourceEnvelope {
617 pub fn requires_all_input(&self) -> bool {
620 match self {
621 SourceEnvelope::None => false,
622 SourceEnvelope::Debezium => false,
623 SourceEnvelope::Upsert { .. } => false,
624 SourceEnvelope::CdcV2 => true,
625 }
626 }
627}
628
629impl AstDisplay for SourceEnvelope {
630 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631 match self {
632 Self::None => {
633 f.write_str("NONE");
635 }
636 Self::Debezium => {
637 f.write_str("DEBEZIUM");
638 }
639 Self::Upsert {
640 value_decode_err_policy,
641 } => {
642 if value_decode_err_policy.is_empty() {
643 f.write_str("UPSERT");
644 } else {
645 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646 f.write_node(&display::comma_separated(value_decode_err_policy));
647 f.write_str("))")
648 }
649 }
650 Self::CdcV2 => {
651 f.write_str("MATERIALIZE");
652 }
653 }
654 }
655}
656impl_display!(SourceEnvelope);
657
658#[derive(Debug, Clone, PartialEq, Eq, Hash)]
659pub enum SinkEnvelope {
660 Debezium,
661 Upsert,
662}
663
664impl AstDisplay for SinkEnvelope {
665 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666 match self {
667 Self::Upsert => {
668 f.write_str("UPSERT");
669 }
670 Self::Debezium => {
671 f.write_str("DEBEZIUM");
672 }
673 }
674 }
675}
676impl_display!(SinkEnvelope);
677
678#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679pub enum SubscribeOutput<T: AstInfo> {
680 Diffs,
681 WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
682 EnvelopeUpsert { key_columns: Vec<Ident> },
683 EnvelopeDebezium { key_columns: Vec<Ident> },
684}
685
686impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
687 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
688 match self {
689 Self::Diffs => {}
690 Self::WithinTimestampOrderBy { order_by } => {
691 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
692 f.write_node(&display::comma_separated(order_by));
693 }
694 Self::EnvelopeUpsert { key_columns } => {
695 f.write_str(" ENVELOPE UPSERT (KEY (");
696 f.write_node(&display::comma_separated(key_columns));
697 f.write_str("))");
698 }
699 Self::EnvelopeDebezium { key_columns } => {
700 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
701 f.write_node(&display::comma_separated(key_columns));
702 f.write_str("))");
703 }
704 }
705 }
706}
707impl_display_t!(SubscribeOutput);
708
709impl<T: AstInfo> AstDisplay for Format<T> {
710 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
711 match self {
712 Self::Bytes => f.write_str("BYTES"),
713 Self::Avro(inner) => {
714 f.write_str("AVRO ");
715 f.write_node(inner);
716 }
717 Self::Protobuf(inner) => {
718 f.write_str("PROTOBUF ");
719 f.write_node(inner);
720 }
721 Self::Regex(regex) => {
722 f.write_str("REGEX '");
723 f.write_node(&display::escape_single_quote_string(regex));
724 f.write_str("'");
725 }
726 Self::Csv { columns, delimiter } => {
727 f.write_str("CSV WITH ");
728 f.write_node(columns);
729
730 if *delimiter != ',' {
731 f.write_str(" DELIMITED BY '");
732 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
733 f.write_str("'");
734 }
735 }
736 Self::Json { array } => {
737 f.write_str("JSON");
738 if *array {
739 f.write_str(" ARRAY");
740 }
741 }
742 Self::Text => f.write_str("TEXT"),
743 }
744 }
745}
746impl_display_t!(Format);
747
/// The name half of a [`ConnectionOption`].
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` follow
/// declaration order, so variants must not be reordered casually.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    /// `ACCESS KEY ID`
    AccessKeyId,
    /// `ASSUME ROLE ARN`
    AssumeRoleArn,
    /// `ASSUME ROLE SESSION NAME`
    AssumeRoleSessionName,
    /// `AVAILABILITY ZONES`
    AvailabilityZones,
    /// `AWS CONNECTION`
    AwsConnection,
    /// `AWS PRIVATELINK`
    AwsPrivatelink,
    /// `BROKER`
    Broker,
    /// `BROKERS`
    Brokers,
    /// `CREDENTIAL`
    Credential,
    /// `DATABASE`
    Database,
    /// `ENDPOINT`
    Endpoint,
    /// `HOST`
    Host,
    /// `PASSWORD`
    Password,
    /// `PORT`
    Port,
    /// `PROGRESS TOPIC`
    ProgressTopic,
    /// `PROGRESS TOPIC REPLICATION FACTOR`
    ProgressTopicReplicationFactor,
    /// `PUBLIC KEY 1`
    PublicKey1,
    /// `PUBLIC KEY 2`
    PublicKey2,
    /// `REGION`
    Region,
    /// `SASL MECHANISMS`
    SaslMechanisms,
    /// `SASL PASSWORD`
    SaslPassword,
    /// `SASL USERNAME`
    SaslUsername,
    /// `SCOPE`
    Scope,
    /// `SECRET ACCESS KEY`
    SecretAccessKey,
    /// `SECURITY PROTOCOL`
    SecurityProtocol,
    /// `SERVICE NAME`
    ServiceName,
    /// `SSH TUNNEL`
    SshTunnel,
    /// `SSL CERTIFICATE`
    SslCertificate,
    /// `SSL CERTIFICATE AUTHORITY`
    SslCertificateAuthority,
    /// `SSL KEY`
    SslKey,
    /// `SSL MODE`
    SslMode,
    /// `SESSION TOKEN`
    SessionToken,
    /// `CATALOG TYPE`
    CatalogType,
    /// `URL`
    Url,
    /// `USER`
    User,
    /// `WAREHOUSE`
    Warehouse,
}
790
791impl AstDisplay for ConnectionOptionName {
792 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
793 f.write_str(match self {
794 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
795 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
796 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
797 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
798 ConnectionOptionName::Broker => "BROKER",
799 ConnectionOptionName::Brokers => "BROKERS",
800 ConnectionOptionName::Credential => "CREDENTIAL",
801 ConnectionOptionName::Database => "DATABASE",
802 ConnectionOptionName::Endpoint => "ENDPOINT",
803 ConnectionOptionName::Host => "HOST",
804 ConnectionOptionName::Password => "PASSWORD",
805 ConnectionOptionName::Port => "PORT",
806 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
807 ConnectionOptionName::ProgressTopicReplicationFactor => {
808 "PROGRESS TOPIC REPLICATION FACTOR"
809 }
810 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
811 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
812 ConnectionOptionName::Region => "REGION",
813 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
814 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
815 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
816 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
817 ConnectionOptionName::SaslUsername => "SASL USERNAME",
818 ConnectionOptionName::Scope => "SCOPE",
819 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
820 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
821 ConnectionOptionName::ServiceName => "SERVICE NAME",
822 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
823 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
824 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
825 ConnectionOptionName::SslKey => "SSL KEY",
826 ConnectionOptionName::SslMode => "SSL MODE",
827 ConnectionOptionName::SessionToken => "SESSION TOKEN",
828 ConnectionOptionName::CatalogType => "CATALOG TYPE",
829 ConnectionOptionName::Url => "URL",
830 ConnectionOptionName::User => "USER",
831 ConnectionOptionName::Warehouse => "WAREHOUSE",
832 })
833 }
834}
835impl_display!(ConnectionOptionName);
836
837impl WithOptionName for ConnectionOptionName {
838 fn redact_value(&self) -> bool {
844 match self {
845 ConnectionOptionName::AccessKeyId
846 | ConnectionOptionName::AvailabilityZones
847 | ConnectionOptionName::AwsConnection
848 | ConnectionOptionName::AwsPrivatelink
849 | ConnectionOptionName::Broker
850 | ConnectionOptionName::Brokers
851 | ConnectionOptionName::Credential
852 | ConnectionOptionName::Database
853 | ConnectionOptionName::Endpoint
854 | ConnectionOptionName::Host
855 | ConnectionOptionName::Password
856 | ConnectionOptionName::Port
857 | ConnectionOptionName::ProgressTopic
858 | ConnectionOptionName::ProgressTopicReplicationFactor
859 | ConnectionOptionName::PublicKey1
860 | ConnectionOptionName::PublicKey2
861 | ConnectionOptionName::Region
862 | ConnectionOptionName::AssumeRoleArn
863 | ConnectionOptionName::AssumeRoleSessionName
864 | ConnectionOptionName::SaslMechanisms
865 | ConnectionOptionName::SaslPassword
866 | ConnectionOptionName::SaslUsername
867 | ConnectionOptionName::Scope
868 | ConnectionOptionName::SecurityProtocol
869 | ConnectionOptionName::SecretAccessKey
870 | ConnectionOptionName::ServiceName
871 | ConnectionOptionName::SshTunnel
872 | ConnectionOptionName::SslCertificate
873 | ConnectionOptionName::SslCertificateAuthority
874 | ConnectionOptionName::SslKey
875 | ConnectionOptionName::SslMode
876 | ConnectionOptionName::SessionToken
877 | ConnectionOptionName::CatalogType
878 | ConnectionOptionName::Url
879 | ConnectionOptionName::User
880 | ConnectionOptionName::Warehouse => false,
881 }
882 }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
886pub struct ConnectionOption<T: AstInfo> {
888 pub name: ConnectionOptionName,
889 pub value: Option<WithOptionValue<T>>,
890}
891impl_display_for_with_option!(ConnectionOption);
892impl_display_t!(ConnectionOption);
893
894#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
895pub enum CreateConnectionType {
896 Aws,
897 AwsPrivatelink,
898 Kafka,
899 Csr,
900 Postgres,
901 Ssh,
902 SqlServer,
903 MySql,
904 IcebergCatalog,
905}
906
907impl AstDisplay for CreateConnectionType {
908 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
909 match self {
910 Self::Kafka => {
911 f.write_str("KAFKA");
912 }
913 Self::Csr => {
914 f.write_str("CONFLUENT SCHEMA REGISTRY");
915 }
916 Self::Postgres => {
917 f.write_str("POSTGRES");
918 }
919 Self::Aws => {
920 f.write_str("AWS");
921 }
922 Self::AwsPrivatelink => {
923 f.write_str("AWS PRIVATELINK");
924 }
925 Self::Ssh => {
926 f.write_str("SSH TUNNEL");
927 }
928 Self::SqlServer => {
929 f.write_str("SQL SERVER");
930 }
931 Self::MySql => {
932 f.write_str("MYSQL");
933 }
934 Self::IcebergCatalog => {
935 f.write_str("ICEBERG CATALOG");
936 }
937 }
938 }
939}
940impl_display!(CreateConnectionType);
941
942#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
943pub enum CreateConnectionOptionName {
944 Validate,
945}
946
947impl AstDisplay for CreateConnectionOptionName {
948 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
949 f.write_str(match self {
950 CreateConnectionOptionName::Validate => "VALIDATE",
951 })
952 }
953}
954impl_display!(CreateConnectionOptionName);
955
956impl WithOptionName for CreateConnectionOptionName {
957 fn redact_value(&self) -> bool {
963 match self {
964 CreateConnectionOptionName::Validate => false,
965 }
966 }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
970pub struct CreateConnectionOption<T: AstInfo> {
972 pub name: CreateConnectionOptionName,
973 pub value: Option<WithOptionValue<T>>,
974}
975impl_display_for_with_option!(CreateConnectionOption);
976impl_display_t!(CreateConnectionOption);
977
978#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
979pub enum KafkaSourceConfigOptionName {
980 GroupIdPrefix,
981 Topic,
982 TopicMetadataRefreshInterval,
983 StartTimestamp,
984 StartOffset,
985}
986
987impl AstDisplay for KafkaSourceConfigOptionName {
988 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
989 f.write_str(match self {
990 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
991 KafkaSourceConfigOptionName::Topic => "TOPIC",
992 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
993 "TOPIC METADATA REFRESH INTERVAL"
994 }
995 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
996 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
997 })
998 }
999}
1000impl_display!(KafkaSourceConfigOptionName);
1001
1002impl WithOptionName for KafkaSourceConfigOptionName {
1003 fn redact_value(&self) -> bool {
1009 match self {
1010 KafkaSourceConfigOptionName::GroupIdPrefix
1011 | KafkaSourceConfigOptionName::Topic
1012 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1013 | KafkaSourceConfigOptionName::StartOffset
1014 | KafkaSourceConfigOptionName::StartTimestamp => false,
1015 }
1016 }
1017}
1018
1019#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1020pub struct KafkaSourceConfigOption<T: AstInfo> {
1021 pub name: KafkaSourceConfigOptionName,
1022 pub value: Option<WithOptionValue<T>>,
1023}
1024impl_display_for_with_option!(KafkaSourceConfigOption);
1025impl_display_t!(KafkaSourceConfigOption);
1026
1027#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1028pub enum KafkaSinkConfigOptionName {
1029 CompressionType,
1030 PartitionBy,
1031 ProgressGroupIdPrefix,
1032 Topic,
1033 TransactionalIdPrefix,
1034 LegacyIds,
1035 TopicConfig,
1036 TopicMetadataRefreshInterval,
1037 TopicPartitionCount,
1038 TopicReplicationFactor,
1039}
1040
1041impl AstDisplay for KafkaSinkConfigOptionName {
1042 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1043 f.write_str(match self {
1044 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1045 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1046 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1047 KafkaSinkConfigOptionName::Topic => "TOPIC",
1048 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1049 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1050 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1051 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1052 "TOPIC METADATA REFRESH INTERVAL"
1053 }
1054 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1055 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1056 })
1057 }
1058}
1059impl_display!(KafkaSinkConfigOptionName);
1060
1061impl WithOptionName for KafkaSinkConfigOptionName {
1062 fn redact_value(&self) -> bool {
1068 match self {
1069 KafkaSinkConfigOptionName::CompressionType
1070 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1071 | KafkaSinkConfigOptionName::Topic
1072 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1073 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1074 | KafkaSinkConfigOptionName::LegacyIds
1075 | KafkaSinkConfigOptionName::TopicConfig
1076 | KafkaSinkConfigOptionName::TopicPartitionCount
1077 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1078 KafkaSinkConfigOptionName::PartitionBy => true,
1079 }
1080 }
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1084pub struct KafkaSinkConfigOption<T: AstInfo> {
1085 pub name: KafkaSinkConfigOptionName,
1086 pub value: Option<WithOptionValue<T>>,
1087}
1088impl_display_for_with_option!(KafkaSinkConfigOption);
1089impl_display_t!(KafkaSinkConfigOption);
1090
1091#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1092pub enum IcebergSinkConfigOptionName {
1093 Namespace,
1094 Table,
1095}
1096
1097impl AstDisplay for IcebergSinkConfigOptionName {
1098 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1099 f.write_str(match self {
1100 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1101 IcebergSinkConfigOptionName::Table => "TABLE",
1102 })
1103 }
1104}
1105impl_display!(IcebergSinkConfigOptionName);
1106
1107impl WithOptionName for IcebergSinkConfigOptionName {
1108 fn redact_value(&self) -> bool {
1114 match self {
1115 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1116 }
1117 }
1118}
1119
1120#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1121pub struct IcebergSinkConfigOption<T: AstInfo> {
1122 pub name: IcebergSinkConfigOptionName,
1123 pub value: Option<WithOptionValue<T>>,
1124}
1125impl_display_for_with_option!(IcebergSinkConfigOption);
1126impl_display_t!(IcebergSinkConfigOption);
1127
1128#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1129pub enum PgConfigOptionName {
1130 Details,
1133 Publication,
1135 TextColumns,
1142 ExcludeColumns,
1149}
1150
1151impl AstDisplay for PgConfigOptionName {
1152 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1153 f.write_str(match self {
1154 PgConfigOptionName::Details => "DETAILS",
1155 PgConfigOptionName::Publication => "PUBLICATION",
1156 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1157 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1158 })
1159 }
1160}
1161impl_display!(PgConfigOptionName);
1162
1163impl WithOptionName for PgConfigOptionName {
1164 fn redact_value(&self) -> bool {
1170 match self {
1171 PgConfigOptionName::Details
1172 | PgConfigOptionName::Publication
1173 | PgConfigOptionName::TextColumns
1174 | PgConfigOptionName::ExcludeColumns => false,
1175 }
1176 }
1177}
1178
1179#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1180pub struct PgConfigOption<T: AstInfo> {
1182 pub name: PgConfigOptionName,
1183 pub value: Option<WithOptionValue<T>>,
1184}
1185impl_display_for_with_option!(PgConfigOption);
1186impl_display_t!(PgConfigOption);
1187
1188#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1189pub enum MySqlConfigOptionName {
1190 Details,
1193 TextColumns,
1200 ExcludeColumns,
1207}
1208
1209impl AstDisplay for MySqlConfigOptionName {
1210 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1211 f.write_str(match self {
1212 MySqlConfigOptionName::Details => "DETAILS",
1213 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1214 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1215 })
1216 }
1217}
1218impl_display!(MySqlConfigOptionName);
1219
1220impl WithOptionName for MySqlConfigOptionName {
1221 fn redact_value(&self) -> bool {
1227 match self {
1228 MySqlConfigOptionName::Details
1229 | MySqlConfigOptionName::TextColumns
1230 | MySqlConfigOptionName::ExcludeColumns => false,
1231 }
1232 }
1233}
1234
1235#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1236pub struct MySqlConfigOption<T: AstInfo> {
1238 pub name: MySqlConfigOptionName,
1239 pub value: Option<WithOptionValue<T>>,
1240}
1241impl_display_for_with_option!(MySqlConfigOption);
1242impl_display_t!(MySqlConfigOption);
1243
/// Names of the options that can be attached to a SQL Server source
/// connection (see `SqlServerConfigOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Displayed as `DETAILS`.
    Details,
    /// Displayed as `TEXT COLUMNS`.
    TextColumns,
    /// Displayed as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1266
1267impl AstDisplay for SqlServerConfigOptionName {
1268 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1269 f.write_str(match self {
1270 SqlServerConfigOptionName::Details => "DETAILS",
1271 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1272 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1273 })
1274 }
1275}
1276impl_display!(SqlServerConfigOptionName);
1277
1278impl WithOptionName for SqlServerConfigOptionName {
1279 fn redact_value(&self) -> bool {
1285 match self {
1286 SqlServerConfigOptionName::Details
1287 | SqlServerConfigOptionName::TextColumns
1288 | SqlServerConfigOptionName::ExcludeColumns => false,
1289 }
1290 }
1291}
1292
/// A single option of a SQL Server source connection: an option name paired
/// with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: SqlServerConfigOptionName,
    /// The value given for the option, if one was supplied.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this chunk; presumably they
// generate the `AstDisplay` and `Display` impls for the option type — confirm.
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1301
/// What a source reads from: one of several external connection kinds, or a
/// built-in load generator, each with its own list of options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Displayed as `KAFKA CONNECTION <connection>`, followed by a
    /// parenthesized option list when `options` is non-empty.
    Kafka {
        connection: T::ItemName,
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Displayed as `POSTGRES CONNECTION <connection>`, followed by a
    /// parenthesized option list when `options` is non-empty.
    Postgres {
        connection: T::ItemName,
        options: Vec<PgConfigOption<T>>,
    },
    /// Displayed as `SQL SERVER CONNECTION <connection>`, followed by a
    /// parenthesized option list when `options` is non-empty.
    SqlServer {
        connection: T::ItemName,
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Displayed as `MYSQL CONNECTION <connection>`, followed by a
    /// parenthesized option list when `options` is non-empty.
    MySql {
        connection: T::ItemName,
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Displayed as `LOAD GENERATOR <generator>`, followed by a
    /// parenthesized option list when `options` is non-empty.
    LoadGenerator {
        generator: LoadGenerator,
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1325
1326impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1327 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1328 match self {
1329 CreateSourceConnection::Kafka {
1330 connection,
1331 options,
1332 } => {
1333 f.write_str("KAFKA CONNECTION ");
1334 f.write_node(connection);
1335 if !options.is_empty() {
1336 f.write_str(" (");
1337 f.write_node(&display::comma_separated(options));
1338 f.write_str(")");
1339 }
1340 }
1341 CreateSourceConnection::Postgres {
1342 connection,
1343 options,
1344 } => {
1345 f.write_str("POSTGRES CONNECTION ");
1346 f.write_node(connection);
1347 if !options.is_empty() {
1348 f.write_str(" (");
1349 f.write_node(&display::comma_separated(options));
1350 f.write_str(")");
1351 }
1352 }
1353 CreateSourceConnection::SqlServer {
1354 connection,
1355 options,
1356 } => {
1357 f.write_str("SQL SERVER CONNECTION ");
1358 f.write_node(connection);
1359 if !options.is_empty() {
1360 f.write_str(" (");
1361 f.write_node(&display::comma_separated(options));
1362 f.write_str(")");
1363 }
1364 }
1365 CreateSourceConnection::MySql {
1366 connection,
1367 options,
1368 } => {
1369 f.write_str("MYSQL CONNECTION ");
1370 f.write_node(connection);
1371 if !options.is_empty() {
1372 f.write_str(" (");
1373 f.write_node(&display::comma_separated(options));
1374 f.write_str(")");
1375 }
1376 }
1377 CreateSourceConnection::LoadGenerator { generator, options } => {
1378 f.write_str("LOAD GENERATOR ");
1379 f.write_node(generator);
1380 if !options.is_empty() {
1381 f.write_str(" (");
1382 f.write_node(&display::comma_separated(options));
1383 f.write_str(")");
1384 }
1385 }
1386 }
1387 }
1388}
1389impl_display_t!(CreateSourceConnection);
1390
/// The kinds of load generator a source can be created from.
///
/// Each variant has a SQL keyword form (its `AstDisplay` output) and a
/// lowercase schema name (see `LoadGenerator::schema_name`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`; schema name `clock`.
    Clock,
    /// Displayed as `COUNTER`; schema name `counter`.
    Counter,
    /// Displayed as `MARKETING`; schema name `marketing`.
    Marketing,
    /// Displayed as `AUCTION`; schema name `auction`.
    Auction,
    /// Displayed as `DATUMS`; schema name `datums`.
    Datums,
    /// Displayed as `TPCH`; schema name `tpch`.
    Tpch,
    /// Displayed as `KEY VALUE`; schema name `key_value`.
    KeyValue,
}
1401
1402impl AstDisplay for LoadGenerator {
1403 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1404 match self {
1405 Self::Counter => f.write_str("COUNTER"),
1406 Self::Clock => f.write_str("CLOCK"),
1407 Self::Marketing => f.write_str("MARKETING"),
1408 Self::Auction => f.write_str("AUCTION"),
1409 Self::Datums => f.write_str("DATUMS"),
1410 Self::Tpch => f.write_str("TPCH"),
1411 Self::KeyValue => f.write_str("KEY VALUE"),
1412 }
1413 }
1414}
1415impl_display!(LoadGenerator);
1416
1417impl LoadGenerator {
1418 pub fn schema_name(&self) -> &'static str {
1423 match self {
1424 LoadGenerator::Counter => "counter",
1425 LoadGenerator::Clock => "clock",
1426 LoadGenerator::Marketing => "marketing",
1427 LoadGenerator::Auction => "auction",
1428 LoadGenerator::Datums => "datums",
1429 LoadGenerator::Tpch => "tpch",
1430 LoadGenerator::KeyValue => "key_value",
1431 }
1432 }
1433}
1434
/// Names of the options that can be attached to a load generator source
/// (see `LoadGeneratorOption`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Displayed as `SCALE FACTOR`.
    ScaleFactor,
    /// Displayed as `TICK INTERVAL`.
    TickInterval,
    /// Displayed as `AS OF`.
    AsOf,
    /// Displayed as `UP TO`.
    UpTo,
    /// Displayed as `MAX CARDINALITY`.
    MaxCardinality,
    /// Displayed as `KEYS`.
    Keys,
    /// Displayed as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Displayed as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Displayed as `VALUE SIZE`.
    ValueSize,
    /// Displayed as `SEED`.
    Seed,
    /// Displayed as `PARTITIONS`.
    Partitions,
    /// Displayed as `BATCH SIZE`.
    BatchSize,
}
1450
1451impl AstDisplay for LoadGeneratorOptionName {
1452 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1453 f.write_str(match self {
1454 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1455 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1456 LoadGeneratorOptionName::AsOf => "AS OF",
1457 LoadGeneratorOptionName::UpTo => "UP TO",
1458 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1459 LoadGeneratorOptionName::Keys => "KEYS",
1460 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1461 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1462 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1463 LoadGeneratorOptionName::Seed => "SEED",
1464 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1465 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1466 })
1467 }
1468}
1469impl_display!(LoadGeneratorOptionName);
1470
1471impl WithOptionName for LoadGeneratorOptionName {
1472 fn redact_value(&self) -> bool {
1478 match self {
1479 LoadGeneratorOptionName::ScaleFactor
1480 | LoadGeneratorOptionName::TickInterval
1481 | LoadGeneratorOptionName::AsOf
1482 | LoadGeneratorOptionName::UpTo
1483 | LoadGeneratorOptionName::MaxCardinality
1484 | LoadGeneratorOptionName::Keys
1485 | LoadGeneratorOptionName::SnapshotRounds
1486 | LoadGeneratorOptionName::TransactionalSnapshot
1487 | LoadGeneratorOptionName::ValueSize
1488 | LoadGeneratorOptionName::Partitions
1489 | LoadGeneratorOptionName::BatchSize
1490 | LoadGeneratorOptionName::Seed => false,
1491 }
1492 }
1493}
1494
/// A single option of a load generator source: an option name paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option this is.
    pub name: LoadGeneratorOptionName,
    /// The value given for the option, if one was supplied.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this chunk; presumably they
// generate the `AstDisplay` and `Display` impls for the option type — confirm.
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1503
/// Where a sink writes to, together with its options and optional key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Displayed as `KAFKA CONNECTION <connection>`, then the parenthesized
    /// options (if any), then the key (if any), then `HEADERS <headers>`
    /// (if any).
    Kafka {
        connection: T::ItemName,
        options: Vec<KafkaSinkConfigOption<T>>,
        key: Option<SinkKey>,
        headers: Option<Ident>,
    },
    /// Displayed as `ICEBERG CATALOG CONNECTION <connection>`, then the
    /// parenthesized options (if any), then
    /// `USING AWS CONNECTION <aws_connection>`, then the key (if any).
    Iceberg {
        connection: T::ItemName,
        aws_connection: T::ItemName,
        key: Option<SinkKey>,
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1519
1520impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1521 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1522 match self {
1523 CreateSinkConnection::Kafka {
1524 connection,
1525 options,
1526 key,
1527 headers,
1528 } => {
1529 f.write_str("KAFKA CONNECTION ");
1530 f.write_node(connection);
1531 if !options.is_empty() {
1532 f.write_str(" (");
1533 f.write_node(&display::comma_separated(options));
1534 f.write_str(")");
1535 }
1536 if let Some(key) = key.as_ref() {
1537 f.write_str(" ");
1538 f.write_node(key);
1539 }
1540 if let Some(headers) = headers {
1541 f.write_str(" HEADERS ");
1542 f.write_node(headers);
1543 }
1544 }
1545 CreateSinkConnection::Iceberg {
1546 connection,
1547 aws_connection,
1548 key,
1549 options,
1550 } => {
1551 f.write_str("ICEBERG CATALOG CONNECTION ");
1552 f.write_node(connection);
1553 if !options.is_empty() {
1554 f.write_str(" (");
1555 f.write_node(&display::comma_separated(options));
1556 f.write_str(")");
1557 }
1558 f.write_str(" USING AWS CONNECTION ");
1559 f.write_node(aws_connection);
1560 if let Some(key) = key.as_ref() {
1561 f.write_str(" ");
1562 f.write_node(key);
1563 }
1564 }
1565 }
1566 }
1567}
1568impl_display_t!(CreateSinkConnection);
1569
/// The key clause of a sink, displayed as `KEY (<columns>)` with an optional
/// ` NOT ENFORCED` suffix.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// The columns that make up the key, in display order.
    pub key_columns: Vec<Ident>,
    /// When `true`, ` NOT ENFORCED` is appended after the column list.
    pub not_enforced: bool,
}
1575
1576impl AstDisplay for SinkKey {
1577 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1578 f.write_str("KEY (");
1579 f.write_node(&display::comma_separated(&self.key_columns));
1580 f.write_str(")");
1581 if self.not_enforced {
1582 f.write_str(" NOT ENFORCED");
1583 }
1584 }
1585}
1586
/// A table-level constraint. Every variant carries an optional constraint
/// name, which is displayed as a leading `CONSTRAINT <name> ` when present.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Displayed as `PRIMARY KEY (<columns>)` when `is_primary` is set,
    /// otherwise as `UNIQUE [NULLS NOT DISTINCT] (<columns>)`.
    Unique {
        name: Option<Ident>,
        columns: Vec<Ident>,
        is_primary: bool,
        // Only affects display when `is_primary` is false.
        nulls_not_distinct: bool,
    },
    /// Displayed as
    /// `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        name: Option<Ident>,
        columns: Vec<Ident>,
        foreign_table: T::ItemName,
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check {
        name: Option<Ident>,
        expr: Box<Expr<T>>,
    },
}
1615
1616impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1617 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1618 match self {
1619 TableConstraint::Unique {
1620 name,
1621 columns,
1622 is_primary,
1623 nulls_not_distinct,
1624 } => {
1625 f.write_node(&display_constraint_name(name));
1626 if *is_primary {
1627 f.write_str("PRIMARY KEY ");
1628 } else {
1629 f.write_str("UNIQUE ");
1630 if *nulls_not_distinct {
1631 f.write_str("NULLS NOT DISTINCT ");
1632 }
1633 }
1634 f.write_str("(");
1635 f.write_node(&display::comma_separated(columns));
1636 f.write_str(")");
1637 }
1638 TableConstraint::ForeignKey {
1639 name,
1640 columns,
1641 foreign_table,
1642 referred_columns,
1643 } => {
1644 f.write_node(&display_constraint_name(name));
1645 f.write_str("FOREIGN KEY (");
1646 f.write_node(&display::comma_separated(columns));
1647 f.write_str(") REFERENCES ");
1648 f.write_node(foreign_table);
1649 f.write_str("(");
1650 f.write_node(&display::comma_separated(referred_columns));
1651 f.write_str(")");
1652 }
1653 TableConstraint::Check { name, expr } => {
1654 f.write_node(&display_constraint_name(name));
1655 f.write_str("CHECK (");
1656 f.write_node(&expr);
1657 f.write_str(")");
1658 }
1659 }
1660 }
1661}
1662impl_display_t!(TableConstraint);
1663
/// A key constraint. The only variant is a primary key that is declared but
/// not enforced.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Displayed as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1670
1671impl AstDisplay for KeyConstraint {
1672 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1673 match self {
1674 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1675 f.write_str("PRIMARY KEY ");
1676 f.write_str("(");
1677 f.write_node(&display::comma_separated(columns));
1678 f.write_str(") ");
1679 f.write_str("NOT ENFORCED");
1680 }
1681 }
1682 }
1683}
1684impl_display!(KeyConstraint);
1685
/// Names of the options carried by a `CreateSourceOption`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1691
1692impl AstDisplay for CreateSourceOptionName {
1693 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1694 f.write_str(match self {
1695 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1696 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1697 })
1698 }
1699}
1700impl_display!(CreateSourceOptionName);
1701
1702impl WithOptionName for CreateSourceOptionName {
1703 fn redact_value(&self) -> bool {
1709 match self {
1710 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1711 false
1712 }
1713 }
1714 }
1715}
1716
/// A single `CreateSourceOptionName` paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateSourceOptionName,
    /// The value given for the option, if one was supplied.
    pub value: Option<WithOptionValue<T>>,
}
// NOTE(review): these macros are defined outside this chunk; presumably they
// generate the `AstDisplay` and `Display` impls for the option type — confirm.
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1725
/// A column definition, displayed as
/// `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column's name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// The collation, if one was specified; displayed after `COLLATE`.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, displayed in order and separated by single spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1734
1735impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1736 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1737 f.write_node(&self.name);
1738 f.write_str(" ");
1739 f.write_node(&self.data_type);
1740 if let Some(collation) = &self.collation {
1741 f.write_str(" COLLATE ");
1742 f.write_node(collation);
1743 }
1744 for option in &self.options {
1745 f.write_str(" ");
1746 f.write_node(option);
1747 }
1748 }
1749}
1750impl_display_t!(ColumnDef);
1751
/// A column option together with an optional constraint name. When the name
/// is present it is displayed as a leading `CONSTRAINT <name> `.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// The constraint name, if one was given.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1773
1774impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1775 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1776 f.write_node(&display_constraint_name(&self.name));
1777 f.write_node(&self.option);
1778 }
1779}
1780impl_display_t!(ColumnOptionDef);
1781
/// The options that can follow a column's data type in a column definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Displayed as `NULL`.
    Null,
    /// Displayed as `NOT NULL`.
    NotNull,
    /// Displayed as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Displayed as `PRIMARY KEY` when `is_primary` is set, otherwise `UNIQUE`.
    Unique { is_primary: bool },
    /// Displayed as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        foreign_table: UnresolvedItemName,
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Displayed as `VERSION <action> <version>`.
    Versioned {
        action: ColumnVersioned,
        version: Version,
    },
}
1808
1809impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1810 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1811 use ColumnOption::*;
1812 match self {
1813 Null => f.write_str("NULL"),
1814 NotNull => f.write_str("NOT NULL"),
1815 Default(expr) => {
1816 f.write_str("DEFAULT ");
1817 f.write_node(expr);
1818 }
1819 Unique { is_primary } => {
1820 if *is_primary {
1821 f.write_str("PRIMARY KEY");
1822 } else {
1823 f.write_str("UNIQUE");
1824 }
1825 }
1826 ForeignKey {
1827 foreign_table,
1828 referred_columns,
1829 } => {
1830 f.write_str("REFERENCES ");
1831 f.write_node(foreign_table);
1832 f.write_str(" (");
1833 f.write_node(&display::comma_separated(referred_columns));
1834 f.write_str(")");
1835 }
1836 Check(expr) => {
1837 f.write_str("CHECK (");
1838 f.write_node(expr);
1839 f.write_str(")");
1840 }
1841 Versioned { action, version } => {
1842 f.write_str("VERSION ");
1843 f.write_node(action);
1844 f.write_str(" ");
1845 f.write_node(version);
1846 }
1847 }
1848 }
1849}
1850impl_display_t!(ColumnOption);
1851
/// The action named in a `ColumnOption::Versioned` option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1856
1857impl AstDisplay for ColumnVersioned {
1858 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1859 where
1860 W: fmt::Write,
1861 {
1862 match self {
1863 ColumnVersioned::Added => f.write_str("ADDED"),
1865 }
1866 }
1867}
1868impl_display!(ColumnVersioned);
1869
1870fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1871 struct ConstraintName<'a>(&'a Option<Ident>);
1872 impl<'a> AstDisplay for ConstraintName<'a> {
1873 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1874 where
1875 W: fmt::Write,
1876 {
1877 if let Some(name) = self.0 {
1878 f.write_str("CONSTRAINT ");
1879 f.write_node(name);
1880 f.write_str(" ");
1881 }
1882 }
1883 }
1884 ConstraintName(name)
1885}