1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
66#[derive(Debug)]
67pub struct CatalogSnapshot {
68 pub catalog: Arc<Catalog>,
69}
70
71#[derive(Debug)]
72pub enum Command {
73 CatalogSnapshot {
74 tx: oneshot::Sender<CatalogSnapshot>,
75 },
76
77 Startup {
78 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
79 user: User,
80 conn_id: ConnectionId,
81 client_ip: Option<IpAddr>,
82 secret_key: u32,
83 uuid: Uuid,
84 application_name: String,
85 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
86 },
87
88 AuthenticatePassword {
89 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
90 role_name: String,
91 password: Option<Password>,
92 },
93
94 AuthenticateGetSASLChallenge {
95 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
96 role_name: String,
97 nonce: String,
98 },
99
100 AuthenticateVerifySASLProof {
101 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
102 role_name: String,
103 proof: String,
104 auth_message: String,
105 mock_hash: String,
106 },
107
108 Execute {
109 portal_name: String,
110 session: Session,
111 tx: oneshot::Sender<Response<ExecuteResponse>>,
112 outer_ctx_extra: Option<ExecuteContextGuard>,
113 },
114
115 Commit {
120 action: EndTransactionAction,
121 session: Session,
122 tx: oneshot::Sender<Response<ExecuteResponse>>,
123 },
124
125 CancelRequest {
126 conn_id: ConnectionIdType,
127 secret_key: u32,
128 },
129
130 PrivilegedCancelRequest {
131 conn_id: ConnectionId,
132 },
133
134 GetWebhook {
135 database: String,
136 schema: String,
137 name: String,
138 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
139 },
140
141 GetSystemVars {
142 tx: oneshot::Sender<SystemVars>,
143 },
144
145 SetSystemVars {
146 vars: BTreeMap<String, String>,
147 conn_id: ConnectionId,
148 tx: oneshot::Sender<Result<(), AdapterError>>,
149 },
150
151 Terminate {
152 conn_id: ConnectionId,
153 tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
154 },
155
156 RetireExecute {
163 data: ExecuteContextExtra,
164 reason: StatementEndedExecutionReason,
165 },
166
167 CheckConsistency {
168 tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
169 },
170
171 Dump {
172 tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
173 },
174
175 GetComputeInstanceClient {
176 instance_id: ComputeInstanceId,
177 tx: oneshot::Sender<
178 Result<
179 mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
180 mz_compute_client::controller::error::InstanceMissing,
181 >,
182 >,
183 },
184
185 GetOracle {
186 timeline: Timeline,
187 tx: oneshot::Sender<
188 Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
189 >,
190 },
191
192 DetermineRealTimeRecentTimestamp {
193 source_ids: BTreeSet<GlobalId>,
194 real_time_recency_timeout: Duration,
195 tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
196 },
197
198 GetTransactionReadHoldsBundle {
199 conn_id: ConnectionId,
200 tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
201 },
202
203 StoreTransactionReadHolds {
205 conn_id: ConnectionId,
206 read_holds: ReadHolds<mz_repr::Timestamp>,
207 tx: oneshot::Sender<()>,
208 },
209
210 ExecuteSlowPathPeek {
211 dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
212 determination: TimestampDetermination<mz_repr::Timestamp>,
213 finishing: RowSetFinishing,
214 compute_instance: ComputeInstanceId,
215 target_replica: Option<ReplicaId>,
216 intermediate_result_type: SqlRelationType,
217 source_ids: BTreeSet<GlobalId>,
218 conn_id: ConnectionId,
219 max_result_size: u64,
220 max_query_result_size: Option<u64>,
221 watch_set: Option<WatchSetCreation>,
224 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
225 },
226
227 CopyToPreflight {
231 s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
233 sink_id: GlobalId,
235 tx: oneshot::Sender<Result<(), AdapterError>>,
237 },
238
239 ExecuteCopyTo {
240 df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
241 compute_instance: ComputeInstanceId,
242 target_replica: Option<ReplicaId>,
243 source_ids: BTreeSet<GlobalId>,
244 conn_id: ConnectionId,
245 watch_set: Option<WatchSetCreation>,
248 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
249 },
250
251 ExecuteSideEffectingFunc {
253 plan: SideEffectingFunc,
254 conn_id: ConnectionId,
255 current_role: RoleId,
257 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
258 },
259
260 RegisterFrontendPeek {
264 uuid: Uuid,
265 conn_id: ConnectionId,
266 cluster_id: mz_controller_types::ClusterId,
267 depends_on: BTreeSet<GlobalId>,
268 is_fast_path: bool,
269 watch_set: Option<WatchSetCreation>,
272 tx: oneshot::Sender<Result<(), AdapterError>>,
273 },
274
275 UnregisterFrontendPeek {
280 uuid: Uuid,
281 tx: oneshot::Sender<()>,
282 },
283
284 ExplainTimestamp {
287 conn_id: ConnectionId,
288 session_wall_time: DateTime<Utc>,
289 cluster_id: ClusterId,
290 id_bundle: CollectionIdBundle,
291 determination: TimestampDetermination<mz_repr::Timestamp>,
292 tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
293 },
294
295 FrontendStatementLogging(FrontendStatementLoggingEvent),
298}
299
300impl Command {
301 pub fn session(&self) -> Option<&Session> {
302 match self {
303 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
304 Command::CancelRequest { .. }
305 | Command::Startup { .. }
306 | Command::AuthenticatePassword { .. }
307 | Command::AuthenticateGetSASLChallenge { .. }
308 | Command::AuthenticateVerifySASLProof { .. }
309 | Command::CatalogSnapshot { .. }
310 | Command::PrivilegedCancelRequest { .. }
311 | Command::GetWebhook { .. }
312 | Command::Terminate { .. }
313 | Command::GetSystemVars { .. }
314 | Command::SetSystemVars { .. }
315 | Command::RetireExecute { .. }
316 | Command::CheckConsistency { .. }
317 | Command::Dump { .. }
318 | Command::GetComputeInstanceClient { .. }
319 | Command::GetOracle { .. }
320 | Command::DetermineRealTimeRecentTimestamp { .. }
321 | Command::GetTransactionReadHoldsBundle { .. }
322 | Command::StoreTransactionReadHolds { .. }
323 | Command::ExecuteSlowPathPeek { .. }
324 | Command::CopyToPreflight { .. }
325 | Command::ExecuteCopyTo { .. }
326 | Command::ExecuteSideEffectingFunc { .. }
327 | Command::RegisterFrontendPeek { .. }
328 | Command::UnregisterFrontendPeek { .. }
329 | Command::ExplainTimestamp { .. }
330 | Command::FrontendStatementLogging(..) => None,
331 }
332 }
333
334 pub fn session_mut(&mut self) -> Option<&mut Session> {
335 match self {
336 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
337 Command::CancelRequest { .. }
338 | Command::Startup { .. }
339 | Command::AuthenticatePassword { .. }
340 | Command::AuthenticateGetSASLChallenge { .. }
341 | Command::AuthenticateVerifySASLProof { .. }
342 | Command::CatalogSnapshot { .. }
343 | Command::PrivilegedCancelRequest { .. }
344 | Command::GetWebhook { .. }
345 | Command::Terminate { .. }
346 | Command::GetSystemVars { .. }
347 | Command::SetSystemVars { .. }
348 | Command::RetireExecute { .. }
349 | Command::CheckConsistency { .. }
350 | Command::Dump { .. }
351 | Command::GetComputeInstanceClient { .. }
352 | Command::GetOracle { .. }
353 | Command::DetermineRealTimeRecentTimestamp { .. }
354 | Command::GetTransactionReadHoldsBundle { .. }
355 | Command::StoreTransactionReadHolds { .. }
356 | Command::ExecuteSlowPathPeek { .. }
357 | Command::CopyToPreflight { .. }
358 | Command::ExecuteCopyTo { .. }
359 | Command::ExecuteSideEffectingFunc { .. }
360 | Command::RegisterFrontendPeek { .. }
361 | Command::UnregisterFrontendPeek { .. }
362 | Command::ExplainTimestamp { .. }
363 | Command::FrontendStatementLogging(..) => None,
364 }
365 }
366}
367
368#[derive(Debug)]
369pub struct Response<T> {
370 pub result: Result<T, AdapterError>,
371 pub session: Session,
372 pub otel_ctx: OpenTelemetryContext,
373}
374
375#[derive(Derivative)]
377#[derivative(Debug)]
378pub struct StartupResponse {
379 pub role_id: RoleId,
381 #[derivative(Debug = "ignore")]
383 pub write_notify: BuiltinTableAppendNotify,
384 pub session_defaults: BTreeMap<String, OwnedVarInput>,
386 pub catalog: Arc<Catalog>,
387 pub storage_collections: Arc<
388 dyn mz_storage_client::storage_collections::StorageCollections<
389 Timestamp = mz_repr::Timestamp,
390 > + Send
391 + Sync,
392 >,
393 pub transient_id_gen: Arc<TransientIdGen>,
394 pub optimizer_metrics: OptimizerMetrics,
395 pub persist_client: PersistClient,
396 pub statement_logging_frontend: StatementLoggingFrontend,
397}
398
399#[derive(Derivative)]
401#[derivative(Debug)]
402pub struct AuthResponse {
403 pub role_id: RoleId,
405 pub superuser: bool,
407}
408
/// Success payload of [`Command::AuthenticateGetSASLChallenge`].
///
/// No field needs custom `Debug` handling, so the standard derive is used
/// instead of `Derivative`; the formatted output is the same.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count to use for the exchange.
    pub iteration_count: usize,
    /// Salt, as a string. NOTE(review): encoding is not visible here — confirm
    /// against the producer.
    pub salt: String,
    /// Nonce for the exchange.
    pub nonce: String,
}
417
418#[derive(Derivative)]
419#[derivative(Debug)]
420pub struct SASLVerifyProofResponse {
421 pub verifier: String,
422 pub auth_resp: AuthResponse,
423}
424
425impl Transmittable for StartupResponse {
428 type Allowed = bool;
429 fn to_allowed(&self) -> Self::Allowed {
430 true
431 }
432}
433
/// Newtype around a `String` (going by the name, the textual form of a catalog
/// dump).
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and returns the string it was built from.
    pub fn into_string(self) -> String {
        let Self(inner) = self;
        inner
    }
}
447
448impl Transmittable for CatalogDump {
449 type Allowed = bool;
450 fn to_allowed(&self) -> Self::Allowed {
451 true
452 }
453}
454
455impl Transmittable for SystemVars {
456 type Allowed = bool;
457 fn to_allowed(&self) -> Self::Allowed {
458 true
459 }
460}
461
462#[derive(EnumKind, Derivative)]
464#[derivative(Debug)]
465#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
466pub enum ExecuteResponse {
467 AlteredDefaultPrivileges,
469 AlteredObject(ObjectType),
471 AlteredRole,
473 AlteredSystemConfiguration,
475 ClosedCursor,
477 Comment,
479 Copied(usize),
481 CopyTo {
483 format: mz_sql::plan::CopyFormat,
484 resp: Box<ExecuteResponse>,
485 },
486 CopyFrom {
487 target_id: CatalogItemId,
489 target_name: String,
491 columns: Vec<ColumnIndex>,
492 params: CopyFormatParams<'static>,
493 ctx_extra: ExecuteContextGuard,
494 },
495 CreatedConnection,
497 CreatedDatabase,
499 CreatedSchema,
501 CreatedRole,
503 CreatedCluster,
505 CreatedClusterReplica,
507 CreatedIndex,
509 CreatedIntrospectionSubscribe,
511 CreatedSecret,
513 CreatedSink,
515 CreatedSource,
517 CreatedTable,
519 CreatedView,
521 CreatedViews,
523 CreatedMaterializedView,
525 CreatedContinualTask,
527 CreatedType,
529 CreatedNetworkPolicy,
531 Deallocate { all: bool },
533 DeclaredCursor,
535 Deleted(usize),
537 DiscardedTemp,
539 DiscardedAll,
541 DroppedObject(ObjectType),
543 DroppedOwned,
545 EmptyQuery,
547 Fetch {
549 name: String,
551 count: Option<FetchDirection>,
553 timeout: ExecuteTimeout,
555 ctx_extra: ExecuteContextGuard,
556 },
557 GrantedPrivilege,
559 GrantedRole,
561 Inserted(usize),
563 Prepare,
565 Raised,
567 ReassignOwned,
569 RevokedPrivilege,
571 RevokedRole,
573 SendingRowsStreaming {
575 #[derivative(Debug = "ignore")]
576 rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
577 instance_id: ComputeInstanceId,
578 strategy: StatementExecutionStrategy,
579 },
580 SendingRowsImmediate {
583 #[derivative(Debug = "ignore")]
584 rows: Box<dyn RowIterator + Send + Sync>,
585 },
586 SetVariable {
588 name: String,
589 reset: bool,
591 },
592 StartedTransaction,
594 Subscribing {
597 rx: RowBatchStream,
598 ctx_extra: ExecuteContextGuard,
599 instance_id: ComputeInstanceId,
600 },
601 TransactionCommitted {
603 params: BTreeMap<&'static str, String>,
605 },
606 TransactionRolledBack {
608 params: BTreeMap<&'static str, String>,
610 },
611 Updated(usize),
613 ValidatedConnection,
615}
616
617impl TryFrom<&Statement<Raw>> for ExecuteResponse {
618 type Error = ();
619
620 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
622 let resp_kinds = Plan::generated_from(&stmt.into())
623 .iter()
624 .map(ExecuteResponse::generated_from)
625 .flatten()
626 .cloned()
627 .collect::<BTreeSet<ExecuteResponseKind>>();
628 let resps = resp_kinds
629 .iter()
630 .map(|r| (*r).try_into())
631 .collect::<Result<Vec<ExecuteResponse>, _>>();
632 if let Ok(resps) = resps {
634 if resps.len() == 1 {
635 return Ok(resps.into_element());
636 }
637 }
638 let resp = match stmt {
639 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
640 ExecuteResponse::DroppedObject((*object_type).into())
641 }
642 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
643 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
644 ExecuteResponse::AlteredObject((*object_type).into())
645 }
646 _ => return Err(()),
647 };
648 soft_assert_no_log!(
650 resp_kinds.len() == 1
651 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
652 "ExecuteResponses out of sync with planner"
653 );
654 Ok(resp)
655 }
656}
657
658impl TryInto<ExecuteResponse> for ExecuteResponseKind {
659 type Error = ();
660
661 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
664 match self {
665 ExecuteResponseKind::AlteredDefaultPrivileges => {
666 Ok(ExecuteResponse::AlteredDefaultPrivileges)
667 }
668 ExecuteResponseKind::AlteredObject => Err(()),
669 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
670 ExecuteResponseKind::AlteredSystemConfiguration => {
671 Ok(ExecuteResponse::AlteredSystemConfiguration)
672 }
673 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
674 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
675 ExecuteResponseKind::Copied => Err(()),
676 ExecuteResponseKind::CopyTo => Err(()),
677 ExecuteResponseKind::CopyFrom => Err(()),
678 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
679 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
680 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
681 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
682 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
683 ExecuteResponseKind::CreatedClusterReplica => {
684 Ok(ExecuteResponse::CreatedClusterReplica)
685 }
686 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
687 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
688 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
689 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
690 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
691 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
692 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
693 ExecuteResponseKind::CreatedMaterializedView => {
694 Ok(ExecuteResponse::CreatedMaterializedView)
695 }
696 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
697 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
698 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
699 ExecuteResponseKind::Deallocate => Err(()),
700 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
701 ExecuteResponseKind::Deleted => Err(()),
702 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
703 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
704 ExecuteResponseKind::DroppedObject => Err(()),
705 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
706 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
707 ExecuteResponseKind::Fetch => Err(()),
708 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
709 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
710 ExecuteResponseKind::Inserted => Err(()),
711 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
712 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
713 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
714 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
715 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
716 ExecuteResponseKind::SetVariable => Err(()),
717 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
718 ExecuteResponseKind::Subscribing => Err(()),
719 ExecuteResponseKind::TransactionCommitted => Err(()),
720 ExecuteResponseKind::TransactionRolledBack => Err(()),
721 ExecuteResponseKind::Updated => Err(()),
722 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
723 ExecuteResponseKind::SendingRowsStreaming => Err(()),
724 ExecuteResponseKind::SendingRowsImmediate => Err(()),
725 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
726 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
727 }
728 }
729 }
730}
731
732impl ExecuteResponse {
733 pub fn tag(&self) -> Option<String> {
734 use ExecuteResponse::*;
735 match self {
736 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
737 AlteredObject(o) => Some(format!("ALTER {}", o)),
738 AlteredRole => Some("ALTER ROLE".into()),
739 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
740 ClosedCursor => Some("CLOSE CURSOR".into()),
741 Comment => Some("COMMENT".into()),
742 Copied(n) => Some(format!("COPY {}", n)),
743 CopyTo { .. } => None,
744 CopyFrom { .. } => None,
745 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
746 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
747 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
748 CreatedRole => Some("CREATE ROLE".into()),
749 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
750 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
751 CreatedIndex { .. } => Some("CREATE INDEX".into()),
752 CreatedSecret { .. } => Some("CREATE SECRET".into()),
753 CreatedSink { .. } => Some("CREATE SINK".into()),
754 CreatedSource { .. } => Some("CREATE SOURCE".into()),
755 CreatedTable { .. } => Some("CREATE TABLE".into()),
756 CreatedView { .. } => Some("CREATE VIEW".into()),
757 CreatedViews { .. } => Some("CREATE VIEWS".into()),
758 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
759 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
760 CreatedType => Some("CREATE TYPE".into()),
761 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
762 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
763 DeclaredCursor => Some("DECLARE CURSOR".into()),
764 Deleted(n) => Some(format!("DELETE {}", n)),
765 DiscardedTemp => Some("DISCARD TEMP".into()),
766 DiscardedAll => Some("DISCARD ALL".into()),
767 DroppedObject(o) => Some(format!("DROP {o}")),
768 DroppedOwned => Some("DROP OWNED".into()),
769 EmptyQuery => None,
770 Fetch { .. } => None,
771 GrantedPrivilege => Some("GRANT".into()),
772 GrantedRole => Some("GRANT ROLE".into()),
773 Inserted(n) => {
774 Some(format!("INSERT 0 {}", n))
782 }
783 Prepare => Some("PREPARE".into()),
784 Raised => Some("RAISE".into()),
785 ReassignOwned => Some("REASSIGN OWNED".into()),
786 RevokedPrivilege => Some("REVOKE".into()),
787 RevokedRole => Some("REVOKE ROLE".into()),
788 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
789 SetVariable { reset: true, .. } => Some("RESET".into()),
790 SetVariable { reset: false, .. } => Some("SET".into()),
791 StartedTransaction { .. } => Some("BEGIN".into()),
792 Subscribing { .. } => None,
793 TransactionCommitted { .. } => Some("COMMIT".into()),
794 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
795 Updated(n) => Some(format!("UPDATE {}", n)),
796 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
797 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
798 }
799 }
800
801 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
805 use ExecuteResponseKind::*;
806 use PlanKind::*;
807
808 match plan {
809 AbortTransaction => &[TransactionRolledBack],
810 AlterClusterRename
811 | AlterClusterSwap
812 | AlterCluster
813 | AlterClusterReplicaRename
814 | AlterOwner
815 | AlterItemRename
816 | AlterRetainHistory
817 | AlterNoop
818 | AlterSchemaRename
819 | AlterSchemaSwap
820 | AlterSecret
821 | AlterConnection
822 | AlterSource
823 | AlterSink
824 | AlterTableAddColumn
825 | AlterMaterializedViewApplyReplacement
826 | AlterNetworkPolicy => &[AlteredObject],
827 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
828 AlterSetCluster => &[AlteredObject],
829 AlterRole => &[AlteredRole],
830 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
831 &[AlteredSystemConfiguration]
832 }
833 Close => &[ClosedCursor],
834 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
835 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
836 PlanKind::Comment => &[ExecuteResponseKind::Comment],
837 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
838 CreateConnection => &[CreatedConnection],
839 CreateDatabase => &[CreatedDatabase],
840 CreateSchema => &[CreatedSchema],
841 CreateRole => &[CreatedRole],
842 CreateCluster => &[CreatedCluster],
843 CreateClusterReplica => &[CreatedClusterReplica],
844 CreateSource | CreateSources => &[CreatedSource],
845 CreateSecret => &[CreatedSecret],
846 CreateSink => &[CreatedSink],
847 CreateTable => &[CreatedTable],
848 CreateView => &[CreatedView],
849 CreateMaterializedView => &[CreatedMaterializedView],
850 CreateContinualTask => &[CreatedContinualTask],
851 CreateIndex => &[CreatedIndex],
852 CreateType => &[CreatedType],
853 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
854 CreateNetworkPolicy => &[CreatedNetworkPolicy],
855 Declare => &[DeclaredCursor],
856 DiscardTemp => &[DiscardedTemp],
857 DiscardAll => &[DiscardedAll],
858 DropObjects => &[DroppedObject],
859 DropOwned => &[DroppedOwned],
860 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
861 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
862 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
863 ExecuteResponseKind::CopyTo,
864 SendingRowsStreaming,
865 SendingRowsImmediate,
866 ],
867 Execute | ReadThenWrite => &[
868 Deleted,
869 Inserted,
870 SendingRowsStreaming,
871 SendingRowsImmediate,
872 Updated,
873 ],
874 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
875 GrantPrivileges => &[GrantedPrivilege],
876 GrantRole => &[GrantedRole],
877 Insert => &[Inserted, SendingRowsImmediate],
878 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
879 PlanKind::Raise => &[ExecuteResponseKind::Raised],
880 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
881 RevokePrivileges => &[RevokedPrivilege],
882 RevokeRole => &[RevokedRole],
883 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
884 &[ExecuteResponseKind::SetVariable]
885 }
886 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
887 StartTransaction => &[StartedTransaction],
888 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
889 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
890 }
891 }
892}
893
894impl Transmittable for ExecuteResponse {
898 type Allowed = ExecuteResponseKind;
899 fn to_allowed(&self) -> Self::Allowed {
900 ExecuteResponseKind::from(self)
901 }
902}