1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
/// Handles given back to the requester of [`Command::StartCopyFromStdin`]
/// (it is the payload of that command's reply channel).
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Senders of raw byte chunks.
    ///
    /// NOTE(review): presumably one sender per parallel ingestion worker —
    /// the consumer side is not visible in this file; confirm.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// Resolves once with either a list of persist `ProtoBatch`es plus a
    /// `u64`, or an [`AdapterError`](crate::AdapterError).
    ///
    /// NOTE(review): the `u64` is presumably a row count — confirm.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
79
/// The reply payload of [`Command::CatalogSnapshot`]: a shared handle to a
/// [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Reference-counted catalog handle; cloning the `Arc` does not copy the
    /// catalog.
    pub catalog: Arc<Catalog>,
}
84
/// A request message. The consumer of these messages is not shown in this
/// file.
///
/// Most variants carry a `tx` field: the sending half of a channel on which
/// the reply to the request is delivered. The `Execute`, `Commit`, and
/// `StartCopyFromStdin` variants take ownership of a [`Session`]; their reply
/// type, [`Response`], carries a `Session` back.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`].
    CatalogSnapshot {
        /// Reply channel.
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Starts up a connection; the reply is a [`StartupResponse`] or an error.
    Startup {
        /// Reply channel.
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        /// The user the connection belongs to.
        user: User,
        /// Identifier of the connection being started.
        conn_id: ConnectionId,
        /// Client address, when known.
        client_ip: Option<IpAddr>,
        /// NOTE(review): presumably the secret later matched by
        /// `CancelRequest::secret_key` — confirm.
        secret_key: u32,
        /// A UUID associated with the connection/session.
        uuid: Uuid,
        /// Application name reported for the connection.
        application_name: String,
        /// Channel on which [`AdapterNotice`]s can be sent.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication for `role_name`; replies `Ok(())` or an error.
    AuthenticatePassword {
        /// Reply channel.
        tx: oneshot::Sender<Result<(), AdapterError>>,
        /// Name of the role being authenticated.
        role_name: String,
        /// The supplied password, if any.
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name`.
    AuthenticateGetSASLChallenge {
        /// Reply channel.
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        /// Name of the role being authenticated.
        role_name: String,
        /// Nonce supplied with the request.
        nonce: String,
    },

    /// Submits a SASL proof; replies with a [`SASLVerifyProofResponse`] or an
    /// error.
    AuthenticateVerifySASLProof {
        /// Reply channel.
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        /// Name of the role being authenticated.
        role_name: String,
        /// The proof to verify.
        proof: String,
        /// The authentication message the proof refers to.
        auth_message: String,
        /// NOTE(review): presumably a stand-in hash used when the role has no
        /// stored credentials — confirm against the handler.
        mock_hash: String,
    },

    /// Executes the portal named `portal_name` within `session`.
    Execute {
        /// Name of the portal to execute.
        portal_name: String,
        /// The session, moved into the command.
        session: Session,
        /// Reply channel.
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        /// Optional execution-context guard supplied by the caller.
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Ends the session's transaction as described by `action`.
    Commit {
        /// How the transaction should be ended.
        action: EndTransactionAction,
        /// The session, moved into the command.
        session: Session,
        /// Reply channel.
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// A cancellation request that carries a secret key. There is no reply
    /// channel.
    CancelRequest {
        /// Raw identifier of the connection to cancel.
        conn_id: ConnectionIdType,
        /// Secret accompanying the request.
        secret_key: u32,
    },

    /// A cancellation request that carries only a connection id (no secret
    /// key). There is no reply channel.
    PrivilegedCancelRequest {
        /// Identifier of the connection to cancel.
        conn_id: ConnectionId,
    },

    /// Looks up a webhook by its database, schema, and name.
    GetWebhook {
        /// Database name.
        database: String,
        /// Schema name.
        schema: String,
        /// Webhook name.
        name: String,
        /// Reply channel.
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests the current [`SystemVars`].
    GetSystemVars {
        /// Reply channel.
        tx: oneshot::Sender<SystemVars>,
    },

    /// Sets system variables from a name-to-value map.
    SetSystemVars {
        /// Variable names mapped to their new values, both as strings.
        vars: BTreeMap<String, String>,
        /// Connection on whose behalf the change is made.
        conn_id: ConnectionId,
        /// Reply channel.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminates the connection `conn_id`.
    Terminate {
        /// Identifier of the connection to terminate.
        conn_id: ConnectionId,
        /// Optional reply channel; `None` when the sender does not wait for
        /// a result.
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Begins a `COPY ... FROM STDIN`; the reply carries a
    /// [`CopyFromStdinWriter`].
    StartCopyFromStdin {
        /// Catalog item being copied into.
        target_id: CatalogItemId,
        /// Name of the target.
        target_name: String,
        /// Target columns.
        columns: Vec<ColumnIndex>,
        /// Description of the rows being copied.
        row_desc: mz_repr::RelationDesc,
        /// Format parameters for decoding the copied data.
        params: mz_pgcopy::CopyFormatParams<'static>,
        /// The session, moved into the command.
        session: Session,
        /// Reply channel.
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Retires an execution context, recording why the statement ended.
    /// There is no reply channel.
    RetireExecute {
        /// The execution context being retired.
        data: ExecuteContextExtra,
        /// Why execution ended.
        reason: StatementEndedExecutionReason,
    },

    /// Requests a consistency check; replies `Ok(())` or the
    /// [`CoordinatorInconsistencies`] found.
    CheckConsistency {
        /// Reply channel.
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump of state as a JSON value.
    Dump {
        /// Reply channel.
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests a client for the compute instance `instance_id`; replies with
    /// `InstanceMissing` if there is none.
    GetComputeInstanceClient {
        /// The compute instance to look up.
        instance_id: ComputeInstanceId,
        /// Reply channel.
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests the timestamp oracle for `timeline`.
    GetOracle {
        /// The timeline whose oracle is wanted.
        timeline: Timeline,
        /// Reply channel.
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recent timestamp for `source_ids`; the reply may
    /// be `None`.
    DetermineRealTimeRecentTimestamp {
        /// The sources to consider.
        source_ids: BTreeSet<GlobalId>,
        /// Timeout for the determination.
        real_time_recency_timeout: Duration,
        /// Reply channel.
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the read holds associated with the transaction of `conn_id`,
    /// if any.
    GetTransactionReadHoldsBundle {
        /// The connection whose read holds are requested.
        conn_id: ConnectionId,
        /// Reply channel.
        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
    },

    /// Stores `read_holds` for the transaction of `conn_id`.
    StoreTransactionReadHolds {
        /// The connection the read holds belong to.
        conn_id: ConnectionId,
        /// The read holds to store.
        read_holds: ReadHolds<mz_repr::Timestamp>,
        /// Acknowledgement channel.
        tx: oneshot::Sender<()>,
    },

    /// Executes a peek described by a dataflow plan.
    ///
    /// NOTE(review): the name suggests this is the route for peeks that do
    /// not qualify for a fast path — confirm against the handler.
    ExecuteSlowPathPeek {
        /// The peek's dataflow plan (boxed).
        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
        /// The timestamp determination for the peek.
        determination: TimestampDetermination<mz_repr::Timestamp>,
        /// Finishing to apply to the result rows.
        finishing: RowSetFinishing,
        /// Compute instance to run on.
        compute_instance: ComputeInstanceId,
        /// Specific replica to target, if any.
        target_replica: Option<ReplicaId>,
        /// Relation type of the intermediate result.
        intermediate_result_type: SqlRelationType,
        /// Sources the peek depends on.
        source_ids: BTreeSet<GlobalId>,
        /// The requesting connection.
        conn_id: ConnectionId,
        /// Result size limit.
        max_result_size: u64,
        /// Optional query result size limit.
        max_query_result_size: Option<u64>,
        /// Optional watch-set creation request (statement logging).
        watch_set: Option<WatchSetCreation>,
        /// Reply channel.
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight step for a `COPY TO` targeting S3; replies `Ok(())` or an
    /// error.
    CopyToPreflight {
        /// The S3 sink connection to check.
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        /// Identifier of the sink.
        sink_id: GlobalId,
        /// Reply channel.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a `COPY TO` described by a dataflow.
    ExecuteCopyTo {
        /// The dataflow description (boxed).
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        /// Compute instance to run on.
        compute_instance: ComputeInstanceId,
        /// Specific replica to target, if any.
        target_replica: Option<ReplicaId>,
        /// Sources the dataflow depends on.
        source_ids: BTreeSet<GlobalId>,
        /// The requesting connection.
        conn_id: ConnectionId,
        /// Optional watch-set creation request (statement logging).
        watch_set: Option<WatchSetCreation>,
        /// Reply channel.
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a side-effecting function.
    ExecuteSideEffectingFunc {
        /// The function plan to execute.
        plan: SideEffectingFunc,
        /// The requesting connection.
        conn_id: ConnectionId,
        /// Role on whose behalf the function runs.
        current_role: RoleId,
        /// Reply channel.
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Registers a peek identified by `uuid`.
    ///
    /// NOTE(review): "frontend" presumably means the peek is issued outside
    /// the command handler and only tracked by it — confirm.
    RegisterFrontendPeek {
        /// Identifier of the peek.
        uuid: Uuid,
        /// The requesting connection.
        conn_id: ConnectionId,
        /// Cluster the peek runs on.
        cluster_id: mz_controller_types::ClusterId,
        /// Collections the peek depends on.
        depends_on: BTreeSet<GlobalId>,
        /// Whether the peek takes a fast path.
        is_fast_path: bool,
        /// Optional watch-set creation request (statement logging).
        watch_set: Option<WatchSetCreation>,
        /// Reply channel.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregisters the peek identified by `uuid`.
    UnregisterFrontendPeek {
        /// Identifier of the peek.
        uuid: Uuid,
        /// Acknowledgement channel.
        tx: oneshot::Sender<()>,
    },

    /// Requests a [`TimestampExplanation`] for an existing timestamp
    /// determination.
    ExplainTimestamp {
        /// The requesting connection.
        conn_id: ConnectionId,
        /// Wall-clock time of the session.
        session_wall_time: DateTime<Utc>,
        /// Cluster the explanation refers to.
        cluster_id: ClusterId,
        /// Collections involved.
        id_bundle: CollectionIdBundle,
        /// The determination to explain.
        determination: TimestampDetermination<mz_repr::Timestamp>,
        /// Reply channel.
        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
    },

    /// A statement-logging event. There is no reply channel.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
328
329impl Command {
330 pub fn session(&self) -> Option<&Session> {
331 match self {
332 Command::Execute { session, .. }
333 | Command::Commit { session, .. }
334 | Command::StartCopyFromStdin { session, .. } => Some(session),
335 Command::CancelRequest { .. }
336 | Command::Startup { .. }
337 | Command::AuthenticatePassword { .. }
338 | Command::AuthenticateGetSASLChallenge { .. }
339 | Command::AuthenticateVerifySASLProof { .. }
340 | Command::CatalogSnapshot { .. }
341 | Command::PrivilegedCancelRequest { .. }
342 | Command::GetWebhook { .. }
343 | Command::Terminate { .. }
344 | Command::GetSystemVars { .. }
345 | Command::SetSystemVars { .. }
346 | Command::RetireExecute { .. }
347 | Command::CheckConsistency { .. }
348 | Command::Dump { .. }
349 | Command::GetComputeInstanceClient { .. }
350 | Command::GetOracle { .. }
351 | Command::DetermineRealTimeRecentTimestamp { .. }
352 | Command::GetTransactionReadHoldsBundle { .. }
353 | Command::StoreTransactionReadHolds { .. }
354 | Command::ExecuteSlowPathPeek { .. }
355 | Command::CopyToPreflight { .. }
356 | Command::ExecuteCopyTo { .. }
357 | Command::ExecuteSideEffectingFunc { .. }
358 | Command::RegisterFrontendPeek { .. }
359 | Command::UnregisterFrontendPeek { .. }
360 | Command::ExplainTimestamp { .. }
361 | Command::FrontendStatementLogging(..) => None,
362 }
363 }
364
365 pub fn session_mut(&mut self) -> Option<&mut Session> {
366 match self {
367 Command::Execute { session, .. }
368 | Command::Commit { session, .. }
369 | Command::StartCopyFromStdin { session, .. } => Some(session),
370 Command::CancelRequest { .. }
371 | Command::Startup { .. }
372 | Command::AuthenticatePassword { .. }
373 | Command::AuthenticateGetSASLChallenge { .. }
374 | Command::AuthenticateVerifySASLProof { .. }
375 | Command::CatalogSnapshot { .. }
376 | Command::PrivilegedCancelRequest { .. }
377 | Command::GetWebhook { .. }
378 | Command::Terminate { .. }
379 | Command::GetSystemVars { .. }
380 | Command::SetSystemVars { .. }
381 | Command::RetireExecute { .. }
382 | Command::CheckConsistency { .. }
383 | Command::Dump { .. }
384 | Command::GetComputeInstanceClient { .. }
385 | Command::GetOracle { .. }
386 | Command::DetermineRealTimeRecentTimestamp { .. }
387 | Command::GetTransactionReadHoldsBundle { .. }
388 | Command::StoreTransactionReadHolds { .. }
389 | Command::ExecuteSlowPathPeek { .. }
390 | Command::CopyToPreflight { .. }
391 | Command::ExecuteCopyTo { .. }
392 | Command::ExecuteSideEffectingFunc { .. }
393 | Command::RegisterFrontendPeek { .. }
394 | Command::UnregisterFrontendPeek { .. }
395 | Command::ExplainTimestamp { .. }
396 | Command::FrontendStatementLogging(..) => None,
397 }
398 }
399}
400
/// A reply that carries a [`Session`] alongside the outcome of the request.
///
/// Used as the reply type of the [`Command`] variants that take ownership of
/// a `Session` (`Execute`, `Commit`, `StartCopyFromStdin`).
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the request.
    pub result: Result<T, AdapterError>,
    /// The session that accompanies the reply.
    pub session: Session,
    /// OpenTelemetry context associated with the reply.
    pub otel_ctx: OpenTelemetryContext,
}
407
/// Newtype around an optional boolean superuser flag.
///
/// NOTE(review): `None` presumably means the attribute was not specified —
/// confirm against where this value is produced.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
410
/// The successful reply to [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that `write_notify` can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// The role of the connecting user.
    pub role_id: RoleId,
    /// The superuser attribute reported for the role.
    pub superuser_attribute: SuperuserAttribute,
    /// Notification handle for a builtin-table append; excluded from `Debug`.
    ///
    /// NOTE(review): presumably must be awaited before the session is fully
    /// usable — confirm with the caller.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Default values for session variables, keyed by variable name.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to the catalog.
    pub catalog: Arc<Catalog>,
    /// Shared handle to the storage collections, using `mz_repr::Timestamp`.
    pub storage_collections: Arc<
        dyn mz_storage_client::storage_collections::StorageCollections<
                Timestamp = mz_repr::Timestamp,
            > + Send
            + Sync,
    >,
    /// Shared generator for transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics handle for the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    /// Client for persist.
    pub persist_client: PersistClient,
    /// Frontend handle for statement logging.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
439
/// The successful reply to [`Command::AuthenticateGetSASLChallenge`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge.
    pub iteration_count: usize,
    /// Salt for the challenge.
    ///
    /// NOTE(review): encoding (e.g. base64) is not established by this file —
    /// confirm.
    pub salt: String,
    /// Nonce returned with the challenge.
    pub nonce: String,
}
448
/// The successful reply to [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned after the proof was checked.
    pub verifier: String,
}
454
455impl Transmittable for StartupResponse {
458 type Allowed = bool;
459 fn to_allowed(&self) -> Self::Allowed {
460 true
461 }
462}
463
/// Newtype around a `String` holding a raw catalog dump.
///
/// The inner string is private; build one with [`CatalogDump::new`] and
/// recover the text with [`CatalogDump::into_string`].
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and returns the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
477
478impl Transmittable for CatalogDump {
479 type Allowed = bool;
480 fn to_allowed(&self) -> Self::Allowed {
481 true
482 }
483}
484
485impl Transmittable for SystemVars {
486 type Allowed = bool;
487 fn to_allowed(&self) -> Self::Allowed {
488 true
489 }
490}
491
/// The response to an executed statement.
///
/// The `EnumKind` derive additionally generates `ExecuteResponseKind`, a
/// companion enum with one data-free variant per variant here, which also
/// derives `PartialOrd` and `Ord`. `Debug` comes from `derivative` so that
/// row-carrying fields can be skipped in debug output.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// Default privileges were altered.
    AlteredDefaultPrivileges,
    /// An object of the given type was altered.
    AlteredObject(ObjectType),
    /// A role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// A cursor was closed.
    ClosedCursor,
    /// A comment was created.
    Comment,
    /// A copy finished; the payload is the count reported in the `COPY` tag.
    Copied(usize),
    /// Results are to be copied out in `format`.
    CopyTo {
        /// Output format of the copy.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response whose results are copied out.
        resp: Box<ExecuteResponse>,
    },
    /// A `COPY FROM` is to be performed into the target.
    CopyFrom {
        /// Catalog item being copied into.
        target_id: CatalogItemId,
        /// Name of the target.
        target_name: String,
        /// Target columns.
        columns: Vec<ColumnIndex>,
        /// Format parameters for decoding the copied data.
        params: CopyFormatParams<'static>,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// A connection was created.
    CreatedConnection,
    /// A database was created.
    CreatedDatabase,
    /// A schema was created.
    CreatedSchema,
    /// A role was created.
    CreatedRole,
    /// A cluster was created.
    CreatedCluster,
    /// A cluster replica was created.
    CreatedClusterReplica,
    /// An index was created.
    CreatedIndex,
    /// An introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// A secret was created.
    CreatedSecret,
    /// A sink was created.
    CreatedSink,
    /// A source was created.
    CreatedSource,
    /// A table was created.
    CreatedTable,
    /// A view was created.
    CreatedView,
    /// Views were created.
    CreatedViews,
    /// A materialized view was created.
    CreatedMaterializedView,
    /// A continual task was created.
    CreatedContinualTask,
    /// A type was created.
    CreatedType,
    /// A network policy was created.
    CreatedNetworkPolicy,
    /// A prepared statement (or all of them, when `all` is set) was
    /// deallocated.
    Deallocate { all: bool },
    /// A cursor was declared.
    DeclaredCursor,
    /// Rows were deleted; the payload is the count reported in the tag.
    Deleted(usize),
    /// Temporary objects were discarded.
    DiscardedTemp,
    /// All discardable state was discarded.
    DiscardedAll,
    /// An object of the given type was dropped.
    DroppedObject(ObjectType),
    /// Owned objects were dropped.
    DroppedOwned,
    /// The query was empty.
    EmptyQuery,
    /// Rows are to be fetched from a cursor.
    Fetch {
        /// Name of the cursor.
        name: String,
        /// Requested fetch direction/count, if given.
        count: Option<FetchDirection>,
        /// Timeout for the fetch.
        timeout: ExecuteTimeout,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Privileges were granted.
    GrantedPrivilege,
    /// A role was granted.
    GrantedRole,
    /// Rows were inserted; the payload is the count reported in the tag.
    Inserted(usize),
    /// A statement was prepared.
    Prepare,
    /// A notice was raised.
    Raised,
    /// Ownership was reassigned.
    ReassignOwned,
    /// Privileges were revoked.
    RevokedPrivilege,
    /// A role was revoked.
    RevokedRole,
    /// Result rows are delivered through a stream.
    SendingRowsStreaming {
        /// Stream of peek responses; excluded from `Debug`.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance producing the rows.
        instance_id: ComputeInstanceId,
        /// Execution strategy, as recorded for statement logging.
        strategy: StatementExecutionStrategy,
    },
    /// Result rows are delivered through a row iterator.
    SendingRowsImmediate {
        /// Iterator over the rows; excluded from `Debug`.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// A variable was set, or reset when `reset` is true.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// Whether this was a reset rather than a set.
        reset: bool,
    },
    /// A transaction was started.
    StartedTransaction,
    /// Subscription updates are delivered through `rx`.
    Subscribing {
        /// Stream of row batches.
        rx: RowBatchStream,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
        /// Compute instance serving the subscription.
        instance_id: ComputeInstanceId,
    },
    /// The active transaction was committed.
    TransactionCommitted {
        /// Named string parameters accompanying the commit.
        ///
        /// NOTE(review): presumably session parameters that changed during
        /// the transaction — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction was rolled back.
    TransactionRolledBack {
        /// Named string parameters accompanying the rollback.
        ///
        /// NOTE(review): presumably session parameters that changed during
        /// the transaction — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// Rows were updated; the payload is the count reported in the tag.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
646
647impl TryFrom<&Statement<Raw>> for ExecuteResponse {
648 type Error = ();
649
650 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
652 let resp_kinds = Plan::generated_from(&stmt.into())
653 .iter()
654 .map(ExecuteResponse::generated_from)
655 .flatten()
656 .cloned()
657 .collect::<BTreeSet<ExecuteResponseKind>>();
658 let resps = resp_kinds
659 .iter()
660 .map(|r| (*r).try_into())
661 .collect::<Result<Vec<ExecuteResponse>, _>>();
662 if let Ok(resps) = resps {
664 if resps.len() == 1 {
665 return Ok(resps.into_element());
666 }
667 }
668 let resp = match stmt {
669 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
670 ExecuteResponse::DroppedObject((*object_type).into())
671 }
672 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
673 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
674 ExecuteResponse::AlteredObject((*object_type).into())
675 }
676 _ => return Err(()),
677 };
678 soft_assert_no_log!(
680 resp_kinds.len() == 1
681 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
682 "ExecuteResponses out of sync with planner"
683 );
684 Ok(resp)
685 }
686}
687
688impl TryInto<ExecuteResponse> for ExecuteResponseKind {
689 type Error = ();
690
691 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
694 match self {
695 ExecuteResponseKind::AlteredDefaultPrivileges => {
696 Ok(ExecuteResponse::AlteredDefaultPrivileges)
697 }
698 ExecuteResponseKind::AlteredObject => Err(()),
699 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
700 ExecuteResponseKind::AlteredSystemConfiguration => {
701 Ok(ExecuteResponse::AlteredSystemConfiguration)
702 }
703 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
704 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
705 ExecuteResponseKind::Copied => Err(()),
706 ExecuteResponseKind::CopyTo => Err(()),
707 ExecuteResponseKind::CopyFrom => Err(()),
708 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
709 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
710 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
711 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
712 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
713 ExecuteResponseKind::CreatedClusterReplica => {
714 Ok(ExecuteResponse::CreatedClusterReplica)
715 }
716 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
717 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
718 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
719 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
720 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
721 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
722 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
723 ExecuteResponseKind::CreatedMaterializedView => {
724 Ok(ExecuteResponse::CreatedMaterializedView)
725 }
726 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
727 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
728 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
729 ExecuteResponseKind::Deallocate => Err(()),
730 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
731 ExecuteResponseKind::Deleted => Err(()),
732 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
733 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
734 ExecuteResponseKind::DroppedObject => Err(()),
735 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
736 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
737 ExecuteResponseKind::Fetch => Err(()),
738 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
739 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
740 ExecuteResponseKind::Inserted => Err(()),
741 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
742 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
743 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
744 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
745 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
746 ExecuteResponseKind::SetVariable => Err(()),
747 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
748 ExecuteResponseKind::Subscribing => Err(()),
749 ExecuteResponseKind::TransactionCommitted => Err(()),
750 ExecuteResponseKind::TransactionRolledBack => Err(()),
751 ExecuteResponseKind::Updated => Err(()),
752 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
753 ExecuteResponseKind::SendingRowsStreaming => Err(()),
754 ExecuteResponseKind::SendingRowsImmediate => Err(()),
755 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
756 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
757 }
758 }
759 }
760}
761
impl ExecuteResponse {
    /// Returns the command tag for this response, or `None` for variants that
    /// have no tag of their own (`CopyTo`, `CopyFrom`, `EmptyQuery`, `Fetch`,
    /// the two `SendingRows*` variants, and `Subscribing`).
    ///
    /// Variants carrying a count or an object type interpolate it into the
    /// tag.
    pub fn tag(&self) -> Option<String> {
        use ExecuteResponse::*;
        match self {
            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
            AlteredObject(o) => Some(format!("ALTER {}", o)),
            AlteredRole => Some("ALTER ROLE".into()),
            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
            ClosedCursor => Some("CLOSE CURSOR".into()),
            Comment => Some("COMMENT".into()),
            Copied(n) => Some(format!("COPY {}", n)),
            CopyTo { .. } => None,
            CopyFrom { .. } => None,
            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
            CreatedRole => Some("CREATE ROLE".into()),
            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
            CreatedIndex { .. } => Some("CREATE INDEX".into()),
            CreatedSecret { .. } => Some("CREATE SECRET".into()),
            CreatedSink { .. } => Some("CREATE SINK".into()),
            CreatedSource { .. } => Some("CREATE SOURCE".into()),
            CreatedTable { .. } => Some("CREATE TABLE".into()),
            CreatedView { .. } => Some("CREATE VIEW".into()),
            CreatedViews { .. } => Some("CREATE VIEWS".into()),
            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
            CreatedType => Some("CREATE TYPE".into()),
            // NOTE(review): unlike the other multi-word tags in this match,
            // this one has no space between NETWORK and POLICY. Confirm
            // whether clients or tests depend on this spelling before
            // changing it.
            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
            DeclaredCursor => Some("DECLARE CURSOR".into()),
            Deleted(n) => Some(format!("DELETE {}", n)),
            DiscardedTemp => Some("DISCARD TEMP".into()),
            DiscardedAll => Some("DISCARD ALL".into()),
            DroppedObject(o) => Some(format!("DROP {o}")),
            DroppedOwned => Some("DROP OWNED".into()),
            EmptyQuery => None,
            Fetch { .. } => None,
            GrantedPrivilege => Some("GRANT".into()),
            GrantedRole => Some("GRANT ROLE".into()),
            Inserted(n) => {
                // PostgreSQL's INSERT command tag has the form
                // `INSERT oid rows`; the literal `0` fills the `oid` slot and
                // `n` is the row count.
                Some(format!("INSERT 0 {}", n))
            }
            Prepare => Some("PREPARE".into()),
            Raised => Some("RAISE".into()),
            ReassignOwned => Some("REASSIGN OWNED".into()),
            RevokedPrivilege => Some("REVOKE".into()),
            RevokedRole => Some("REVOKE ROLE".into()),
            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
            SetVariable { reset: true, .. } => Some("RESET".into()),
            SetVariable { reset: false, .. } => Some("SET".into()),
            StartedTransaction { .. } => Some("BEGIN".into()),
            Subscribing { .. } => None,
            TransactionCommitted { .. } => Some("COMMIT".into()),
            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
            Updated(n) => Some(format!("UPDATE {}", n)),
            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
        }
    }

    /// Maps a `PlanKind` to the static set of `ExecuteResponseKind`s
    /// associated with it.
    ///
    /// The match has no wildcard arm, so every `PlanKind` must be listed
    /// explicitly.
    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
        // Both enums are glob-imported. Names that exist in both (for example
        // `CopyFrom`, `Fetch`, `Prepare`) are therefore written fully
        // qualified below.
        use ExecuteResponseKind::*;
        use PlanKind::*;

        match plan {
            AbortTransaction => &[TransactionRolledBack],
            AlterClusterRename
            | AlterClusterSwap
            | AlterCluster
            | AlterClusterReplicaRename
            | AlterOwner
            | AlterItemRename
            | AlterRetainHistory
            | AlterSourceTimestampInterval
            | AlterNoop
            | AlterSchemaRename
            | AlterSchemaSwap
            | AlterSecret
            | AlterConnection
            | AlterSource
            | AlterSink
            | AlterTableAddColumn
            | AlterMaterializedViewApplyReplacement
            | AlterNetworkPolicy => &[AlteredObject],
            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
            AlterSetCluster => &[AlteredObject],
            AlterRole => &[AlteredRole],
            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
                &[AlteredSystemConfiguration]
            }
            Close => &[ClosedCursor],
            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
            PlanKind::Comment => &[ExecuteResponseKind::Comment],
            // A commit maps to both the committed and the rolled-back kind.
            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
            CreateConnection => &[CreatedConnection],
            CreateDatabase => &[CreatedDatabase],
            CreateSchema => &[CreatedSchema],
            CreateRole => &[CreatedRole],
            CreateCluster => &[CreatedCluster],
            CreateClusterReplica => &[CreatedClusterReplica],
            CreateSource | CreateSources => &[CreatedSource],
            CreateSecret => &[CreatedSecret],
            CreateSink => &[CreatedSink],
            CreateTable => &[CreatedTable],
            CreateView => &[CreatedView],
            CreateMaterializedView => &[CreatedMaterializedView],
            CreateContinualTask => &[CreatedContinualTask],
            CreateIndex => &[CreatedIndex],
            CreateType => &[CreatedType],
            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
            CreateNetworkPolicy => &[CreatedNetworkPolicy],
            Declare => &[DeclaredCursor],
            DiscardTemp => &[DiscardedTemp],
            DiscardAll => &[DiscardedAll],
            DropObjects => &[DroppedObject],
            DropOwned => &[DroppedOwned],
            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
            // Row-returning plans: rows may be streamed, returned
            // immediately, or wrapped in a `CopyTo`.
            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
                ExecuteResponseKind::CopyTo,
                SendingRowsStreaming,
                SendingRowsImmediate,
            ],
            Execute | ReadThenWrite => &[
                Deleted,
                Inserted,
                SendingRowsStreaming,
                SendingRowsImmediate,
                Updated,
            ],
            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
            GrantPrivileges => &[GrantedPrivilege],
            GrantRole => &[GrantedRole],
            Insert => &[Inserted, SendingRowsImmediate],
            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
            PlanKind::Raise => &[ExecuteResponseKind::Raised],
            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
            RevokePrivileges => &[RevokedPrivilege],
            RevokeRole => &[RevokedRole],
            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
                &[ExecuteResponseKind::SetVariable]
            }
            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
            StartTransaction => &[StartedTransaction],
            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
        }
    }
}
924
925impl Transmittable for ExecuteResponse {
929 type Allowed = ExecuteResponseKind;
930 fn to_allowed(&self) -> Self::Allowed {
931 ExecuteResponseKind::from(self)
932 }
933}