1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use derivative::Derivative;
17use enum_kinds::EnumKind;
18use futures::Stream;
19use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
20use mz_auth::password::Password;
21use mz_cluster_client::ReplicaId;
22use mz_compute_types::ComputeInstanceId;
23use mz_compute_types::dataflows::DataflowDescription;
24use mz_expr::RowSetFinishing;
25use mz_ore::collections::CollectionExt;
26use mz_ore::soft_assert_no_log;
27use mz_ore::tracing::OpenTelemetryContext;
28use mz_persist_client::PersistClient;
29use mz_pgcopy::CopyFormatParams;
30use mz_repr::global_id::TransientIdGen;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
33use mz_sql::ast::{FetchDirection, Raw, Statement};
34use mz_sql::catalog::ObjectType;
35use mz_sql::optimizer_metrics::OptimizerMetrics;
36use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
37use mz_sql::session::user::User;
38use mz_sql::session::vars::{OwnedVarInput, SystemVars};
39use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
40use mz_storage_types::sources::Timeline;
41use mz_timestamp_oracle::TimestampOracle;
42use tokio::sync::{mpsc, oneshot};
43use uuid::Uuid;
44
45use crate::catalog::Catalog;
46use crate::coord::ExecuteContextExtra;
47use crate::coord::appends::BuiltinTableAppendNotify;
48use crate::coord::consistency::CoordinatorInconsistencies;
49use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
50use crate::coord::timestamp_selection::TimestampDetermination;
51use crate::error::AdapterError;
52use crate::session::{EndTransactionAction, RowBatchStream, Session};
53use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
54use crate::util::Transmittable;
55use crate::webhook::AppendWebhookResponse;
56use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
57
/// Reply payload of [`Command::CatalogSnapshot`]: a shared handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Reference-counted catalog handle; cloning it does not copy the catalog.
    pub catalog: Arc<Catalog>,
}
62
/// A request message.
///
/// NOTE(review): the consumer of these messages is not visible in this file;
/// presumably it is the coordinator — confirm against the receiving side.
///
/// Most variants carry a `tx` one-shot sender whose type parameter is the reply
/// for that request. `CancelRequest`, `PrivilegedCancelRequest` and
/// `RetireExecute` carry no reply channel, and `Terminate`'s is optional.
/// Only `Execute` and `Commit` own a [`Session`].
#[derive(Debug)]
pub enum Command {
    /// Replies with a [`CatalogSnapshot`].
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Replies with a [`StartupResponse`] or an [`AdapterError`].
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Unbounded channel of [`AdapterNotice`]s.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Replies with an [`AuthResponse`] or an [`AdapterError`].
    AuthenticatePassword {
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        role_name: String,
        /// May be absent.
        password: Option<Password>,
    },

