1use std::fmt;
25
26use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
27use crate::ast::{
28 AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, Version, WithOptionValue,
29};
30
/// Names of the options carried by a [`MaterializedViewOption`].
///
/// Each variant is rendered as the SQL keyword sequence emitted by this
/// type's `AstDisplay` implementation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
    /// Rendered as `ASSERT NOT NULL`.
    AssertNotNull,
    /// Rendered as `PARTITION BY`.
    PartitionBy,
    /// Rendered as `RETAIN HISTORY`.
    RetainHistory,
    /// Rendered as `REFRESH`.
    Refresh,
}
40
41impl AstDisplay for MaterializedViewOptionName {
42 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
43 match self {
44 MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45 MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
46 MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
47 MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
48 }
49 }
50}
51
52impl WithOptionName for MaterializedViewOptionName {
53 fn redact_value(&self) -> bool {
59 match self {
60 MaterializedViewOptionName::AssertNotNull
61 | MaterializedViewOptionName::PartitionBy
62 | MaterializedViewOptionName::RetainHistory
63 | MaterializedViewOptionName::Refresh => false,
64 }
65 }
66}
67
/// One materialized-view option: a [`MaterializedViewOptionName`] together
/// with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
    /// Which option this is.
    pub name: MaterializedViewOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by a macro defined outside this part of the file.
impl_display_for_with_option!(MaterializedViewOption);
74
/// Names of the options carried by a [`ContinualTaskOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ContinualTaskOptionName {
    /// Rendered as `SNAPSHOT`.
    Snapshot,
}
80
81impl AstDisplay for ContinualTaskOptionName {
82 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
83 match self {
84 ContinualTaskOptionName::Snapshot => f.write_str("SNAPSHOT"),
85 }
86 }
87}
88
89impl WithOptionName for ContinualTaskOptionName {
90 fn redact_value(&self) -> bool {
96 match self {
97 ContinualTaskOptionName::Snapshot => false,
98 }
99 }
100}
101
/// One continual-task option: a [`ContinualTaskOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContinualTaskOption<T: AstInfo> {
    /// Which option this is.
    pub name: ContinualTaskOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by a macro defined outside this part of the file.
impl_display_for_with_option!(ContinualTaskOption);
108
/// An inline schema given as a string; rendered as `SCHEMA '<escaped text>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
    /// The raw schema text (single quotes are escaped when displayed).
    pub schema: String,
}
113
114impl AstDisplay for Schema {
115 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
116 f.write_str("SCHEMA '");
117 f.write_node(&display::escape_single_quote_string(&self.schema));
118 f.write_str("'");
119 }
120}
121impl_display!(Schema);
122
/// Names of the options carried by an [`AvroSchemaOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
    /// Rendered as `CONFLUENT WIRE FORMAT`.
    ConfluentWireFormat,
}
128
129impl AstDisplay for AvroSchemaOptionName {
130 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
131 match self {
132 AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
133 }
134 }
135}
136
137impl WithOptionName for AvroSchemaOptionName {
138 fn redact_value(&self) -> bool {
144 match self {
145 Self::ConfluentWireFormat => false,
146 }
147 }
148}
149
/// One Avro schema option: an [`AvroSchemaOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
    /// Which option this is.
    pub name: AvroSchemaOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by a macro defined outside this part of the file.
