1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
/// Channel endpoints handed back in reply to [`Command::StartCopyFromStdin`].
///
/// The holder pushes raw byte chunks into `batch_txs` and later awaits the
/// overall outcome on `completion_rx`.
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Senders accepting raw byte chunks.
    ///
    /// NOTE(review): presumably one sender per parallel ingestion worker;
    /// confirm against the code that constructs this value.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// Resolves once with the final outcome: on success a list of persist
    /// `ProtoBatch`es together with a `u64` (presumably the number of rows
    /// written; TODO confirm), otherwise an `AdapterError`.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
80
/// Reply payload for [`Command::CatalogSnapshot`]: a shared handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Reference-counted handle to the catalog.
    pub catalog: Arc<Catalog>,
}
85
/// A request message.
///
/// Most variants carry a `tx` channel on which the reply is delivered. The
/// variants that own a [`Session`] (`Execute`, `Commit`, `StartCopyFromStdin`)
/// reply with a [`Response`], which hands the session back to the sender.
#[derive(Debug)]
pub enum Command {
    /// Asks for a `CatalogSnapshot`, delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Starts up a connection; replies with a [`StartupResponse`] or an error.
    ///
    /// `notice_tx` is an unbounded channel of [`AdapterNotice`]s associated
    /// with the connection.
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication for `role_name`; replies `Ok(())` or an error.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` given the supplied `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Submits a SASL proof for `role_name`; replies with a
    /// [`SASLVerifyProofResponse`] or an error.
    ///
    /// NOTE(review): the role of `mock_hash` is not visible in this file;
    /// confirm with the handler.
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    },

    /// Checks whether `role_name` may log in; replies `Ok(())` or an error.
    CheckRoleCanLogin {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    },

    /// Executes the portal named `portal_name` using `session`.
    ///
    /// The session is returned inside the [`Response`] sent on `tx`.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Ends the session's transaction as described by `action`.
    ///
    /// The session is returned inside the [`Response`] sent on `tx`.
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a raw connection id and a secret key.
    /// Carries no reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request identified by connection id only (no secret key).
    /// Carries no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Looks up the webhook named by `database`, `schema`, and `name`.
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Asks for the [`SystemVars`], delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Sets system variables from a name-to-value map on behalf of `conn_id`.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Injects the given audit `events` on behalf of `conn_id`.
    InjectAuditEvents {
        events: Vec<crate::catalog::InjectedAuditEvent>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminates the connection `conn_id`. The reply channel is optional.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Begins a copy-from-stdin operation into the item `target_id`.
    ///
    /// Replies with a [`CopyFromStdinWriter`]; the session is returned inside
    /// the [`Response`] sent on `tx`.
    StartCopyFromStdin {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: mz_repr::RelationDesc,
        params: mz_pgcopy::CopyFormatParams<'static>,
        session: Session,
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Retires an execution context (`data`) with the given end `reason`.
    /// Carries no reply channel.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Runs a consistency check; replies `Ok(())` or the inconsistencies found.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Asks for a JSON dump, delivered on `tx`.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Asks for a client of the compute instance `instance_id`; replies with
    /// the client or `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Asks for the timestamp oracle of `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Determines a real-time-recent timestamp for `source_ids`, bounded by
    /// `real_time_recency_timeout`. The reply may be `Ok(None)`.
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Asks for the transaction read holds of `conn_id`, if any.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds>>,
    },

    /// Stores `read_holds` for the transaction of `conn_id`; `tx` is signalled
    /// with `()` as acknowledgement.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds,
        tx: oneshot::Sender<()>,
    },

    /// Executes a peek described by a dataflow plan.
    ///
    /// NOTE(review): the distinction between `max_result_size` and
    /// `max_query_result_size` is not visible here; confirm with the handler.
    ExecuteSlowPathPeek {
        dataflow_plan: Box<PeekDataflowPlan>,
        determination: TimestampDetermination,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a subscribe from an already-built dataflow description and
    /// subscribe plan, handing over the `read_holds` it needs.
    ExecuteSubscribe {
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds,
        plan: plan::SubscribePlan,
        statement_logging_id: Option<StatementLoggingId>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight step for a copy-to operation targeting the given S3 sink
    /// connection; replies `Ok(())` or an error.
    CopyToPreflight {
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        sink_id: GlobalId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a copy-to operation from an already-built dataflow description.
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a side-effecting function `plan` for `conn_id` acting as
    /// `current_role`.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Registers a peek identified by `uuid`; replies `Ok(())` or an error.
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregisters the peek identified by `uuid`; `tx` is signalled with `()`
    /// as acknowledgement.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Asks for a [`TimestampExplanation`] of an already-made timestamp
    /// `determination`.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination,
        tx: oneshot::Sender<TimestampExplanation>,
    },

    /// Carries a statement-logging event. Has no reply channel.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
353
354impl Command {
355 pub fn session(&self) -> Option<&Session> {
356 match self {
357 Command::Execute { session, .. }
358 | Command::Commit { session, .. }
359 | Command::StartCopyFromStdin { session, .. } => Some(session),
360 Command::CancelRequest { .. }
361 | Command::Startup { .. }
362 | Command::AuthenticatePassword { .. }
363 | Command::AuthenticateGetSASLChallenge { .. }
364 | Command::AuthenticateVerifySASLProof { .. }
365 | Command::CheckRoleCanLogin { .. }
366 | Command::CatalogSnapshot { .. }
367 | Command::PrivilegedCancelRequest { .. }
368 | Command::GetWebhook { .. }
369 | Command::Terminate { .. }
370 | Command::GetSystemVars { .. }
371 | Command::SetSystemVars { .. }
372 | Command::RetireExecute { .. }
373 | Command::CheckConsistency { .. }
374 | Command::Dump { .. }
375 | Command::GetComputeInstanceClient { .. }
376 | Command::GetOracle { .. }
377 | Command::DetermineRealTimeRecentTimestamp { .. }
378 | Command::GetTransactionReadHoldsBundle { .. }
379 | Command::StoreTransactionReadHolds { .. }
380 | Command::ExecuteSlowPathPeek { .. }
381 | Command::ExecuteSubscribe { .. }
382 | Command::CopyToPreflight { .. }
383 | Command::ExecuteCopyTo { .. }
384 | Command::ExecuteSideEffectingFunc { .. }
385 | Command::RegisterFrontendPeek { .. }
386 | Command::UnregisterFrontendPeek { .. }
387 | Command::ExplainTimestamp { .. }
388 | Command::FrontendStatementLogging(..)
389 | Command::InjectAuditEvents { .. } => None,
390 }
391 }
392
393 pub fn session_mut(&mut self) -> Option<&mut Session> {
394 match self {
395 Command::Execute { session, .. }
396 | Command::Commit { session, .. }
397 | Command::StartCopyFromStdin { session, .. } => Some(session),
398 Command::CancelRequest { .. }
399 | Command::Startup { .. }
400 | Command::AuthenticatePassword { .. }
401 | Command::AuthenticateGetSASLChallenge { .. }
402 | Command::AuthenticateVerifySASLProof { .. }
403 | Command::CheckRoleCanLogin { .. }
404 | Command::CatalogSnapshot { .. }
405 | Command::PrivilegedCancelRequest { .. }
406 | Command::GetWebhook { .. }
407 | Command::Terminate { .. }
408 | Command::GetSystemVars { .. }
409 | Command::SetSystemVars { .. }
410 | Command::RetireExecute { .. }
411 | Command::CheckConsistency { .. }
412 | Command::Dump { .. }
413 | Command::GetComputeInstanceClient { .. }
414 | Command::GetOracle { .. }
415 | Command::DetermineRealTimeRecentTimestamp { .. }
416 | Command::GetTransactionReadHoldsBundle { .. }
417 | Command::StoreTransactionReadHolds { .. }
418 | Command::ExecuteSlowPathPeek { .. }
419 | Command::ExecuteSubscribe { .. }
420 | Command::CopyToPreflight { .. }
421 | Command::ExecuteCopyTo { .. }
422 | Command::ExecuteSideEffectingFunc { .. }
423 | Command::RegisterFrontendPeek { .. }
424 | Command::UnregisterFrontendPeek { .. }
425 | Command::ExplainTimestamp { .. }
426 | Command::FrontendStatementLogging(..)
427 | Command::InjectAuditEvents { .. } => None,
428 }
429 }
430}
431
/// A reply that hands a [`Session`] back to the requester together with the
/// outcome of the request.
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the request.
    pub result: Result<T, AdapterError>,
    /// The session being returned to the requester.
    pub session: Session,
    /// OpenTelemetry context accompanying the reply.
    ///
    /// NOTE(review): presumably used to continue the trace on the receiving
    /// side; confirm with the code that consumes it.
    pub otel_ctx: OpenTelemetryContext,
}
438
/// Newtype around an optional superuser flag.
///
/// NOTE(review): the meaning of `None` (as opposed to `Some(false)`) is not
/// visible in this file; confirm with the code that produces this value.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
441
/// Successful reply to [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that `write_notify` can be left
/// out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// A role id. NOTE(review): presumably the role of the connecting user; confirm.
    pub role_id: RoleId,
    /// Optional superuser flag for the session; see [`SuperuserAttribute`].
    pub superuser_attribute: SuperuserAttribute,
    /// Builtin-table append notification handle. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Session variable defaults, keyed by variable name.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
    /// Shared handle to the storage collections interface.
    pub storage_collections:
        Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
    /// Shared generator for transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics handle.
    pub optimizer_metrics: OptimizerMetrics,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Statement-logging handle for the requester's side.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
466
/// Successful reply to [`Command::AuthenticateGetSASLChallenge`].
///
/// NOTE(review): the fields look like SCRAM challenge parameters; encodings
/// (e.g. whether `salt` is base64) are not visible here, so confirm with the
/// producer before relying on them.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge.
    pub iteration_count: usize,
    /// Salt for the challenge, as a string.
    pub salt: String,
    /// Nonce for the challenge, as a string.
    pub nonce: String,
}
475
/// Successful reply to [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned after the proof was accepted.
    ///
    /// NOTE(review): presumably the server signature sent back to the client;
    /// confirm with the producer.
    pub verifier: String,
}
481
482impl Transmittable for StartupResponse {
485 type Allowed = bool;
486 fn to_allowed(&self) -> Self::Allowed {
487 true
488 }
489}
490
/// Opaque wrapper around a catalog dump held as a raw `String`.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or validating it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and returns the string it was built from.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
504
505impl Transmittable for CatalogDump {
506 type Allowed = bool;
507 fn to_allowed(&self) -> Self::Allowed {
508 true
509 }
510}
511
512impl Transmittable for SystemVars {
513 type Allowed = bool;
514 fn to_allowed(&self) -> Self::Allowed {
515 true
516 }
517}
518
/// The outcome of successfully executing a statement.
///
/// The `EnumKind` derive generates a payload-free mirror enum,
/// `ExecuteResponseKind`, which additionally derives `PartialOrd` and `Ord`.
/// `Debug` is derived through `derivative` so that row streams and iterators
/// can be left out of the debug output.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// Default privileges were altered.
    AlteredDefaultPrivileges,
    /// An object of the given type was altered.
    AlteredObject(ObjectType),
    /// A role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// A cursor was closed.
    ClosedCursor,
    /// A comment was created.
    Comment,
    /// A copy completed; the payload is the count reported in the `COPY <n>` tag.
    Copied(usize),
    /// A copy-to response: wraps another response (`resp`) together with the
    /// copy `format` to use for it.
    CopyTo {
        format: mz_sql::plan::CopyFormat,
        resp: Box<ExecuteResponse>,
    },
    /// A copy-from response describing the target item, its columns, and the
    /// format parameters of the incoming data.
    CopyFrom {
        /// Catalog id of the target item.
        target_id: CatalogItemId,
        /// Name of the target item.
        target_name: String,
        /// Columns being written.
        columns: Vec<ColumnIndex>,
        /// Format parameters for the incoming data.
        params: CopyFormatParams<'static>,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// A connection was created.
    CreatedConnection,
    /// A database was created.
    CreatedDatabase,
    /// A schema was created.
    CreatedSchema,
    /// A role was created.
    CreatedRole,
    /// A cluster was created.
    CreatedCluster,
    /// A cluster replica was created.
    CreatedClusterReplica,
    /// An index was created.
    CreatedIndex,
    /// An introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// A secret was created.
    CreatedSecret,
    /// A sink was created.
    CreatedSink,
    /// A source was created.
    CreatedSource,
    /// A table was created.
    CreatedTable,
    /// A view was created.
    CreatedView,
    /// Views were created.
    CreatedViews,
    /// A materialized view was created.
    CreatedMaterializedView,
    /// A continual task was created.
    CreatedContinualTask,
    /// A type was created.
    CreatedType,
    /// A network policy was created.
    CreatedNetworkPolicy,
    /// A deallocate completed; `all` selects the `DEALLOCATE ALL` tag.
    Deallocate { all: bool },
    /// A cursor was declared.
    DeclaredCursor,
    /// A delete completed; the payload is the count reported in the `DELETE <n>` tag.
    Deleted(usize),
    /// Temporary objects were discarded.
    DiscardedTemp,
    /// All discardable state was discarded.
    DiscardedAll,
    /// An object of the given type was dropped.
    DroppedObject(ObjectType),
    /// Owned objects were dropped.
    DroppedOwned,
    /// The query was empty.
    EmptyQuery,
    /// A fetch from a cursor.
    Fetch {
        /// Name of the cursor to fetch from.
        name: String,
        /// How much to fetch, if specified.
        count: Option<FetchDirection>,
        /// Timeout for the fetch.
        timeout: ExecuteTimeout,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Privileges were granted.
    GrantedPrivilege,
    /// A role was granted.
    GrantedRole,
    /// An insert completed; the payload is the count reported in the
    /// `INSERT 0 <n>` tag.
    Inserted(usize),
    /// A statement was prepared.
    Prepare,
    /// A notice was raised.
    Raised,
    /// Owned objects were reassigned.
    ReassignOwned,
    /// Privileges were revoked.
    RevokedPrivilege,
    /// A role was revoked.
    RevokedRole,
    /// Rows will be delivered through the `rows` stream.
    SendingRowsStreaming {
        /// Stream of peek responses. Omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance associated with the rows.
        instance_id: ComputeInstanceId,
        /// Execution strategy recorded for the statement.
        strategy: StatementExecutionStrategy,
    },
    /// Rows are available through the `rows` iterator.
    SendingRowsImmediate {
        /// Row iterator. Omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// A variable was set or reset.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// `true` selects the `RESET` tag, `false` the `SET` tag.
        reset: bool,
    },
    /// A transaction was started.
    StartedTransaction,
    /// A subscribe is in progress; updates arrive on `rx`.
    Subscribing {
        /// Stream of row batches.
        rx: RowBatchStream,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
        /// Compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// The transaction was committed.
    TransactionCommitted {
        /// Name-to-value map accompanying the commit.
        ///
        /// NOTE(review): presumably session parameters changed by the
        /// transaction; confirm with the producer.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction was rolled back.
    TransactionRolledBack {
        /// Name-to-value map accompanying the rollback.
        ///
        /// NOTE(review): presumably session parameters changed by the
        /// transaction; confirm with the producer.
        params: BTreeMap<&'static str, String>,
    },
    /// An update completed; the payload is the count reported in the `UPDATE <n>` tag.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
673
674impl TryFrom<&Statement<Raw>> for ExecuteResponse {
675 type Error = ();
676
677 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
679 let resp_kinds = Plan::generated_from(&stmt.into())
680 .iter()
681 .map(ExecuteResponse::generated_from)
682 .flatten()
683 .cloned()
684 .collect::<BTreeSet<ExecuteResponseKind>>();
685 let resps = resp_kinds
686 .iter()
687 .map(|r| (*r).try_into())
688 .collect::<Result<Vec<ExecuteResponse>, _>>();
689 if let Ok(resps) = resps {
691 if resps.len() == 1 {
692 return Ok(resps.into_element());
693 }
694 }
695 let resp = match stmt {
696 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
697 ExecuteResponse::DroppedObject((*object_type).into())
698 }
699 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
700 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
701 ExecuteResponse::AlteredObject((*object_type).into())
702 }
703 _ => return Err(()),
704 };
705 soft_assert_no_log!(
707 resp_kinds.len() == 1
708 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
709 "ExecuteResponses out of sync with planner"
710 );
711 Ok(resp)
712 }
713}
714
715impl TryInto<ExecuteResponse> for ExecuteResponseKind {
716 type Error = ();
717
718 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
721 match self {
722 ExecuteResponseKind::AlteredDefaultPrivileges => {
723 Ok(ExecuteResponse::AlteredDefaultPrivileges)
724 }
725 ExecuteResponseKind::AlteredObject => Err(()),
726 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
727 ExecuteResponseKind::AlteredSystemConfiguration => {
728 Ok(ExecuteResponse::AlteredSystemConfiguration)
729 }
730 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
731 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
732 ExecuteResponseKind::Copied => Err(()),
733 ExecuteResponseKind::CopyTo => Err(()),
734 ExecuteResponseKind::CopyFrom => Err(()),
735 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
736 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
737 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
738 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
739 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
740 ExecuteResponseKind::CreatedClusterReplica => {
741 Ok(ExecuteResponse::CreatedClusterReplica)
742 }
743 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
744 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
745 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
746 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
747 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
748 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
749 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
750 ExecuteResponseKind::CreatedMaterializedView => {
751 Ok(ExecuteResponse::CreatedMaterializedView)
752 }
753 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
754 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
755 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
756 ExecuteResponseKind::Deallocate => Err(()),
757 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
758 ExecuteResponseKind::Deleted => Err(()),
759 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
760 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
761 ExecuteResponseKind::DroppedObject => Err(()),
762 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
763 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
764 ExecuteResponseKind::Fetch => Err(()),
765 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
766 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
767 ExecuteResponseKind::Inserted => Err(()),
768 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
769 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
770 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
771 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
772 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
773 ExecuteResponseKind::SetVariable => Err(()),
774 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
775 ExecuteResponseKind::Subscribing => Err(()),
776 ExecuteResponseKind::TransactionCommitted => Err(()),
777 ExecuteResponseKind::TransactionRolledBack => Err(()),
778 ExecuteResponseKind::Updated => Err(()),
779 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
780 ExecuteResponseKind::SendingRowsStreaming => Err(()),
781 ExecuteResponseKind::SendingRowsImmediate => Err(()),
782 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
783 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
784 }
785 }
786 }
787}
788
789impl ExecuteResponse {
790 pub fn tag(&self) -> Option<String> {
791 use ExecuteResponse::*;
792 match self {
793 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
794 AlteredObject(o) => Some(format!("ALTER {}", o)),
795 AlteredRole => Some("ALTER ROLE".into()),
796 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
797 ClosedCursor => Some("CLOSE CURSOR".into()),
798 Comment => Some("COMMENT".into()),
799 Copied(n) => Some(format!("COPY {}", n)),
800 CopyTo { .. } => None,
801 CopyFrom { .. } => None,
802 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
803 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
804 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
805 CreatedRole => Some("CREATE ROLE".into()),
806 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
807 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
808 CreatedIndex { .. } => Some("CREATE INDEX".into()),
809 CreatedSecret { .. } => Some("CREATE SECRET".into()),
810 CreatedSink { .. } => Some("CREATE SINK".into()),
811 CreatedSource { .. } => Some("CREATE SOURCE".into()),
812 CreatedTable { .. } => Some("CREATE TABLE".into()),
813 CreatedView { .. } => Some("CREATE VIEW".into()),
814 CreatedViews { .. } => Some("CREATE VIEWS".into()),
815 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
816 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
817 CreatedType => Some("CREATE TYPE".into()),
818 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
819 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
820 DeclaredCursor => Some("DECLARE CURSOR".into()),
821 Deleted(n) => Some(format!("DELETE {}", n)),
822 DiscardedTemp => Some("DISCARD TEMP".into()),
823 DiscardedAll => Some("DISCARD ALL".into()),
824 DroppedObject(o) => Some(format!("DROP {o}")),
825 DroppedOwned => Some("DROP OWNED".into()),
826 EmptyQuery => None,
827 Fetch { .. } => None,
828 GrantedPrivilege => Some("GRANT".into()),
829 GrantedRole => Some("GRANT ROLE".into()),
830 Inserted(n) => {
831 Some(format!("INSERT 0 {}", n))
839 }
840 Prepare => Some("PREPARE".into()),
841 Raised => Some("RAISE".into()),
842 ReassignOwned => Some("REASSIGN OWNED".into()),
843 RevokedPrivilege => Some("REVOKE".into()),
844 RevokedRole => Some("REVOKE ROLE".into()),
845 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
846 SetVariable { reset: true, .. } => Some("RESET".into()),
847 SetVariable { reset: false, .. } => Some("SET".into()),
848 StartedTransaction { .. } => Some("BEGIN".into()),
849 Subscribing { .. } => None,
850 TransactionCommitted { .. } => Some("COMMIT".into()),
851 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
852 Updated(n) => Some(format!("UPDATE {}", n)),
853 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
854 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
855 }
856 }
857
858 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
862 use ExecuteResponseKind::*;
863 use PlanKind::*;
864
865 match plan {
866 AbortTransaction => &[TransactionRolledBack],
867 AlterClusterRename
868 | AlterClusterSwap
869 | AlterCluster
870 | AlterClusterReplicaRename
871 | AlterOwner
872 | AlterItemRename
873 | AlterRetainHistory
874 | AlterSourceTimestampInterval
875 | AlterNoop
876 | AlterSchemaRename
877 | AlterSchemaSwap
878 | AlterSecret
879 | AlterConnection
880 | AlterSource
881 | AlterSink
882 | AlterTableAddColumn
883 | AlterMaterializedViewApplyReplacement
884 | AlterNetworkPolicy => &[AlteredObject],
885 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
886 AlterSetCluster => &[AlteredObject],
887 AlterRole => &[AlteredRole],
888 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
889 &[AlteredSystemConfiguration]
890 }
891 Close => &[ClosedCursor],
892 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
893 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
894 PlanKind::Comment => &[ExecuteResponseKind::Comment],
895 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
896 CreateConnection => &[CreatedConnection],
897 CreateDatabase => &[CreatedDatabase],
898 CreateSchema => &[CreatedSchema],
899 CreateRole => &[CreatedRole],
900 CreateCluster => &[CreatedCluster],
901 CreateClusterReplica => &[CreatedClusterReplica],
902 CreateSource | CreateSources => &[CreatedSource],
903 CreateSecret => &[CreatedSecret],
904 CreateSink => &[CreatedSink],
905 CreateTable => &[CreatedTable],
906 CreateView => &[CreatedView],
907 CreateMaterializedView => &[CreatedMaterializedView],
908 CreateContinualTask => &[CreatedContinualTask],
909 CreateIndex => &[CreatedIndex],
910 CreateType => &[CreatedType],
911 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
912 CreateNetworkPolicy => &[CreatedNetworkPolicy],
913 Declare => &[DeclaredCursor],
914 DiscardTemp => &[DiscardedTemp],
915 DiscardAll => &[DiscardedAll],
916 DropObjects => &[DroppedObject],
917 DropOwned => &[DroppedOwned],
918 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
919 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
920 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
921 ExecuteResponseKind::CopyTo,
922 SendingRowsStreaming,
923 SendingRowsImmediate,
924 ],
925 Execute | ReadThenWrite => &[
926 Deleted,
927 Inserted,
928 SendingRowsStreaming,
929 SendingRowsImmediate,
930 Updated,
931 ],
932 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
933 GrantPrivileges => &[GrantedPrivilege],
934 GrantRole => &[GrantedRole],
935 Insert => &[Inserted, SendingRowsImmediate],
936 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
937 PlanKind::Raise => &[ExecuteResponseKind::Raised],
938 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
939 RevokePrivileges => &[RevokedPrivilege],
940 RevokeRole => &[RevokedRole],
941 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
942 &[ExecuteResponseKind::SetVariable]
943 }
944 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
945 StartTransaction => &[StartedTransaction],
946 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
947 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
948 }
949 }
950}
951
952impl Transmittable for ExecuteResponse {
956 type Allowed = ExecuteResponseKind;
957 fn to_allowed(&self) -> Self::Allowed {
958 ExecuteResponseKind::from(self)
959 }
960}