    /// Replies with a [`SASLChallengeResponse`] or an [`AdapterError`].
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Replies with a [`SASLVerifyProofResponse`] or an [`AdapterError`].
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    },

    /// Owns the `Session`; the reply is a [`Response`], which itself carries a
    /// `Session` alongside the result.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextExtra>,
    },

    /// Owns the `Session`; the reply is a [`Response`], which itself carries a
    /// `Session` alongside the result.
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Carries no reply channel. Identifies the connection by a
    /// `ConnectionIdType` plus a `secret_key`.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Carries no reply channel and, unlike `CancelRequest`, no `secret_key`.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Replies with an [`AppendWebhookResponse`] or an [`AppendWebhookError`].
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Replies with a [`SystemVars`] value.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Replies with `Ok(())` or an [`AdapterError`].
    SetSystemVars {
        /// String-to-string map (NOTE(review): presumably variable name to
        /// new value — confirm with the handler).
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// The reply channel is optional; when `None` no reply can be delivered.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Carries no reply channel.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Replies with `Ok(())` or the [`CoordinatorInconsistencies`] found.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Replies with a JSON value or an `anyhow::Error`.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Replies with a compute instance client for `instance_id`, or
    /// `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Replies with a shared [`TimestampOracle`] trait object or an
    /// [`AdapterError`].
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Replies with an optional timestamp or an [`AdapterError`].
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Replies with `Some(ReadHolds)` or `None`.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
    },

    /// Replies with `()`.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        tx: oneshot::Sender<()>,
    },

    /// Replies with an [`ExecuteResponse`] or an [`AdapterError`].
    ExecuteSlowPathPeek {
        /// Boxed, which keeps this variant's inline size down.
        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Replies with an [`ExecuteResponse`] or an [`AdapterError`].
    ExecuteCopyTo {
        /// Boxed, which keeps this variant's inline size down.
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },
}
225
226impl Command {
227 pub fn session(&self) -> Option<&Session> {
228 match self {
229 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
230 Command::CancelRequest { .. }
231 | Command::Startup { .. }
232 | Command::AuthenticatePassword { .. }
233 | Command::AuthenticateGetSASLChallenge { .. }
234 | Command::AuthenticateVerifySASLProof { .. }
235 | Command::CatalogSnapshot { .. }
236 | Command::PrivilegedCancelRequest { .. }
237 | Command::GetWebhook { .. }
238 | Command::Terminate { .. }
239 | Command::GetSystemVars { .. }
240 | Command::SetSystemVars { .. }
241 | Command::RetireExecute { .. }
242 | Command::CheckConsistency { .. }
243 | Command::Dump { .. }
244 | Command::GetComputeInstanceClient { .. }
245 | Command::GetOracle { .. }
246 | Command::DetermineRealTimeRecentTimestamp { .. }
247 | Command::GetTransactionReadHoldsBundle { .. }
248 | Command::StoreTransactionReadHolds { .. }
249 | Command::ExecuteSlowPathPeek { .. }
250 | Command::ExecuteCopyTo { .. } => None,
251 }
252 }
253
254 pub fn session_mut(&mut self) -> Option<&mut Session> {
255 match self {
256 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
257 Command::CancelRequest { .. }
258 | Command::Startup { .. }
259 | Command::AuthenticatePassword { .. }
260 | Command::AuthenticateGetSASLChallenge { .. }
261 | Command::AuthenticateVerifySASLProof { .. }
262 | Command::CatalogSnapshot { .. }
263 | Command::PrivilegedCancelRequest { .. }
264 | Command::GetWebhook { .. }
265 | Command::Terminate { .. }
266 | Command::GetSystemVars { .. }
267 | Command::SetSystemVars { .. }
268 | Command::RetireExecute { .. }
269 | Command::CheckConsistency { .. }
270 | Command::Dump { .. }
271 | Command::GetComputeInstanceClient { .. }
272 | Command::GetOracle { .. }
273 | Command::DetermineRealTimeRecentTimestamp { .. }
274 | Command::GetTransactionReadHoldsBundle { .. }
275 | Command::StoreTransactionReadHolds { .. }
276 | Command::ExecuteSlowPathPeek { .. }
277 | Command::ExecuteCopyTo { .. } => None,
278 }
279 }
280}
281
/// Reply envelope used by [`Command::Execute`] and [`Command::Commit`].
///
/// NOTE(review): `session` is presumably the session that was moved into the
/// command, handed back to the sender — confirm with the code that builds
/// this value.
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the request: the payload or an [`AdapterError`].
    pub result: Result<T, AdapterError>,
    /// Session travelling with the reply.
    pub session: Session,
    /// OpenTelemetry context travelling with the reply.
    pub otel_ctx: OpenTelemetryContext,
}
288
/// Successful reply payload of [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that `write_notify` can be left
/// out of the formatted output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    pub role_id: RoleId,
    /// Omitted from the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map from a name to an [`OwnedVarInput`] (NOTE(review): presumably
    /// session variable defaults keyed by variable name — confirm).
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared catalog handle.
    pub catalog: Arc<Catalog>,
    /// Shared, thread-safe (`Send + Sync`) storage-collections trait object
    /// using `mz_repr::Timestamp` as its timestamp type.
    pub storage_collections: Arc<
        dyn mz_storage_client::storage_collections::StorageCollections<
                Timestamp = mz_repr::Timestamp,
            > + Send
            + Sync,
    >,
    /// Shared transient-id generator.
    pub transient_id_gen: Arc<TransientIdGen>,
    pub optimizer_metrics: OptimizerMetrics,
    pub persist_client: PersistClient,
}
311
312#[derive(Derivative)]
314#[derivative(Debug)]
315pub struct AuthResponse {
316 pub role_id: RoleId,
318 pub superuser: bool,
320}
321
/// Successful reply payload of [`Command::AuthenticateGetSASLChallenge`].
///
/// `Debug` comes from the standard derive: no field needs custom formatting,
/// so the `derivative` macro is unnecessary here.
///
/// NOTE(review): the fields look like SCRAM challenge parameters — confirm
/// their exact encoding with the authentication code.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    pub iteration_count: usize,
    pub salt: String,
    pub nonce: String,
}
330
331#[derive(Derivative)]
332#[derivative(Debug)]
333pub struct SASLVerifyProofResponse {
334 pub verifier: String,
335 pub auth_resp: AuthResponse,
336}
337
338impl Transmittable for StartupResponse {
341 type Allowed = bool;
342 fn to_allowed(&self) -> Self::Allowed {
343 true
344 }
345}
346
/// Newtype around a `String` holding a dump in textual form.
#[derive(Clone, Debug)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or copying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and returns the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
360
361impl Transmittable for CatalogDump {
362 type Allowed = bool;
363 fn to_allowed(&self) -> Self::Allowed {
364 true
365 }
366}
367
368impl Transmittable for SystemVars {
369 type Allowed = bool;
370 fn to_allowed(&self) -> Self::Allowed {
371 true
372 }
373}
374
/// Payload of the replies to [`Command::Execute`] and [`Command::Commit`]
/// (wrapped in [`Response`]) and to [`Command::ExecuteSlowPathPeek`] and
/// [`Command::ExecuteCopyTo`] (wrapped in a plain `Result`).
///
/// `EnumKind` generates the companion enum `ExecuteResponseKind`, which has
/// one fieldless variant per variant here. That enum additionally derives
/// `PartialOrd`/`Ord`, which is what allows it to be collected into a
/// `BTreeSet`. Derived ordering follows declaration order, so reordering
/// variants changes how kinds compare.
///
/// `Debug` is derived through `derivative` so that the row-carrying fields of
/// `SendingRowsStreaming` and `SendingRowsImmediate` can be skipped.
///
/// The text reported for each variant is defined by `ExecuteResponse::tag`.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    AlteredDefaultPrivileges,
    /// Carries the type of the object concerned.
    AlteredObject(ObjectType),
    AlteredRole,
    AlteredSystemConfiguration,
    ClosedCursor,
    Comment,
    /// Carries a count (NOTE(review): presumably the number of rows copied).
    Copied(usize),
    /// Wraps another response together with a copy format.
    CopyTo {
        format: mz_sql::plan::CopyFormat,
        /// Boxed because the type is recursive.
        resp: Box<ExecuteResponse>,
    },
    CopyFrom {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        params: CopyFormatParams<'static>,
        ctx_extra: ExecuteContextExtra,
    },
    CreatedConnection,
    CreatedDatabase,
    CreatedSchema,
    CreatedRole,
    CreatedCluster,
    CreatedClusterReplica,
    CreatedIndex,
    CreatedIntrospectionSubscribe,
    CreatedSecret,
    CreatedSink,
    CreatedSource,
    CreatedTable,
    CreatedView,
    CreatedViews,
    CreatedMaterializedView,
    CreatedContinualTask,
    CreatedType,
    CreatedNetworkPolicy,
    /// `all` selects between the "DEALLOCATE ALL" and "DEALLOCATE" tags.
    Deallocate { all: bool },
    DeclaredCursor,
    /// Carries a count (NOTE(review): presumably the number of rows deleted).
    Deleted(usize),
    DiscardedTemp,
    DiscardedAll,
    /// Carries the type of the object concerned.
    DroppedObject(ObjectType),
    DroppedOwned,
    EmptyQuery,
    Fetch {
        name: String,
        count: Option<FetchDirection>,
        timeout: ExecuteTimeout,
        ctx_extra: ExecuteContextExtra,
    },
    GrantedPrivilege,
    GrantedRole,
    /// Carries a count (NOTE(review): presumably the number of rows inserted).
    Inserted(usize),
    Prepare,
    Raised,
    ReassignOwned,
    RevokedPrivilege,
    RevokedRole,
    SendingRowsStreaming {
        /// Pinned, boxed stream of `PeekResponseUnary` items; omitted from
        /// the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        instance_id: ComputeInstanceId,
        strategy: StatementExecutionStrategy,
    },
    SendingRowsImmediate {
        /// Boxed row iterator; omitted from the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    SetVariable {
        name: String,
        /// `true` selects the "RESET" tag, `false` the "SET" tag.
        reset: bool,
    },
    StartedTransaction,
    Subscribing {
        rx: RowBatchStream,
        ctx_extra: ExecuteContextExtra,
        instance_id: ComputeInstanceId,
    },
    TransactionCommitted {
        /// Map keyed by static strings (NOTE(review): presumably session
        /// parameters that changed — confirm with the producer).
        params: BTreeMap<&'static str, String>,
    },
    TransactionRolledBack {
        /// Same shape as `TransactionCommitted::params`.
        params: BTreeMap<&'static str, String>,
    },
    /// Carries a count (NOTE(review): presumably the number of rows updated).
    Updated(usize),
    ValidatedConnection,
}
529
530impl TryFrom<&Statement<Raw>> for ExecuteResponse {
531 type Error = ();
532
533 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
535 let resp_kinds = Plan::generated_from(&stmt.into())
536 .iter()
537 .map(ExecuteResponse::generated_from)
538 .flatten()
539 .cloned()
540 .collect::<BTreeSet<ExecuteResponseKind>>();
541 let resps = resp_kinds
542 .iter()
543 .map(|r| (*r).try_into())
544 .collect::<Result<Vec<ExecuteResponse>, _>>();
545 if let Ok(resps) = resps {
547 if resps.len() == 1 {
548 return Ok(resps.into_element());
549 }
550 }
551 let resp = match stmt {
552 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
553 ExecuteResponse::DroppedObject((*object_type).into())
554 }
555 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
556 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
557 ExecuteResponse::AlteredObject((*object_type).into())
558 }
559 _ => return Err(()),
560 };
561 soft_assert_no_log!(
563 resp_kinds.len() == 1
564 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
565 "ExecuteResponses out of sync with planner"
566 );
567 Ok(resp)
568 }
569}
570
571impl TryInto<ExecuteResponse> for ExecuteResponseKind {
572 type Error = ();
573
574 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
577 match self {
578 ExecuteResponseKind::AlteredDefaultPrivileges => {
579 Ok(ExecuteResponse::AlteredDefaultPrivileges)
580 }
581 ExecuteResponseKind::AlteredObject => Err(()),
582 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
583 ExecuteResponseKind::AlteredSystemConfiguration => {
584 Ok(ExecuteResponse::AlteredSystemConfiguration)
585 }
586 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
587 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
588 ExecuteResponseKind::Copied => Err(()),
589 ExecuteResponseKind::CopyTo => Err(()),
590 ExecuteResponseKind::CopyFrom => Err(()),
591 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
592 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
593 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
594 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
595 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
596 ExecuteResponseKind::CreatedClusterReplica => {
597 Ok(ExecuteResponse::CreatedClusterReplica)
598 }
599 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
600 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
601 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
602 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
603 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
604 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
605 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
606 ExecuteResponseKind::CreatedMaterializedView => {
607 Ok(ExecuteResponse::CreatedMaterializedView)
608 }
609 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
610 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
611 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
612 ExecuteResponseKind::Deallocate => Err(()),
613 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
614 ExecuteResponseKind::Deleted => Err(()),
615 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
616 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
617 ExecuteResponseKind::DroppedObject => Err(()),
618 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
619 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
620 ExecuteResponseKind::Fetch => Err(()),
621 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
622 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
623 ExecuteResponseKind::Inserted => Err(()),
624 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
625 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
626 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
627 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
628 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
629 ExecuteResponseKind::SetVariable => Err(()),
630 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
631 ExecuteResponseKind::Subscribing => Err(()),
632 ExecuteResponseKind::TransactionCommitted => Err(()),
633 ExecuteResponseKind::TransactionRolledBack => Err(()),
634 ExecuteResponseKind::Updated => Err(()),
635 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
636 ExecuteResponseKind::SendingRowsStreaming => Err(()),
637 ExecuteResponseKind::SendingRowsImmediate => Err(()),
638 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
639 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
640 }
641 }
642 }
643}
644
645impl ExecuteResponse {
646 pub fn tag(&self) -> Option<String> {
647 use ExecuteResponse::*;
648 match self {
649 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
650 AlteredObject(o) => Some(format!("ALTER {}", o)),
651 AlteredRole => Some("ALTER ROLE".into()),
652 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
653 ClosedCursor => Some("CLOSE CURSOR".into()),
654 Comment => Some("COMMENT".into()),
655 Copied(n) => Some(format!("COPY {}", n)),
656 CopyTo { .. } => None,
657 CopyFrom { .. } => None,
658 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
659 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
660 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
661 CreatedRole => Some("CREATE ROLE".into()),
662 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
663 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
664 CreatedIndex { .. } => Some("CREATE INDEX".into()),
665 CreatedSecret { .. } => Some("CREATE SECRET".into()),
666 CreatedSink { .. } => Some("CREATE SINK".into()),
667 CreatedSource { .. } => Some("CREATE SOURCE".into()),
668 CreatedTable { .. } => Some("CREATE TABLE".into()),
669 CreatedView { .. } => Some("CREATE VIEW".into()),
670 CreatedViews { .. } => Some("CREATE VIEWS".into()),
671 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
672 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
673 CreatedType => Some("CREATE TYPE".into()),
674 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
675 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
676 DeclaredCursor => Some("DECLARE CURSOR".into()),
677 Deleted(n) => Some(format!("DELETE {}", n)),
678 DiscardedTemp => Some("DISCARD TEMP".into()),
679 DiscardedAll => Some("DISCARD ALL".into()),
680 DroppedObject(o) => Some(format!("DROP {o}")),
681 DroppedOwned => Some("DROP OWNED".into()),
682 EmptyQuery => None,
683 Fetch { .. } => None,
684 GrantedPrivilege => Some("GRANT".into()),
685 GrantedRole => Some("GRANT ROLE".into()),
686 Inserted(n) => {
687 Some(format!("INSERT 0 {}", n))
695 }
696 Prepare => Some("PREPARE".into()),
697 Raised => Some("RAISE".into()),
698 ReassignOwned => Some("REASSIGN OWNED".into()),
699 RevokedPrivilege => Some("REVOKE".into()),
700 RevokedRole => Some("REVOKE ROLE".into()),
701 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
702 SetVariable { reset: true, .. } => Some("RESET".into()),
703 SetVariable { reset: false, .. } => Some("SET".into()),
704 StartedTransaction { .. } => Some("BEGIN".into()),
705 Subscribing { .. } => None,
706 TransactionCommitted { .. } => Some("COMMIT".into()),
707 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
708 Updated(n) => Some(format!("UPDATE {}", n)),
709 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
710 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
711 }
712 }
713
714 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
718 use ExecuteResponseKind::*;
719 use PlanKind::*;
720
721 match plan {
722 AbortTransaction => &[TransactionRolledBack],
723 AlterClusterRename
724 | AlterClusterSwap
725 | AlterCluster
726 | AlterClusterReplicaRename
727 | AlterOwner
728 | AlterItemRename
729 | AlterRetainHistory
730 | AlterNoop
731 | AlterSchemaRename
732 | AlterSchemaSwap
733 | AlterSecret
734 | AlterConnection
735 | AlterSource
736 | AlterSink
737 | AlterTableAddColumn
738 | AlterMaterializedViewApplyReplacement
739 | AlterNetworkPolicy => &[AlteredObject],
740 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
741 AlterSetCluster => &[AlteredObject],
742 AlterRole => &[AlteredRole],
743 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
744 &[AlteredSystemConfiguration]
745 }
746 Close => &[ClosedCursor],
747 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
748 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
749 PlanKind::Comment => &[ExecuteResponseKind::Comment],
750 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
751 CreateConnection => &[CreatedConnection],
752 CreateDatabase => &[CreatedDatabase],
753 CreateSchema => &[CreatedSchema],
754 CreateRole => &[CreatedRole],
755 CreateCluster => &[CreatedCluster],
756 CreateClusterReplica => &[CreatedClusterReplica],
757 CreateSource | CreateSources => &[CreatedSource],
758 CreateSecret => &[CreatedSecret],
759 CreateSink => &[CreatedSink],
760 CreateTable => &[CreatedTable],
761 CreateView => &[CreatedView],
762 CreateMaterializedView => &[CreatedMaterializedView],
763 CreateContinualTask => &[CreatedContinualTask],
764 CreateIndex => &[CreatedIndex],
765 CreateType => &[CreatedType],
766 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
767 CreateNetworkPolicy => &[CreatedNetworkPolicy],
768 Declare => &[DeclaredCursor],
769 DiscardTemp => &[DiscardedTemp],
770 DiscardAll => &[DiscardedAll],
771 DropObjects => &[DroppedObject],
772 DropOwned => &[DroppedOwned],
773 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
774 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
775 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
776 ExecuteResponseKind::CopyTo,
777 SendingRowsStreaming,
778 SendingRowsImmediate,
779 ],
780 Execute | ReadThenWrite => &[
781 Deleted,
782 Inserted,
783 SendingRowsStreaming,
784 SendingRowsImmediate,
785 Updated,
786 ],
787 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
788 GrantPrivileges => &[GrantedPrivilege],
789 GrantRole => &[GrantedRole],
790 Insert => &[Inserted, SendingRowsImmediate],
791 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
792 PlanKind::Raise => &[ExecuteResponseKind::Raised],
793 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
794 RevokePrivileges => &[RevokedPrivilege],
795 RevokeRole => &[RevokedRole],
796 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
797 &[ExecuteResponseKind::SetVariable]
798 }
799 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
800 StartTransaction => &[StartedTransaction],
801 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
802 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
803 }
804 }
805}
806
807impl Transmittable for ExecuteResponse {
811 type Allowed = ExecuteResponseKind;
812 fn to_allowed(&self) -> Self::Allowed {
813 ExecuteResponseKind::from(self)
814 }
815}