impl_display_for_with_option!(AvroSchemaOption);
156
/// Where an Avro schema comes from: a schema registry connection or an
/// inline schema string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
    /// Schema obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection description.
        csr_connection: CsrConnectionAvro<T>,
    },
    /// Schema written inline; rendered as `USING SCHEMA '...'` followed by
    /// the parenthesized options when any are present.
    InlineSchema {
        /// The inline schema text.
        schema: Schema,
        /// Options attached to the inline schema (may be empty).
        with_options: Vec<AvroSchemaOption<T>>,
    },
}
167
168impl<T: AstInfo> AstDisplay for AvroSchema<T> {
169 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
170 match self {
171 Self::Csr { csr_connection } => {
172 f.write_node(csr_connection);
173 }
174 Self::InlineSchema {
175 schema,
176 with_options,
177 } => {
178 f.write_str("USING ");
179 schema.fmt(f);
180 if !with_options.is_empty() {
181 f.write_str(" (");
182 f.write_node(&display::comma_separated(with_options));
183 f.write_str(")");
184 }
185 }
186 }
187 }
188}
189impl_display_t!(AvroSchema);
190
/// Where a Protobuf schema comes from: a schema registry connection or an
/// inline schema with a message name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
    /// Schema obtained through a Confluent Schema Registry connection.
    Csr {
        /// The registry connection description.
        csr_connection: CsrConnectionProtobuf<T>,
    },
    /// Schema written inline; rendered as `MESSAGE '<name>' USING SCHEMA '...'`.
    InlineSchema {
        /// The message name (single quotes are escaped when displayed).
        message_name: String,
        /// The inline schema text.
        schema: Schema,
    },
}
201
202impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
203 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
204 match self {
205 Self::Csr { csr_connection } => {
206 f.write_node(csr_connection);
207 }
208 Self::InlineSchema {
209 message_name,
210 schema,
211 } => {
212 f.write_str("MESSAGE '");
213 f.write_node(&display::escape_single_quote_string(message_name));
214 f.write_str("' USING ");
215 f.write_str(schema);
216 }
217 }
218 }
219}
220impl_display_t!(ProtobufSchema);
221
/// Names of the options carried by a [`CsrConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
    /// Rendered as `AVRO KEY FULLNAME`.
    AvroKeyFullname,
    /// Rendered as `AVRO VALUE FULLNAME`.
    AvroValueFullname,
    /// Rendered as `NULL DEFAULTS`.
    NullDefaults,
    /// A doc-on option; rendered by the contained [`AvroDocOn`].
    AvroDocOn(AvroDocOn<T>),
    /// Rendered as `KEY COMPATIBILITY LEVEL`.
    KeyCompatibilityLevel,
    /// Rendered as `VALUE COMPATIBILITY LEVEL`.
    ValueCompatibilityLevel,
}
231
232impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
233 fn redact_value(&self) -> bool {
239 match self {
240 Self::AvroKeyFullname
241 | Self::AvroValueFullname
242 | Self::NullDefaults
243 | Self::AvroDocOn(_)
244 | Self::KeyCompatibilityLevel
245 | Self::ValueCompatibilityLevel => false,
246 }
247 }
248}
249
/// A `DOC ON ...` option name: what is being documented and for which
/// schema(s).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
    /// The column or type the doc applies to.
    pub identifier: DocOnIdentifier<T>,
    /// Whether the doc is for the key schema, the value schema, or both.
    pub for_schema: DocOnSchema,
}
/// Selects the schema(s) an [`AvroDocOn`] applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
    /// Rendered with a `KEY ` prefix.
    KeyOnly,
    /// Rendered with a `VALUE ` prefix.
    ValueOnly,
    /// Rendered with no prefix.
    All,
}
261
/// The object an [`AvroDocOn`] refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
    /// A column; rendered as `DOC ON COLUMN <name>`.
    Column(ColumnName<T>),
    /// A type; rendered as `DOC ON TYPE <name>`.
    Type(T::ItemName),
}
267
268impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
269 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
270 match &self.for_schema {
271 DocOnSchema::KeyOnly => f.write_str("KEY "),
272 DocOnSchema::ValueOnly => f.write_str("VALUE "),
273 DocOnSchema::All => {}
274 }
275 match &self.identifier {
276 DocOnIdentifier::Column(name) => {
277 f.write_str("DOC ON COLUMN ");
278 f.write_node(name);
279 }
280 DocOnIdentifier::Type(name) => {
281 f.write_str("DOC ON TYPE ");
282 f.write_node(name);
283 }
284 }
285 }
286}
287impl_display_t!(AvroDocOn);
288
289impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
290 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
291 match self {
292 CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
293 CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
294 CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
295 CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
296 CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
297 CsrConfigOptionName::ValueCompatibilityLevel => {
298 f.write_str("VALUE COMPATIBILITY LEVEL")
299 }
300 }
301 }
302}
303impl_display_t!(CsrConfigOptionName);
304
/// One schema-registry configuration option: a [`CsrConfigOptionName`]
/// together with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: CsrConfigOptionName<T>,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
313
/// A reference to a connection plus its options; rendered as
/// `CONNECTION <name>` followed by the parenthesized options when present.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
    /// The referenced connection item.
    pub connection: T::ItemName,
    /// Options attached to the connection reference (may be empty).
    pub options: Vec<CsrConfigOption<T>>,
}
319
320impl<T: AstInfo> AstDisplay for CsrConnection<T> {
321 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
322 f.write_str("CONNECTION ");
323 f.write_node(&self.connection);
324 if !self.options.is_empty() {
325 f.write_str(" (");
326 f.write_node(&display::comma_separated(&self.options));
327 f.write_str(")");
328 }
329 }
330}
331impl_display_t!(CsrConnection);
332
/// Strategy for selecting a reader schema.
///
/// The default strategy is [`ReaderSchemaSelectionStrategy::Latest`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
    /// The default strategy.
    #[default]
    Latest,
    /// Carries a string payload (presumably the schema text; confirm against
    /// the code that consumes it).
    Inline(String),
    /// Carries an integer payload (presumably a schema id; confirm against
    /// the code that consumes it).
    ById(i32),
}
345
/// A schema registry connection used for Avro, with optional reader-schema
/// strategies and an optional seed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
    /// The underlying connection reference and its options.
    pub connection: CsrConnection<T>,
    /// Reader-schema strategy for the key, if any. Not written by the
    /// `AstDisplay` implementation.
    pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Reader-schema strategy for the value, if any. Not written by the
    /// `AstDisplay` implementation.
    pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
    /// Optional seed schemas, rendered after the connection.
    pub seed: Option<CsrSeedAvro>,
}
353
354impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
355 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
356 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
357 f.write_node(&self.connection);
358 if let Some(seed) = &self.seed {
359 f.write_str(" ");
360 f.write_node(seed);
361 }
362 }
363}
364impl_display_t!(CsrConnectionAvro);
365
/// A schema registry connection used for Protobuf, with an optional seed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
    /// The underlying connection reference and its options.
    pub connection: CsrConnection<T>,
    /// Optional seed schemas, rendered after the connection.
    pub seed: Option<CsrSeedProtobuf>,
}
371
372impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
373 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
374 f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
375 f.write_node(&self.connection);
376
377 if let Some(seed) = &self.seed {
378 f.write_str(" ");
379 f.write_node(seed);
380 }
381 }
382}
383impl_display_t!(CsrConnectionProtobuf);
384
/// Seed schemas for an Avro registry connection; rendered as
/// `SEED [KEY SCHEMA '...'] VALUE SCHEMA '...'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
    /// Optional key schema text.
    pub key_schema: Option<String>,
    /// Value schema text.
    pub value_schema: String,
}
390
391impl AstDisplay for CsrSeedAvro {
392 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
393 f.write_str("SEED");
394 if let Some(key_schema) = &self.key_schema {
395 f.write_str(" KEY SCHEMA '");
396 f.write_node(&display::escape_single_quote_string(key_schema));
397 f.write_str("'");
398 }
399 f.write_str(" VALUE SCHEMA '");
400 f.write_node(&display::escape_single_quote_string(&self.value_schema));
401 f.write_str("'");
402 }
403}
404impl_display!(CsrSeedAvro);
405
/// Seed schemas for a Protobuf registry connection; rendered as
/// `SEED [KEY <schema>] VALUE <schema>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
    /// Optional key schema.
    pub key: Option<CsrSeedProtobufSchema>,
    /// Value schema.
    pub value: CsrSeedProtobufSchema,
}
411
412impl AstDisplay for CsrSeedProtobuf {
413 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
414 f.write_str("SEED");
415 if let Some(key) = &self.key {
416 f.write_str(" KEY ");
417 f.write_node(key);
418 }
419 f.write_str(" VALUE ");
420 f.write_node(&self.value);
421 }
422}
423impl_display!(CsrSeedProtobuf);
424
/// One seeded Protobuf schema; rendered as `SCHEMA '...' MESSAGE '...'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
    /// The schema text.
    pub schema: String,
    /// The message name associated with the schema.
    pub message_name: String,
}
431impl AstDisplay for CsrSeedProtobufSchema {
432 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
433 f.write_str("SCHEMA '");
434 f.write_str(&display::escape_single_quote_string(&self.schema));
435 f.write_str("' MESSAGE '");
436 f.write_str(&self.message_name);
437 f.write_str("'");
438 }
439}
440impl_display!(CsrSeedProtobufSchema);
441
/// How formats are specified: one format, or separate key and value formats.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
    /// A single format; rendered as `FORMAT <format>`.
    Bare(Format<T>),
    /// Separate formats; rendered as `KEY FORMAT <key> VALUE FORMAT <value>`.
    KeyValue { key: Format<T>, value: Format<T> },
}
449
450impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
451 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
452 match self {
453 FormatSpecifier::Bare(format) => {
454 f.write_str("FORMAT ");
455 f.write_node(format)
456 }
457 FormatSpecifier::KeyValue { key, value } => {
458 f.write_str("KEY FORMAT ");
459 f.write_node(key);
460 f.write_str(" VALUE FORMAT ");
461 f.write_node(value);
462 }
463 }
464 }
465}
466impl_display_t!(FormatSpecifier);
467
/// A data format, as rendered by this type's `AstDisplay` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
    /// Rendered as `BYTES`.
    Bytes,
    /// Rendered as `AVRO <schema>`.
    Avro(AvroSchema<T>),
    /// Rendered as `PROTOBUF <schema>`.
    Protobuf(ProtobufSchema<T>),
    /// Rendered as `REGEX '<pattern>'` with single quotes escaped.
    Regex(String),
    /// Rendered as `CSV WITH <columns>`, plus `DELIMITED BY '<c>'` when the
    /// delimiter is not a comma.
    Csv {
        /// How the CSV columns are described.
        columns: CsvColumns,
        /// The field delimiter character.
        delimiter: char,
    },
    /// Rendered as `JSON`, or `JSON ARRAY` when `array` is true.
    Json {
        /// Whether the ` ARRAY` suffix is emitted.
        array: bool,
    },
    /// Rendered as `TEXT`.
    Text,
}
483
/// How the columns of a CSV format are described.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
    /// A column count; rendered as `<n> COLUMNS`.
    Count(u64),
    /// A header; rendered as `HEADER`, followed by the parenthesized names
    /// when any are given.
    Header { names: Vec<Ident> },
}
491
492impl AstDisplay for CsvColumns {
493 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
494 match self {
495 CsvColumns::Count(n) => {
496 f.write_str(n);
497 f.write_str(" COLUMNS")
498 }
499 CsvColumns::Header { names } => {
500 f.write_str("HEADER");
501 if !names.is_empty() {
502 f.write_str(" (");
503 f.write_node(&display::comma_separated(names));
504 f.write_str(")");
505 }
506 }
507 }
508 }
509}
510
/// A piece of source metadata to include, with an alias where applicable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
    /// Rendered as `KEY [AS <alias>]`.
    Key {
        alias: Option<Ident>,
    },
    /// Rendered as `TIMESTAMP [AS <alias>]`.
    Timestamp {
        alias: Option<Ident>,
    },
    /// Rendered as `PARTITION [AS <alias>]`.
    Partition {
        alias: Option<Ident>,
    },
    /// Rendered as `OFFSET [AS <alias>]`.
    Offset {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADERS [AS <alias>]`.
    Headers {
        alias: Option<Ident>,
    },
    /// Rendered as `HEADER '<key>' AS <alias> [BYTES]`.
    Header {
        /// The header key (single quotes are escaped when displayed).
        key: String,
        /// The alias; always present for this variant.
        alias: Ident,
        /// Whether the ` BYTES` suffix is emitted.
        use_bytes: bool,
    },
}
534
535impl AstDisplay for SourceIncludeMetadata {
536 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
537 let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
538 if let Some(alias) = alias {
539 f.write_str(" AS ");
540 f.write_node(alias);
541 }
542 };
543
544 match self {
545 SourceIncludeMetadata::Key { alias } => {
546 f.write_str("KEY");
547 print_alias(f, alias);
548 }
549 SourceIncludeMetadata::Timestamp { alias } => {
550 f.write_str("TIMESTAMP");
551 print_alias(f, alias);
552 }
553 SourceIncludeMetadata::Partition { alias } => {
554 f.write_str("PARTITION");
555 print_alias(f, alias);
556 }
557 SourceIncludeMetadata::Offset { alias } => {
558 f.write_str("OFFSET");
559 print_alias(f, alias);
560 }
561 SourceIncludeMetadata::Headers { alias } => {
562 f.write_str("HEADERS");
563 print_alias(f, alias);
564 }
565 SourceIncludeMetadata::Header {
566 alias,
567 key,
568 use_bytes,
569 } => {
570 f.write_str("HEADER '");
571 f.write_str(&display::escape_single_quote_string(key));
572 f.write_str("'");
573 print_alias(f, &Some(alias.clone()));
574 if *use_bytes {
575 f.write_str(" BYTES");
576 }
577 }
578 }
579 }
580}
581impl_display!(SourceIncludeMetadata);
582
/// An error policy entry, as listed in an upsert envelope's
/// `VALUE DECODING ERRORS` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
    /// Rendered as `INLINE [AS <alias>]`.
    Inline {
        /// Optional alias.
        alias: Option<Ident>,
    },
}
590
591impl AstDisplay for SourceErrorPolicy {
592 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
593 match self {
594 Self::Inline { alias } => {
595 f.write_str("INLINE");
596 if let Some(alias) = alias {
597 f.write_str(" AS ");
598 f.write_node(alias);
599 }
600 }
601 }
602 }
603}
604impl_display!(SourceErrorPolicy);
605
/// The envelope of a source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
    /// Rendered as `NONE`.
    None,
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`, with a `(VALUE DECODING ERRORS = (...))` clause
    /// when the policy list is non-empty.
    Upsert {
        /// Error policies for value decoding (may be empty).
        value_decode_err_policy: Vec<SourceErrorPolicy>,
    },
    /// Rendered as `MATERIALIZE`.
    CdcV2,
}
615
616impl SourceEnvelope {
617 pub fn requires_all_input(&self) -> bool {
620 match self {
621 SourceEnvelope::None => false,
622 SourceEnvelope::Debezium => false,
623 SourceEnvelope::Upsert { .. } => false,
624 SourceEnvelope::CdcV2 => true,
625 }
626 }
627}
628
629impl AstDisplay for SourceEnvelope {
630 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
631 match self {
632 Self::None => {
633 f.write_str("NONE");
635 }
636 Self::Debezium => {
637 f.write_str("DEBEZIUM");
638 }
639 Self::Upsert {
640 value_decode_err_policy,
641 } => {
642 if value_decode_err_policy.is_empty() {
643 f.write_str("UPSERT");
644 } else {
645 f.write_str("UPSERT (VALUE DECODING ERRORS = (");
646 f.write_node(&display::comma_separated(value_decode_err_policy));
647 f.write_str("))")
648 }
649 }
650 Self::CdcV2 => {
651 f.write_str("MATERIALIZE");
652 }
653 }
654 }
655}
656impl_display!(SourceEnvelope);
657
/// The envelope of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `UPSERT`.
    Upsert,
}
663
664impl AstDisplay for SinkEnvelope {
665 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
666 match self {
667 Self::Upsert => {
668 f.write_str("UPSERT");
669 }
670 Self::Debezium => {
671 f.write_str("DEBEZIUM");
672 }
673 }
674 }
675}
676impl_display!(SinkEnvelope);
677
/// The mode of an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
    /// Rendered as `UPSERT`.
    Upsert,
}
682
683impl AstDisplay for IcebergSinkMode {
684 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
685 match self {
686 Self::Upsert => {
687 f.write_str("UPSERT");
688 }
689 }
690 }
691}
692impl_display!(IcebergSinkMode);
693
/// The output shape of a subscribe, rendered as a suffix clause with a
/// leading space (or nothing at all for `Diffs`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
    /// Renders as nothing.
    Diffs,
    /// Rendered as ` WITHIN TIMESTAMP ORDER BY <exprs>`.
    WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
    /// Rendered as ` ENVELOPE UPSERT (KEY (<columns>))`.
    EnvelopeUpsert { key_columns: Vec<Ident> },
    /// Rendered as ` ENVELOPE DEBEZIUM (KEY (<columns>))`.
    EnvelopeDebezium { key_columns: Vec<Ident> },
}
701
702impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
703 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
704 match self {
705 Self::Diffs => {}
706 Self::WithinTimestampOrderBy { order_by } => {
707 f.write_str(" WITHIN TIMESTAMP ORDER BY ");
708 f.write_node(&display::comma_separated(order_by));
709 }
710 Self::EnvelopeUpsert { key_columns } => {
711 f.write_str(" ENVELOPE UPSERT (KEY (");
712 f.write_node(&display::comma_separated(key_columns));
713 f.write_str("))");
714 }
715 Self::EnvelopeDebezium { key_columns } => {
716 f.write_str(" ENVELOPE DEBEZIUM (KEY (");
717 f.write_node(&display::comma_separated(key_columns));
718 f.write_str("))");
719 }
720 }
721 }
722}
723impl_display_t!(SubscribeOutput);
724
725impl<T: AstInfo> AstDisplay for Format<T> {
726 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
727 match self {
728 Self::Bytes => f.write_str("BYTES"),
729 Self::Avro(inner) => {
730 f.write_str("AVRO ");
731 f.write_node(inner);
732 }
733 Self::Protobuf(inner) => {
734 f.write_str("PROTOBUF ");
735 f.write_node(inner);
736 }
737 Self::Regex(regex) => {
738 f.write_str("REGEX '");
739 f.write_node(&display::escape_single_quote_string(regex));
740 f.write_str("'");
741 }
742 Self::Csv { columns, delimiter } => {
743 f.write_str("CSV WITH ");
744 f.write_node(columns);
745
746 if *delimiter != ',' {
747 f.write_str(" DELIMITED BY '");
748 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
749 f.write_str("'");
750 }
751 }
752 Self::Json { array } => {
753 f.write_str("JSON");
754 if *array {
755 f.write_str(" ARRAY");
756 }
757 }
758 Self::Text => f.write_str("TEXT"),
759 }
760 }
761}
762impl_display_t!(Format);
763
/// Names of the options carried by a [`ConnectionOption`].
///
/// Each variant is rendered as the upper-case, space-separated keyword
/// sequence listed in this type's `AstDisplay` implementation (for example
/// `AccessKeyId` is rendered as `ACCESS KEY ID`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
    AccessKeyId,
    AssumeRoleArn,
    AssumeRoleSessionName,
    AvailabilityZones,
    AwsConnection,
    AwsPrivatelink,
    Broker,
    Brokers,
    Credential,
    Database,
    Endpoint,
    Host,
    Password,
    Port,
    ProgressTopic,
    ProgressTopicReplicationFactor,
    PublicKey1,
    PublicKey2,
    Region,
    SaslMechanisms,
    SaslPassword,
    SaslUsername,
    Scope,
    SecretAccessKey,
    SecurityProtocol,
    ServiceName,
    SshTunnel,
    SslCertificate,
    SslCertificateAuthority,
    SslKey,
    SslMode,
    SessionToken,
    CatalogType,
    Url,
    User,
    Warehouse,
}
806
807impl AstDisplay for ConnectionOptionName {
808 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
809 f.write_str(match self {
810 ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
811 ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
812 ConnectionOptionName::AwsConnection => "AWS CONNECTION",
813 ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
814 ConnectionOptionName::Broker => "BROKER",
815 ConnectionOptionName::Brokers => "BROKERS",
816 ConnectionOptionName::Credential => "CREDENTIAL",
817 ConnectionOptionName::Database => "DATABASE",
818 ConnectionOptionName::Endpoint => "ENDPOINT",
819 ConnectionOptionName::Host => "HOST",
820 ConnectionOptionName::Password => "PASSWORD",
821 ConnectionOptionName::Port => "PORT",
822 ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
823 ConnectionOptionName::ProgressTopicReplicationFactor => {
824 "PROGRESS TOPIC REPLICATION FACTOR"
825 }
826 ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
827 ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
828 ConnectionOptionName::Region => "REGION",
829 ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
830 ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
831 ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
832 ConnectionOptionName::SaslPassword => "SASL PASSWORD",
833 ConnectionOptionName::SaslUsername => "SASL USERNAME",
834 ConnectionOptionName::Scope => "SCOPE",
835 ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
836 ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
837 ConnectionOptionName::ServiceName => "SERVICE NAME",
838 ConnectionOptionName::SshTunnel => "SSH TUNNEL",
839 ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
840 ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
841 ConnectionOptionName::SslKey => "SSL KEY",
842 ConnectionOptionName::SslMode => "SSL MODE",
843 ConnectionOptionName::SessionToken => "SESSION TOKEN",
844 ConnectionOptionName::CatalogType => "CATALOG TYPE",
845 ConnectionOptionName::Url => "URL",
846 ConnectionOptionName::User => "USER",
847 ConnectionOptionName::Warehouse => "WAREHOUSE",
848 })
849 }
850}
851impl_display!(ConnectionOptionName);
852
853impl WithOptionName for ConnectionOptionName {
854 fn redact_value(&self) -> bool {
860 match self {
861 ConnectionOptionName::AccessKeyId
862 | ConnectionOptionName::AvailabilityZones
863 | ConnectionOptionName::AwsConnection
864 | ConnectionOptionName::AwsPrivatelink
865 | ConnectionOptionName::Broker
866 | ConnectionOptionName::Brokers
867 | ConnectionOptionName::Credential
868 | ConnectionOptionName::Database
869 | ConnectionOptionName::Endpoint
870 | ConnectionOptionName::Host
871 | ConnectionOptionName::Password
872 | ConnectionOptionName::Port
873 | ConnectionOptionName::ProgressTopic
874 | ConnectionOptionName::ProgressTopicReplicationFactor
875 | ConnectionOptionName::PublicKey1
876 | ConnectionOptionName::PublicKey2
877 | ConnectionOptionName::Region
878 | ConnectionOptionName::AssumeRoleArn
879 | ConnectionOptionName::AssumeRoleSessionName
880 | ConnectionOptionName::SaslMechanisms
881 | ConnectionOptionName::SaslPassword
882 | ConnectionOptionName::SaslUsername
883 | ConnectionOptionName::Scope
884 | ConnectionOptionName::SecurityProtocol
885 | ConnectionOptionName::SecretAccessKey
886 | ConnectionOptionName::ServiceName
887 | ConnectionOptionName::SshTunnel
888 | ConnectionOptionName::SslCertificate
889 | ConnectionOptionName::SslCertificateAuthority
890 | ConnectionOptionName::SslKey
891 | ConnectionOptionName::SslMode
892 | ConnectionOptionName::SessionToken
893 | ConnectionOptionName::CatalogType
894 | ConnectionOptionName::Url
895 | ConnectionOptionName::User
896 | ConnectionOptionName::Warehouse => false,
897 }
898 }
899}
900
/// One connection option: a [`ConnectionOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: ConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
909
/// The kind of connection being created.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
    /// Rendered as `AWS`.
    Aws,
    /// Rendered as `AWS PRIVATELINK`.
    AwsPrivatelink,
    /// Rendered as `KAFKA`.
    Kafka,
    /// Rendered as `CONFLUENT SCHEMA REGISTRY`.
    Csr,
    /// Rendered as `POSTGRES`.
    Postgres,
    /// Rendered as `SSH TUNNEL`.
    Ssh,
    /// Rendered as `SQL SERVER`.
    SqlServer,
    /// Rendered as `MYSQL`.
    MySql,
    /// Rendered as `ICEBERG CATALOG`.
    IcebergCatalog,
}
922
923impl AstDisplay for CreateConnectionType {
924 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
925 match self {
926 Self::Kafka => {
927 f.write_str("KAFKA");
928 }
929 Self::Csr => {
930 f.write_str("CONFLUENT SCHEMA REGISTRY");
931 }
932 Self::Postgres => {
933 f.write_str("POSTGRES");
934 }
935 Self::Aws => {
936 f.write_str("AWS");
937 }
938 Self::AwsPrivatelink => {
939 f.write_str("AWS PRIVATELINK");
940 }
941 Self::Ssh => {
942 f.write_str("SSH TUNNEL");
943 }
944 Self::SqlServer => {
945 f.write_str("SQL SERVER");
946 }
947 Self::MySql => {
948 f.write_str("MYSQL");
949 }
950 Self::IcebergCatalog => {
951 f.write_str("ICEBERG CATALOG");
952 }
953 }
954 }
955}
956impl_display!(CreateConnectionType);
957
/// Names of the options carried by a [`CreateConnectionOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
    /// Rendered as `VALIDATE`.
    Validate,
}
962
963impl AstDisplay for CreateConnectionOptionName {
964 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
965 f.write_str(match self {
966 CreateConnectionOptionName::Validate => "VALIDATE",
967 })
968 }
969}
970impl_display!(CreateConnectionOptionName);
971
972impl WithOptionName for CreateConnectionOptionName {
973 fn redact_value(&self) -> bool {
979 match self {
980 CreateConnectionOptionName::Validate => false,
981 }
982 }
983}
984
/// One create-connection option: a [`CreateConnectionOptionName`] together
/// with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
    /// Which option this is.
    pub name: CreateConnectionOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
