1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
66#[derive(Debug)]
67pub struct CatalogSnapshot {
68 pub catalog: Arc<Catalog>,
69}
70
71#[derive(Debug)]
72pub enum Command {
73 CatalogSnapshot {
74 tx: oneshot::Sender<CatalogSnapshot>,
75 },
76
77 Startup {
78 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
79 user: User,
80 conn_id: ConnectionId,
81 client_ip: Option<IpAddr>,
82 secret_key: u32,
83 uuid: Uuid,
84 application_name: String,
85 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
86 },
87
88 AuthenticatePassword {
89 tx: oneshot::Sender<Result<(), AdapterError>>,
90 role_name: String,
91 password: Option<Password>,
92 },
93
94 AuthenticateGetSASLChallenge {
95 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
96 role_name: String,
97 nonce: String,
98 },
99
100 AuthenticateVerifySASLProof {
101 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
102 role_name: String,
103 proof: String,
104 auth_message: String,
105 mock_hash: String,
106 },
107
108 Execute {
109 portal_name: String,
110 session: Session,
111 tx: oneshot::Sender<Response<ExecuteResponse>>,
112 outer_ctx_extra: Option<ExecuteContextGuard>,
113 },
114
115 Commit {
120 action: EndTransactionAction,
121 session: Session,
122 tx: oneshot::Sender<Response<ExecuteResponse>>,
123 },
124
125 CancelRequest {
126 conn_id: ConnectionIdType,
127 secret_key: u32,
128 },
129
130 PrivilegedCancelRequest {
131 conn_id: ConnectionId,
132 },
133
134 GetWebhook {
135 database: String,
136 schema: String,
137 name: String,
138 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
139 },
140
141 GetSystemVars {
142 tx: oneshot::Sender<SystemVars>,
143 },
144
145 SetSystemVars {
146 vars: BTreeMap<String, String>,
147 conn_id: ConnectionId,
148 tx: oneshot::Sender<Result<(), AdapterError>>,
149 },
150
151 Terminate {
152 conn_id: ConnectionId,
153 tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
154 },
155
156 RetireExecute {
163 data: ExecuteContextExtra,
164 reason: StatementEndedExecutionReason,
165 },
166
167 CheckConsistency {
168 tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
169 },
170
171 Dump {
172 tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
173 },
174
175 GetComputeInstanceClient {
176 instance_id: ComputeInstanceId,
177 tx: oneshot::Sender<
178 Result<
179 mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
180 mz_compute_client::controller::error::InstanceMissing,
181 >,
182 >,
183 },
184
185 GetOracle {
186 timeline: Timeline,
187 tx: oneshot::Sender<
188 Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
189 >,
190 },
191
192 DetermineRealTimeRecentTimestamp {
193 source_ids: BTreeSet<GlobalId>,
194 real_time_recency_timeout: Duration,
195 tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
196 },
197
198 GetTransactionReadHoldsBundle {
199 conn_id: ConnectionId,
200 tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
201 },
202
203 StoreTransactionReadHolds {
205 conn_id: ConnectionId,
206 read_holds: ReadHolds<mz_repr::Timestamp>,
207 tx: oneshot::Sender<()>,
208 },
209
210 ExecuteSlowPathPeek {
211 dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
212 determination: TimestampDetermination<mz_repr::Timestamp>,
213 finishing: RowSetFinishing,
214 compute_instance: ComputeInstanceId,
215 target_replica: Option<ReplicaId>,
216 intermediate_result_type: SqlRelationType,
217 source_ids: BTreeSet<GlobalId>,
218 conn_id: ConnectionId,
219 max_result_size: u64,
220 max_query_result_size: Option<u64>,
221 watch_set: Option<WatchSetCreation>,
224 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
225 },
226
227 CopyToPreflight {
231 s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
233 sink_id: GlobalId,
235 tx: oneshot::Sender<Result<(), AdapterError>>,
237 },
238
239 ExecuteCopyTo {
240 df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
241 compute_instance: ComputeInstanceId,
242 target_replica: Option<ReplicaId>,
243 source_ids: BTreeSet<GlobalId>,
244 conn_id: ConnectionId,
245 watch_set: Option<WatchSetCreation>,
248 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
249 },
250
251 ExecuteSideEffectingFunc {
253 plan: SideEffectingFunc,
254 conn_id: ConnectionId,
255 current_role: RoleId,
257 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
258 },
259
260 RegisterFrontendPeek {
264 uuid: Uuid,
265 conn_id: ConnectionId,
266 cluster_id: mz_controller_types::ClusterId,
267 depends_on: BTreeSet<GlobalId>,
268 is_fast_path: bool,
269 watch_set: Option<WatchSetCreation>,
272 tx: oneshot::Sender<Result<(), AdapterError>>,
273 },
274
275 UnregisterFrontendPeek {
280 uuid: Uuid,
281 tx: oneshot::Sender<()>,
282 },
283
284 ExplainTimestamp {
287 conn_id: ConnectionId,
288 session_wall_time: DateTime<Utc>,
289 cluster_id: ClusterId,
290 id_bundle: CollectionIdBundle,
291 determination: TimestampDetermination<mz_repr::Timestamp>,
292 tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
293 },
294
295 FrontendStatementLogging(FrontendStatementLoggingEvent),
298}
299
300impl Command {
301 pub fn session(&self) -> Option<&Session> {
302 match self {
303 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
304 Command::CancelRequest { .. }
305 | Command::Startup { .. }
306 | Command::AuthenticatePassword { .. }
307 | Command::AuthenticateGetSASLChallenge { .. }
308 | Command::AuthenticateVerifySASLProof { .. }
309 | Command::CatalogSnapshot { .. }
310 | Command::PrivilegedCancelRequest { .. }
311 | Command::GetWebhook { .. }
312 | Command::Terminate { .. }
313 | Command::GetSystemVars { .. }
314 | Command::SetSystemVars { .. }
315 | Command::RetireExecute { .. }
316 | Command::CheckConsistency { .. }
317 | Command::Dump { .. }
318 | Command::GetComputeInstanceClient { .. }
319 | Command::GetOracle { .. }
320 | Command::DetermineRealTimeRecentTimestamp { .. }
321 | Command::GetTransactionReadHoldsBundle { .. }
322 | Command::StoreTransactionReadHolds { .. }
323 | Command::ExecuteSlowPathPeek { .. }
324 | Command::CopyToPreflight { .. }
325 | Command::ExecuteCopyTo { .. }
326 | Command::ExecuteSideEffectingFunc { .. }
327 | Command::RegisterFrontendPeek { .. }
328 | Command::UnregisterFrontendPeek { .. }
329 | Command::ExplainTimestamp { .. }
330 | Command::FrontendStatementLogging(..) => None,
331 }
332 }
333
334 pub fn session_mut(&mut self) -> Option<&mut Session> {
335 match self {
336 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
337 Command::CancelRequest { .. }
338 | Command::Startup { .. }
339 | Command::AuthenticatePassword { .. }
340 | Command::AuthenticateGetSASLChallenge { .. }
341 | Command::AuthenticateVerifySASLProof { .. }
342 | Command::CatalogSnapshot { .. }
343 | Command::PrivilegedCancelRequest { .. }
344 | Command::GetWebhook { .. }
345 | Command::Terminate { .. }
346 | Command::GetSystemVars { .. }
347 | Command::SetSystemVars { .. }
348 | Command::RetireExecute { .. }
349 | Command::CheckConsistency { .. }
350 | Command::Dump { .. }
351 | Command::GetComputeInstanceClient { .. }
352 | Command::GetOracle { .. }
353 | Command::DetermineRealTimeRecentTimestamp { .. }
354 | Command::GetTransactionReadHoldsBundle { .. }
355 | Command::StoreTransactionReadHolds { .. }
356 | Command::ExecuteSlowPathPeek { .. }
357 | Command::CopyToPreflight { .. }
358 | Command::ExecuteCopyTo { .. }
359 | Command::ExecuteSideEffectingFunc { .. }
360 | Command::RegisterFrontendPeek { .. }
361 | Command::UnregisterFrontendPeek { .. }
362 | Command::ExplainTimestamp { .. }
363 | Command::FrontendStatementLogging(..) => None,
364 }
365 }
366}
367
368#[derive(Debug)]
369pub struct Response<T> {
370 pub result: Result<T, AdapterError>,
371 pub session: Session,
372 pub otel_ctx: OpenTelemetryContext,
373}
374
/// Newtype around an optional superuser flag.
///
/// NOTE(review): what `None` stands for (presumably "attribute not set") is not visible
/// in this file — confirm against the code that constructs it.
#[derive(Clone, Copy, Debug)]
pub struct SuperuserAttribute(pub Option<bool>);
377
378#[derive(Derivative)]
380#[derivative(Debug)]
381pub struct StartupResponse {
382 pub role_id: RoleId,
384 pub superuser_attribute: SuperuserAttribute,
389 #[derivative(Debug = "ignore")]
391 pub write_notify: BuiltinTableAppendNotify,
392 pub session_defaults: BTreeMap<String, OwnedVarInput>,
394 pub catalog: Arc<Catalog>,
395 pub storage_collections: Arc<
396 dyn mz_storage_client::storage_collections::StorageCollections<
397 Timestamp = mz_repr::Timestamp,
398 > + Send
399 + Sync,
400 >,
401 pub transient_id_gen: Arc<TransientIdGen>,
402 pub optimizer_metrics: OptimizerMetrics,
403 pub persist_client: PersistClient,
404 pub statement_logging_frontend: StatementLoggingFrontend,
405}
406
/// Successful reply to `Command::AuthenticateGetSASLChallenge`.
///
/// No field needs custom `Debug` treatment, so the standard derive is used; it prints
/// the same `Name { field: value, .. }` form.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge.
    pub iteration_count: usize,
    /// Salt for the challenge.
    // NOTE(review): the encoding of `salt` (presumably base64) is not visible here —
    // confirm against the producer.
    pub salt: String,
    /// Nonce for the challenge.
    pub nonce: String,
}
415
/// Successful reply to `Command::AuthenticateVerifySASLProof`.
///
/// No field needs custom `Debug` treatment, so the standard derive is used; it prints
/// the same `Name { field: value }` form.
#[derive(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string produced by the proof check.
    pub verifier: String,
}
421
422impl Transmittable for StartupResponse {
425 type Allowed = bool;
426 fn to_allowed(&self) -> Self::Allowed {
427 true
428 }
429}
430
/// Opaque wrapper around a catalog dump held as a `String`.
#[derive(Clone, Debug)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and returns the string it was built from.
    pub fn into_string(self) -> String {
        let Self(inner) = self;
        inner
    }
}
444
445impl Transmittable for CatalogDump {
446 type Allowed = bool;
447 fn to_allowed(&self) -> Self::Allowed {
448 true
449 }
450}
451
452impl Transmittable for SystemVars {
453 type Allowed = bool;
454 fn to_allowed(&self) -> Self::Allowed {
455 true
456 }
457}
458
459#[derive(EnumKind, Derivative)]
461#[derivative(Debug)]
462#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
463pub enum ExecuteResponse {
464 AlteredDefaultPrivileges,
466 AlteredObject(ObjectType),
468 AlteredRole,
470 AlteredSystemConfiguration,
472 ClosedCursor,
474 Comment,
476 Copied(usize),
478 CopyTo {
480 format: mz_sql::plan::CopyFormat,
481 resp: Box<ExecuteResponse>,
482 },
483 CopyFrom {
484 target_id: CatalogItemId,
486 target_name: String,
488 columns: Vec<ColumnIndex>,
489 params: CopyFormatParams<'static>,
490 ctx_extra: ExecuteContextGuard,
491 },
492 CreatedConnection,
494 CreatedDatabase,
496 CreatedSchema,
498 CreatedRole,
500 CreatedCluster,
502 CreatedClusterReplica,
504 CreatedIndex,
506 CreatedIntrospectionSubscribe,
508 CreatedSecret,
510 CreatedSink,
512 CreatedSource,
514 CreatedTable,
516 CreatedView,
518 CreatedViews,
520 CreatedMaterializedView,
522 CreatedContinualTask,
524 CreatedType,
526 CreatedNetworkPolicy,
528 Deallocate { all: bool },
530 DeclaredCursor,
532 Deleted(usize),
534 DiscardedTemp,
536 DiscardedAll,
538 DroppedObject(ObjectType),
540 DroppedOwned,
542 EmptyQuery,
544 Fetch {
546 name: String,
548 count: Option<FetchDirection>,
550 timeout: ExecuteTimeout,
552 ctx_extra: ExecuteContextGuard,
553 },
554 GrantedPrivilege,
556 GrantedRole,
558 Inserted(usize),
560 Prepare,
562 Raised,
564 ReassignOwned,
566 RevokedPrivilege,
568 RevokedRole,
570 SendingRowsStreaming {
572 #[derivative(Debug = "ignore")]
573 rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
574 instance_id: ComputeInstanceId,
575 strategy: StatementExecutionStrategy,
576 },
577 SendingRowsImmediate {
580 #[derivative(Debug = "ignore")]
581 rows: Box<dyn RowIterator + Send + Sync>,
582 },
583 SetVariable {
585 name: String,
586 reset: bool,
588 },
589 StartedTransaction,
591 Subscribing {
594 rx: RowBatchStream,
595 ctx_extra: ExecuteContextGuard,
596 instance_id: ComputeInstanceId,
597 },
598 TransactionCommitted {
600 params: BTreeMap<&'static str, String>,
602 },
603 TransactionRolledBack {
605 params: BTreeMap<&'static str, String>,
607 },
608 Updated(usize),
610 ValidatedConnection,
612}
613
614impl TryFrom<&Statement<Raw>> for ExecuteResponse {
615 type Error = ();
616
617 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
619 let resp_kinds = Plan::generated_from(&stmt.into())
620 .iter()
621 .map(ExecuteResponse::generated_from)
622 .flatten()
623 .cloned()
624 .collect::<BTreeSet<ExecuteResponseKind>>();
625 let resps = resp_kinds
626 .iter()
627 .map(|r| (*r).try_into())
628 .collect::<Result<Vec<ExecuteResponse>, _>>();
629 if let Ok(resps) = resps {
631 if resps.len() == 1 {
632 return Ok(resps.into_element());
633 }
634 }
635 let resp = match stmt {
636 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
637 ExecuteResponse::DroppedObject((*object_type).into())
638 }
639 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
640 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
641 ExecuteResponse::AlteredObject((*object_type).into())
642 }
643 _ => return Err(()),
644 };
645 soft_assert_no_log!(
647 resp_kinds.len() == 1
648 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
649 "ExecuteResponses out of sync with planner"
650 );
651 Ok(resp)
652 }
653}
654
655impl TryInto<ExecuteResponse> for ExecuteResponseKind {
656 type Error = ();
657
658 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
661 match self {
662 ExecuteResponseKind::AlteredDefaultPrivileges => {
663 Ok(ExecuteResponse::AlteredDefaultPrivileges)
664 }
665 ExecuteResponseKind::AlteredObject => Err(()),
666 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
667 ExecuteResponseKind::AlteredSystemConfiguration => {
668 Ok(ExecuteResponse::AlteredSystemConfiguration)
669 }
670 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
671 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
672 ExecuteResponseKind::Copied => Err(()),
673 ExecuteResponseKind::CopyTo => Err(()),
674 ExecuteResponseKind::CopyFrom => Err(()),
675 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
676 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
677 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
678 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
679 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
680 ExecuteResponseKind::CreatedClusterReplica => {
681 Ok(ExecuteResponse::CreatedClusterReplica)
682 }
683 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
684 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
685 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
686 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
687 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
688 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
689 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
690 ExecuteResponseKind::CreatedMaterializedView => {
691 Ok(ExecuteResponse::CreatedMaterializedView)
692 }
693 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
694 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
695 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
696 ExecuteResponseKind::Deallocate => Err(()),
697 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
698 ExecuteResponseKind::Deleted => Err(()),
699 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
700 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
701 ExecuteResponseKind::DroppedObject => Err(()),
702 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
703 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
704 ExecuteResponseKind::Fetch => Err(()),
705 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
706 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
707 ExecuteResponseKind::Inserted => Err(()),
708 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
709 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
710 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
711 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
712 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
713 ExecuteResponseKind::SetVariable => Err(()),
714 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
715 ExecuteResponseKind::Subscribing => Err(()),
716 ExecuteResponseKind::TransactionCommitted => Err(()),
717 ExecuteResponseKind::TransactionRolledBack => Err(()),
718 ExecuteResponseKind::Updated => Err(()),
719 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
720 ExecuteResponseKind::SendingRowsStreaming => Err(()),
721 ExecuteResponseKind::SendingRowsImmediate => Err(()),
722 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
723 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
724 }
725 }
726 }
727}
728
729impl ExecuteResponse {
730 pub fn tag(&self) -> Option<String> {
731 use ExecuteResponse::*;
732 match self {
733 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
734 AlteredObject(o) => Some(format!("ALTER {}", o)),
735 AlteredRole => Some("ALTER ROLE".into()),
736 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
737 ClosedCursor => Some("CLOSE CURSOR".into()),
738 Comment => Some("COMMENT".into()),
739 Copied(n) => Some(format!("COPY {}", n)),
740 CopyTo { .. } => None,
741 CopyFrom { .. } => None,
742 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
743 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
744 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
745 CreatedRole => Some("CREATE ROLE".into()),
746 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
747 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
748 CreatedIndex { .. } => Some("CREATE INDEX".into()),
749 CreatedSecret { .. } => Some("CREATE SECRET".into()),
750 CreatedSink { .. } => Some("CREATE SINK".into()),
751 CreatedSource { .. } => Some("CREATE SOURCE".into()),
752 CreatedTable { .. } => Some("CREATE TABLE".into()),
753 CreatedView { .. } => Some("CREATE VIEW".into()),
754 CreatedViews { .. } => Some("CREATE VIEWS".into()),
755 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
756 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
757 CreatedType => Some("CREATE TYPE".into()),
758 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
759 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
760 DeclaredCursor => Some("DECLARE CURSOR".into()),
761 Deleted(n) => Some(format!("DELETE {}", n)),
762 DiscardedTemp => Some("DISCARD TEMP".into()),
763 DiscardedAll => Some("DISCARD ALL".into()),
764 DroppedObject(o) => Some(format!("DROP {o}")),
765 DroppedOwned => Some("DROP OWNED".into()),
766 EmptyQuery => None,
767 Fetch { .. } => None,
768 GrantedPrivilege => Some("GRANT".into()),
769 GrantedRole => Some("GRANT ROLE".into()),
770 Inserted(n) => {
771 Some(format!("INSERT 0 {}", n))
779 }
780 Prepare => Some("PREPARE".into()),
781 Raised => Some("RAISE".into()),
782 ReassignOwned => Some("REASSIGN OWNED".into()),
783 RevokedPrivilege => Some("REVOKE".into()),
784 RevokedRole => Some("REVOKE ROLE".into()),
785 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
786 SetVariable { reset: true, .. } => Some("RESET".into()),
787 SetVariable { reset: false, .. } => Some("SET".into()),
788 StartedTransaction { .. } => Some("BEGIN".into()),
789 Subscribing { .. } => None,
790 TransactionCommitted { .. } => Some("COMMIT".into()),
791 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
792 Updated(n) => Some(format!("UPDATE {}", n)),
793 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
794 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
795 }
796 }
797
798 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
802 use ExecuteResponseKind::*;
803 use PlanKind::*;
804
805 match plan {
806 AbortTransaction => &[TransactionRolledBack],
807 AlterClusterRename
808 | AlterClusterSwap
809 | AlterCluster
810 | AlterClusterReplicaRename
811 | AlterOwner
812 | AlterItemRename
813 | AlterRetainHistory
814 | AlterNoop
815 | AlterSchemaRename
816 | AlterSchemaSwap
817 | AlterSecret
818 | AlterConnection
819 | AlterSource
820 | AlterSink
821 | AlterTableAddColumn
822 | AlterMaterializedViewApplyReplacement
823 | AlterNetworkPolicy => &[AlteredObject],
824 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
825 AlterSetCluster => &[AlteredObject],
826 AlterRole => &[AlteredRole],
827 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
828 &[AlteredSystemConfiguration]
829 }
830 Close => &[ClosedCursor],
831 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
832 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
833 PlanKind::Comment => &[ExecuteResponseKind::Comment],
834 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
835 CreateConnection => &[CreatedConnection],
836 CreateDatabase => &[CreatedDatabase],
837 CreateSchema => &[CreatedSchema],
838 CreateRole => &[CreatedRole],
839 CreateCluster => &[CreatedCluster],
840 CreateClusterReplica => &[CreatedClusterReplica],
841 CreateSource | CreateSources => &[CreatedSource],
842 CreateSecret => &[CreatedSecret],
843 CreateSink => &[CreatedSink],
844 CreateTable => &[CreatedTable],
845 CreateView => &[CreatedView],
846 CreateMaterializedView => &[CreatedMaterializedView],
847 CreateContinualTask => &[CreatedContinualTask],
848 CreateIndex => &[CreatedIndex],
849 CreateType => &[CreatedType],
850 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
851 CreateNetworkPolicy => &[CreatedNetworkPolicy],
852 Declare => &[DeclaredCursor],
853 DiscardTemp => &[DiscardedTemp],
854 DiscardAll => &[DiscardedAll],
855 DropObjects => &[DroppedObject],
856 DropOwned => &[DroppedOwned],
857 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
858 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
859 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
860 ExecuteResponseKind::CopyTo,
861 SendingRowsStreaming,
862 SendingRowsImmediate,
863 ],
864 Execute | ReadThenWrite => &[
865 Deleted,
866 Inserted,
867 SendingRowsStreaming,
868 SendingRowsImmediate,
869 Updated,
870 ],
871 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
872 GrantPrivileges => &[GrantedPrivilege],
873 GrantRole => &[GrantedRole],
874 Insert => &[Inserted, SendingRowsImmediate],
875 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
876 PlanKind::Raise => &[ExecuteResponseKind::Raised],
877 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
878 RevokePrivileges => &[RevokedPrivilege],
879 RevokeRole => &[RevokedRole],
880 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
881 &[ExecuteResponseKind::SetVariable]
882 }
883 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
884 StartTransaction => &[StartedTransaction],
885 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
886 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
887 }
888 }
889}
890
891impl Transmittable for ExecuteResponse {
895 type Allowed = ExecuteResponseKind;
896 fn to_allowed(&self) -> Self::Allowed {
897 ExecuteResponseKind::from(self)
898 }
899}