993
/// Names of the options carried by a [`KafkaSourceConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
    /// Rendered as `GROUP ID PREFIX`.
    GroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `START TIMESTAMP`.
    StartTimestamp,
    /// Rendered as `START OFFSET`.
    StartOffset,
}
1002
1003impl AstDisplay for KafkaSourceConfigOptionName {
1004 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1005 f.write_str(match self {
1006 KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
1007 KafkaSourceConfigOptionName::Topic => "TOPIC",
1008 KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
1009 "TOPIC METADATA REFRESH INTERVAL"
1010 }
1011 KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
1012 KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
1013 })
1014 }
1015}
1016impl_display!(KafkaSourceConfigOptionName);
1017
1018impl WithOptionName for KafkaSourceConfigOptionName {
1019 fn redact_value(&self) -> bool {
1025 match self {
1026 KafkaSourceConfigOptionName::GroupIdPrefix
1027 | KafkaSourceConfigOptionName::Topic
1028 | KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
1029 | KafkaSourceConfigOptionName::StartOffset
1030 | KafkaSourceConfigOptionName::StartTimestamp => false,
1031 }
1032 }
1033}
1034
/// One Kafka source option: a [`KafkaSourceConfigOptionName`] together with
/// an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSourceConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
1042
/// Names of the options carried by a [`KafkaSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
    /// Rendered as `COMPRESSION TYPE`.
    CompressionType,
    /// Rendered as `PARTITION BY`; the only option whose value is flagged
    /// for redaction.
    PartitionBy,
    /// Rendered as `PROGRESS GROUP ID PREFIX`.
    ProgressGroupIdPrefix,
    /// Rendered as `TOPIC`.
    Topic,
    /// Rendered as `TRANSACTIONAL ID PREFIX`.
    TransactionalIdPrefix,
    /// Rendered as `LEGACY IDS`.
    LegacyIds,
    /// Rendered as `TOPIC CONFIG`.
    TopicConfig,
    /// Rendered as `TOPIC METADATA REFRESH INTERVAL`.
    TopicMetadataRefreshInterval,
    /// Rendered as `TOPIC PARTITION COUNT`.
    TopicPartitionCount,
    /// Rendered as `TOPIC REPLICATION FACTOR`.
    TopicReplicationFactor,
}
1056
1057impl AstDisplay for KafkaSinkConfigOptionName {
1058 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1059 f.write_str(match self {
1060 KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
1061 KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
1062 KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
1063 KafkaSinkConfigOptionName::Topic => "TOPIC",
1064 KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
1065 KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
1066 KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
1067 KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
1068 "TOPIC METADATA REFRESH INTERVAL"
1069 }
1070 KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
1071 KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
1072 })
1073 }
1074}
1075impl_display!(KafkaSinkConfigOptionName);
1076
1077impl WithOptionName for KafkaSinkConfigOptionName {
1078 fn redact_value(&self) -> bool {
1084 match self {
1085 KafkaSinkConfigOptionName::CompressionType
1086 | KafkaSinkConfigOptionName::ProgressGroupIdPrefix
1087 | KafkaSinkConfigOptionName::Topic
1088 | KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
1089 | KafkaSinkConfigOptionName::TransactionalIdPrefix
1090 | KafkaSinkConfigOptionName::LegacyIds
1091 | KafkaSinkConfigOptionName::TopicConfig
1092 | KafkaSinkConfigOptionName::TopicPartitionCount
1093 | KafkaSinkConfigOptionName::TopicReplicationFactor => false,
1094 KafkaSinkConfigOptionName::PartitionBy => true,
1095 }
1096 }
1097}
1098
/// One Kafka sink option: a [`KafkaSinkConfigOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: KafkaSinkConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
1106
/// Names of the options carried by an [`IcebergSinkConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IcebergSinkConfigOptionName {
    /// Rendered as `NAMESPACE`.
    Namespace,
    /// Rendered as `TABLE`.
    Table,
}
1112
1113impl AstDisplay for IcebergSinkConfigOptionName {
1114 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1115 f.write_str(match self {
1116 IcebergSinkConfigOptionName::Namespace => "NAMESPACE",
1117 IcebergSinkConfigOptionName::Table => "TABLE",
1118 })
1119 }
1120}
1121impl_display!(IcebergSinkConfigOptionName);
1122
1123impl WithOptionName for IcebergSinkConfigOptionName {
1124 fn redact_value(&self) -> bool {
1130 match self {
1131 IcebergSinkConfigOptionName::Namespace | IcebergSinkConfigOptionName::Table => false,
1132 }
1133 }
1134}
1135
/// One Iceberg sink option: an [`IcebergSinkConfigOptionName`] together with
/// an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IcebergSinkConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: IcebergSinkConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(IcebergSinkConfigOption);
impl_display_t!(IcebergSinkConfigOption);
1143
/// Names of the options carried by a [`PgConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `PUBLICATION`.
    Publication,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1166
1167impl AstDisplay for PgConfigOptionName {
1168 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1169 f.write_str(match self {
1170 PgConfigOptionName::Details => "DETAILS",
1171 PgConfigOptionName::Publication => "PUBLICATION",
1172 PgConfigOptionName::TextColumns => "TEXT COLUMNS",
1173 PgConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1174 })
1175 }
1176}
1177impl_display!(PgConfigOptionName);
1178
1179impl WithOptionName for PgConfigOptionName {
1180 fn redact_value(&self) -> bool {
1186 match self {
1187 PgConfigOptionName::Details
1188 | PgConfigOptionName::Publication
1189 | PgConfigOptionName::TextColumns
1190 | PgConfigOptionName::ExcludeColumns => false,
1191 }
1192 }
1193}
1194
/// One Postgres source option: a [`PgConfigOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: PgConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
1203
/// Names of the options carried by a [`MySqlConfigOption`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
    /// Rendered as `DETAILS`.
    Details,
    /// Rendered as `TEXT COLUMNS`.
    TextColumns,
    /// Rendered as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1224
1225impl AstDisplay for MySqlConfigOptionName {
1226 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1227 f.write_str(match self {
1228 MySqlConfigOptionName::Details => "DETAILS",
1229 MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
1230 MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1231 })
1232 }
1233}
1234impl_display!(MySqlConfigOptionName);
1235
1236impl WithOptionName for MySqlConfigOptionName {
1237 fn redact_value(&self) -> bool {
1243 match self {
1244 MySqlConfigOptionName::Details
1245 | MySqlConfigOptionName::TextColumns
1246 | MySqlConfigOptionName::ExcludeColumns => false,
1247 }
1248 }
1249}
1250
/// One MySQL source option: a [`MySqlConfigOptionName`] together with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
    /// Which option this is.
    pub name: MySqlConfigOptionName,
    /// The value supplied for the option, if any.
    pub value: Option<WithOptionValue<T>>,
}
// Display support is generated by macros defined outside this part of the file.
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
1259
/// Names of the options that can appear in the option list of
/// `CreateSourceConnection::SqlServer`.
///
/// Variant order determines the derived `PartialOrd`/`Ord` ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SqlServerConfigOptionName {
    /// Displayed as `DETAILS`.
    Details,
    /// Displayed as `TEXT COLUMNS`.
    TextColumns,
    /// Displayed as `EXCLUDE COLUMNS`.
    ExcludeColumns,
}
1282
1283impl AstDisplay for SqlServerConfigOptionName {
1284 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1285 f.write_str(match self {
1286 SqlServerConfigOptionName::Details => "DETAILS",
1287 SqlServerConfigOptionName::TextColumns => "TEXT COLUMNS",
1288 SqlServerConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
1289 })
1290 }
1291}
1292impl_display!(SqlServerConfigOptionName);
1293
1294impl WithOptionName for SqlServerConfigOptionName {
1295 fn redact_value(&self) -> bool {
1301 match self {
1302 SqlServerConfigOptionName::Details
1303 | SqlServerConfigOptionName::TextColumns
1304 | SqlServerConfigOptionName::ExcludeColumns => false,
1305 }
1306 }
1307}
1308
/// One entry of the option list carried by `CreateSourceConnection::SqlServer`:
/// an option name paired with an optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SqlServerConfigOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: SqlServerConfigOptionName,
    /// The value attached to the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(SqlServerConfigOption);
impl_display_t!(SqlServerConfigOption);
1317
/// The connection portion of a source definition, with one variant per
/// supported kind of upstream.
///
/// Every variant carries an option list, which is displayed in parentheses
/// only when it is non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
    /// Displayed as `KAFKA CONNECTION <connection> [(<options>)]`.
    Kafka {
        /// The connection item being referenced.
        connection: T::ItemName,
        /// Kafka source options; may be empty.
        options: Vec<KafkaSourceConfigOption<T>>,
    },
    /// Displayed as `POSTGRES CONNECTION <connection> [(<options>)]`.
    Postgres {
        /// The connection item being referenced.
        connection: T::ItemName,
        /// Postgres source options; may be empty.
        options: Vec<PgConfigOption<T>>,
    },
    /// Displayed as `SQL SERVER CONNECTION <connection> [(<options>)]`.
    SqlServer {
        /// The connection item being referenced.
        connection: T::ItemName,
        /// SQL Server source options; may be empty.
        options: Vec<SqlServerConfigOption<T>>,
    },
    /// Displayed as `MYSQL CONNECTION <connection> [(<options>)]`.
    MySql {
        /// The connection item being referenced.
        connection: T::ItemName,
        /// MySQL source options; may be empty.
        options: Vec<MySqlConfigOption<T>>,
    },
    /// Displayed as `LOAD GENERATOR <generator> [(<options>)]`.
    LoadGenerator {
        /// Which load generator is used.
        generator: LoadGenerator,
        /// Load generator options; may be empty.
        options: Vec<LoadGeneratorOption<T>>,
    },
}
1341
1342impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
1343 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1344 match self {
1345 CreateSourceConnection::Kafka {
1346 connection,
1347 options,
1348 } => {
1349 f.write_str("KAFKA CONNECTION ");
1350 f.write_node(connection);
1351 if !options.is_empty() {
1352 f.write_str(" (");
1353 f.write_node(&display::comma_separated(options));
1354 f.write_str(")");
1355 }
1356 }
1357 CreateSourceConnection::Postgres {
1358 connection,
1359 options,
1360 } => {
1361 f.write_str("POSTGRES CONNECTION ");
1362 f.write_node(connection);
1363 if !options.is_empty() {
1364 f.write_str(" (");
1365 f.write_node(&display::comma_separated(options));
1366 f.write_str(")");
1367 }
1368 }
1369 CreateSourceConnection::SqlServer {
1370 connection,
1371 options,
1372 } => {
1373 f.write_str("SQL SERVER CONNECTION ");
1374 f.write_node(connection);
1375 if !options.is_empty() {
1376 f.write_str(" (");
1377 f.write_node(&display::comma_separated(options));
1378 f.write_str(")");
1379 }
1380 }
1381 CreateSourceConnection::MySql {
1382 connection,
1383 options,
1384 } => {
1385 f.write_str("MYSQL CONNECTION ");
1386 f.write_node(connection);
1387 if !options.is_empty() {
1388 f.write_str(" (");
1389 f.write_node(&display::comma_separated(options));
1390 f.write_str(")");
1391 }
1392 }
1393 CreateSourceConnection::LoadGenerator { generator, options } => {
1394 f.write_str("LOAD GENERATOR ");
1395 f.write_node(generator);
1396 if !options.is_empty() {
1397 f.write_str(" (");
1398 f.write_node(&display::comma_separated(options));
1399 f.write_str(")");
1400 }
1401 }
1402 }
1403 }
1404}
1405impl_display_t!(CreateSourceConnection);
1406
/// The load generators that can be named in
/// `CreateSourceConnection::LoadGenerator`.
///
/// Each variant has a SQL spelling (its `AstDisplay` output) and a schema
/// name (see `LoadGenerator::schema_name`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
    /// Displayed as `CLOCK`; schema name `clock`.
    Clock,
    /// Displayed as `COUNTER`; schema name `counter`.
    Counter,
    /// Displayed as `MARKETING`; schema name `marketing`.
    Marketing,
    /// Displayed as `AUCTION`; schema name `auction`.
    Auction,
    /// Displayed as `DATUMS`; schema name `datums`.
    Datums,
    /// Displayed as `TPCH`; schema name `tpch`.
    Tpch,
    /// Displayed as `KEY VALUE`; schema name `key_value`.
    KeyValue,
}
1417
1418impl AstDisplay for LoadGenerator {
1419 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1420 match self {
1421 Self::Counter => f.write_str("COUNTER"),
1422 Self::Clock => f.write_str("CLOCK"),
1423 Self::Marketing => f.write_str("MARKETING"),
1424 Self::Auction => f.write_str("AUCTION"),
1425 Self::Datums => f.write_str("DATUMS"),
1426 Self::Tpch => f.write_str("TPCH"),
1427 Self::KeyValue => f.write_str("KEY VALUE"),
1428 }
1429 }
1430}
1431impl_display!(LoadGenerator);
1432
1433impl LoadGenerator {
1434 pub fn schema_name(&self) -> &'static str {
1439 match self {
1440 LoadGenerator::Counter => "counter",
1441 LoadGenerator::Clock => "clock",
1442 LoadGenerator::Marketing => "marketing",
1443 LoadGenerator::Auction => "auction",
1444 LoadGenerator::Datums => "datums",
1445 LoadGenerator::Tpch => "tpch",
1446 LoadGenerator::KeyValue => "key_value",
1447 }
1448 }
1449}
1450
/// Names of the options that can appear in the option list of
/// `CreateSourceConnection::LoadGenerator`.
///
/// Variant order determines the derived `PartialOrd`/`Ord` ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
    /// Displayed as `SCALE FACTOR`.
    ScaleFactor,
    /// Displayed as `TICK INTERVAL`.
    TickInterval,
    /// Displayed as `AS OF`.
    AsOf,
    /// Displayed as `UP TO`.
    UpTo,
    /// Displayed as `MAX CARDINALITY`.
    MaxCardinality,
    /// Displayed as `KEYS`.
    Keys,
    /// Displayed as `SNAPSHOT ROUNDS`.
    SnapshotRounds,
    /// Displayed as `TRANSACTIONAL SNAPSHOT`.
    TransactionalSnapshot,
    /// Displayed as `VALUE SIZE`.
    ValueSize,
    /// Displayed as `SEED`.
    Seed,
    /// Displayed as `PARTITIONS`.
    Partitions,
    /// Displayed as `BATCH SIZE`.
    BatchSize,
}
1466
1467impl AstDisplay for LoadGeneratorOptionName {
1468 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1469 f.write_str(match self {
1470 LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
1471 LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
1472 LoadGeneratorOptionName::AsOf => "AS OF",
1473 LoadGeneratorOptionName::UpTo => "UP TO",
1474 LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
1475 LoadGeneratorOptionName::Keys => "KEYS",
1476 LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
1477 LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
1478 LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
1479 LoadGeneratorOptionName::Seed => "SEED",
1480 LoadGeneratorOptionName::Partitions => "PARTITIONS",
1481 LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
1482 })
1483 }
1484}
1485impl_display!(LoadGeneratorOptionName);
1486
1487impl WithOptionName for LoadGeneratorOptionName {
1488 fn redact_value(&self) -> bool {
1494 match self {
1495 LoadGeneratorOptionName::ScaleFactor
1496 | LoadGeneratorOptionName::TickInterval
1497 | LoadGeneratorOptionName::AsOf
1498 | LoadGeneratorOptionName::UpTo
1499 | LoadGeneratorOptionName::MaxCardinality
1500 | LoadGeneratorOptionName::Keys
1501 | LoadGeneratorOptionName::SnapshotRounds
1502 | LoadGeneratorOptionName::TransactionalSnapshot
1503 | LoadGeneratorOptionName::ValueSize
1504 | LoadGeneratorOptionName::Partitions
1505 | LoadGeneratorOptionName::BatchSize
1506 | LoadGeneratorOptionName::Seed => false,
1507 }
1508 }
1509}
1510
/// One entry of the option list carried by
/// `CreateSourceConnection::LoadGenerator`: an option name paired with an
/// optional value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: LoadGeneratorOptionName,
    /// The value attached to the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
1519
/// The connection portion of a sink definition, with one variant per
/// supported kind of sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
    /// Displayed as
    /// `KAFKA CONNECTION <connection> [(<options>)] [<key>] [HEADERS <headers>]`.
    Kafka {
        /// The connection item being referenced.
        connection: T::ItemName,
        /// Kafka sink options; displayed in parentheses only when non-empty.
        options: Vec<KafkaSinkConfigOption<T>>,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Optional identifier displayed after `HEADERS`.
        headers: Option<Ident>,
    },
    /// Displayed as
    /// `ICEBERG CATALOG CONNECTION <connection> [(<options>)] USING AWS CONNECTION <aws_connection> [<key>]`.
    Iceberg {
        /// The catalog connection item being referenced.
        connection: T::ItemName,
        /// The connection item displayed after `USING AWS CONNECTION`.
        aws_connection: T::ItemName,
        /// Optional `KEY (...)` clause.
        key: Option<SinkKey>,
        /// Iceberg sink options; displayed in parentheses only when non-empty.
        options: Vec<IcebergSinkConfigOption<T>>,
    },
}
1535
1536impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
1537 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1538 match self {
1539 CreateSinkConnection::Kafka {
1540 connection,
1541 options,
1542 key,
1543 headers,
1544 } => {
1545 f.write_str("KAFKA CONNECTION ");
1546 f.write_node(connection);
1547 if !options.is_empty() {
1548 f.write_str(" (");
1549 f.write_node(&display::comma_separated(options));
1550 f.write_str(")");
1551 }
1552 if let Some(key) = key.as_ref() {
1553 f.write_str(" ");
1554 f.write_node(key);
1555 }
1556 if let Some(headers) = headers {
1557 f.write_str(" HEADERS ");
1558 f.write_node(headers);
1559 }
1560 }
1561 CreateSinkConnection::Iceberg {
1562 connection,
1563 aws_connection,
1564 key,
1565 options,
1566 } => {
1567 f.write_str("ICEBERG CATALOG CONNECTION ");
1568 f.write_node(connection);
1569 if !options.is_empty() {
1570 f.write_str(" (");
1571 f.write_node(&display::comma_separated(options));
1572 f.write_str(")");
1573 }
1574 f.write_str(" USING AWS CONNECTION ");
1575 f.write_node(aws_connection);
1576 if let Some(key) = key.as_ref() {
1577 f.write_str(" ");
1578 f.write_node(key);
1579 }
1580 }
1581 }
1582 }
1583}
1584impl_display_t!(CreateSinkConnection);
1585
/// The `KEY (...)` clause of a sink connection.
///
/// Displayed as `KEY (<key_columns>)`, followed by ` NOT ENFORCED` when
/// `not_enforced` is set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkKey {
    /// The columns listed inside the parentheses, in order.
    pub key_columns: Vec<Ident>,
    /// Whether ` NOT ENFORCED` is appended after the column list.
    pub not_enforced: bool,
}
1591
1592impl AstDisplay for SinkKey {
1593 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1594 f.write_str("KEY (");
1595 f.write_node(&display::comma_separated(&self.key_columns));
1596 f.write_str(")");
1597 if self.not_enforced {
1598 f.write_str(" NOT ENFORCED");
1599 }
1600 }
1601}
1602
/// A table-level constraint.
///
/// Every variant has an optional constraint name, which is displayed as a
/// leading `CONSTRAINT <name> ` when present.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
    /// Displayed as `PRIMARY KEY (<columns>)` or
    /// `UNIQUE [NULLS NOT DISTINCT] (<columns>)`.
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The constrained columns.
        columns: Vec<Ident>,
        /// `true` selects `PRIMARY KEY`; `false` selects `UNIQUE`.
        is_primary: bool,
        /// Whether `NULLS NOT DISTINCT` follows `UNIQUE`. The display
        /// ignores this flag when `is_primary` is `true`.
        nulls_not_distinct: bool,
    },
    /// Displayed as
    /// `FOREIGN KEY (<columns>) REFERENCES <foreign_table>(<referred_columns>)`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The referencing columns of this table.
        columns: Vec<Ident>,
        /// The item displayed after `REFERENCES`.
        foreign_table: T::ItemName,
        /// The columns listed in parentheses after `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The expression displayed inside the parentheses.
        expr: Box<Expr<T>>,
    },
}
1631
1632impl<T: AstInfo> AstDisplay for TableConstraint<T> {
1633 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1634 match self {
1635 TableConstraint::Unique {
1636 name,
1637 columns,
1638 is_primary,
1639 nulls_not_distinct,
1640 } => {
1641 f.write_node(&display_constraint_name(name));
1642 if *is_primary {
1643 f.write_str("PRIMARY KEY ");
1644 } else {
1645 f.write_str("UNIQUE ");
1646 if *nulls_not_distinct {
1647 f.write_str("NULLS NOT DISTINCT ");
1648 }
1649 }
1650 f.write_str("(");
1651 f.write_node(&display::comma_separated(columns));
1652 f.write_str(")");
1653 }
1654 TableConstraint::ForeignKey {
1655 name,
1656 columns,
1657 foreign_table,
1658 referred_columns,
1659 } => {
1660 f.write_node(&display_constraint_name(name));
1661 f.write_str("FOREIGN KEY (");
1662 f.write_node(&display::comma_separated(columns));
1663 f.write_str(") REFERENCES ");
1664 f.write_node(foreign_table);
1665 f.write_str("(");
1666 f.write_node(&display::comma_separated(referred_columns));
1667 f.write_str(")");
1668 }
1669 TableConstraint::Check { name, expr } => {
1670 f.write_node(&display_constraint_name(name));
1671 f.write_str("CHECK (");
1672 f.write_node(&expr);
1673 f.write_str(")");
1674 }
1675 }
1676 }
1677}
1678impl_display_t!(TableConstraint);
1679
/// A key constraint; currently only a primary key marked `NOT ENFORCED`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
    /// Displayed as `PRIMARY KEY (<columns>) NOT ENFORCED`.
    PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
1686
1687impl AstDisplay for KeyConstraint {
1688 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1689 match self {
1690 KeyConstraint::PrimaryKeyNotEnforced { columns } => {
1691 f.write_str("PRIMARY KEY ");
1692 f.write_str("(");
1693 f.write_node(&display::comma_separated(columns));
1694 f.write_str(") ");
1695 f.write_str("NOT ENFORCED");
1696 }
1697 }
1698 }
1699}
1700impl_display!(KeyConstraint);
1701
/// Names of the options carried by `CreateSourceOption`.
///
/// Variant order determines the derived `PartialOrd`/`Ord` ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
    /// Displayed as `TIMESTAMP INTERVAL`.
    TimestampInterval,
    /// Displayed as `RETAIN HISTORY`.
    RetainHistory,
}
1707
1708impl AstDisplay for CreateSourceOptionName {
1709 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1710 f.write_str(match self {
1711 CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
1712 CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
1713 })
1714 }
1715}
1716impl_display!(CreateSourceOptionName);
1717
1718impl WithOptionName for CreateSourceOptionName {
1719 fn redact_value(&self) -> bool {
1725 match self {
1726 CreateSourceOptionName::TimestampInterval | CreateSourceOptionName::RetainHistory => {
1727 false
1728 }
1729 }
1730 }
1731}
1732
/// A source-level option: a `CreateSourceOptionName` paired with an optional
/// value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
    /// Which option this entry sets.
    pub name: CreateSourceOptionName,
    /// The value attached to the option, if one was given.
    pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
1741
/// A column definition.
///
/// Displayed as `<name> <data_type> [COLLATE <collation>] [<option> ...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
    /// The column name.
    pub name: Ident,
    /// The column's data type.
    pub data_type: T::DataType,
    /// Optional collation, displayed after `COLLATE`.
    pub collation: Option<UnresolvedItemName>,
    /// Column options, displayed in order and separated by single spaces.
    pub options: Vec<ColumnOptionDef<T>>,
}
1750
1751impl<T: AstInfo> AstDisplay for ColumnDef<T> {
1752 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1753 f.write_node(&self.name);
1754 f.write_str(" ");
1755 f.write_node(&self.data_type);
1756 if let Some(collation) = &self.collation {
1757 f.write_str(" COLLATE ");
1758 f.write_node(collation);
1759 }
1760 for option in &self.options {
1761 f.write_str(" ");
1762 f.write_node(option);
1763 }
1764 }
1765}
1766impl_display_t!(ColumnDef);
1767
/// A column option together with an optional constraint name.
///
/// Displayed as `[CONSTRAINT <name> ]<option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
    /// Optional constraint name, displayed as a `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption<T>,
}
1789
1790impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
1791 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1792 f.write_node(&display_constraint_name(&self.name));
1793 f.write_node(&self.option);
1794 }
1795}
1796impl_display_t!(ColumnOptionDef);
1797
/// The options that can be attached to a column definition (see
/// `ColumnOptionDef`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
    /// Displayed as `NULL`.
    Null,
    /// Displayed as `NOT NULL`.
    NotNull,
    /// Displayed as `DEFAULT <expr>`.
    Default(Expr<T>),
    /// Displayed as `PRIMARY KEY` when `is_primary` is `true`, otherwise
    /// `UNIQUE`.
    Unique { is_primary: bool },
    /// Displayed as `REFERENCES <foreign_table> (<referred_columns>)`.
    ForeignKey {
        /// The item displayed after `REFERENCES`.
        foreign_table: UnresolvedItemName,
        /// The columns listed in parentheses after `foreign_table`.
        referred_columns: Vec<Ident>,
    },
    /// Displayed as `CHECK (<expr>)`.
    Check(Expr<T>),
    /// Displayed as `VERSION <action> <version>`.
    Versioned {
        /// The versioning action (for example `ADDED`).
        action: ColumnVersioned,
        /// The version the action applies to.
        version: Version,
    },
}
1824
1825impl<T: AstInfo> AstDisplay for ColumnOption<T> {
1826 fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
1827 use ColumnOption::*;
1828 match self {
1829 Null => f.write_str("NULL"),
1830 NotNull => f.write_str("NOT NULL"),
1831 Default(expr) => {
1832 f.write_str("DEFAULT ");
1833 f.write_node(expr);
1834 }
1835 Unique { is_primary } => {
1836 if *is_primary {
1837 f.write_str("PRIMARY KEY");
1838 } else {
1839 f.write_str("UNIQUE");
1840 }
1841 }
1842 ForeignKey {
1843 foreign_table,
1844 referred_columns,
1845 } => {
1846 f.write_str("REFERENCES ");
1847 f.write_node(foreign_table);
1848 f.write_str(" (");
1849 f.write_node(&display::comma_separated(referred_columns));
1850 f.write_str(")");
1851 }
1852 Check(expr) => {
1853 f.write_str("CHECK (");
1854 f.write_node(expr);
1855 f.write_str(")");
1856 }
1857 Versioned { action, version } => {
1858 f.write_str("VERSION ");
1859 f.write_node(action);
1860 f.write_str(" ");
1861 f.write_node(version);
1862 }
1863 }
1864 }
1865}
1866impl_display_t!(ColumnOption);
1867
/// The action named in a `ColumnOption::Versioned` option.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnVersioned {
    /// Displayed as `ADDED`.
    Added,
}
1872
1873impl AstDisplay for ColumnVersioned {
1874 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1875 where
1876 W: fmt::Write,
1877 {
1878 match self {
1879 ColumnVersioned::Added => f.write_str("ADDED"),
1881 }
1882 }
1883}
1884impl_display!(ColumnVersioned);
1885
1886fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
1887 struct ConstraintName<'a>(&'a Option<Ident>);
1888 impl<'a> AstDisplay for ConstraintName<'a> {
1889 fn fmt<W>(&self, f: &mut AstFormatter<W>)
1890 where
1891 W: fmt::Write,
1892 {
1893 if let Some(name) = self.0 {
1894 f.write_str("CONSTRAINT ");
1895 f.write_node(name);
1896 f.write_str(" ");
1897 }
1898 }
1899 }
1900 ConstraintName(name)
1901}