1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
67#[derive(Debug)]
70pub struct CopyFromStdinWriter {
71 pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
74 pub completion_rx: oneshot::Receiver<
77 Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
78 >,
79}
80
81#[derive(Debug)]
82pub struct CatalogSnapshot {
83 pub catalog: Arc<Catalog>,
84}
85
86#[derive(Debug)]
87pub enum Command {
88 CatalogSnapshot {
89 tx: oneshot::Sender<CatalogSnapshot>,
90 },
91
92 Startup {
93 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
94 user: User,
95 conn_id: ConnectionId,
96 client_ip: Option<IpAddr>,
97 secret_key: u32,
98 uuid: Uuid,
99 application_name: String,
100 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
101 },
102
103 AuthenticatePassword {
104 tx: oneshot::Sender<Result<(), AdapterError>>,
105 role_name: String,
106 password: Option<Password>,
107 },
108
109 AuthenticateGetSASLChallenge {
110 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
111 role_name: String,
112 nonce: String,
113 },
114
115 AuthenticateVerifySASLProof {
116 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
117 role_name: String,
118 proof: String,
119 auth_message: String,
120 mock_hash: String,
121 },
122
123 CheckRoleCanLogin {
124 tx: oneshot::Sender<Result<(), AdapterError>>,
125 role_name: String,
126 },
127
128 Execute {
129 portal_name: String,
130 session: Session,
131 tx: oneshot::Sender<Response<ExecuteResponse>>,
132 outer_ctx_extra: Option<ExecuteContextGuard>,
133 },
134
135 Commit {
140 action: EndTransactionAction,
141 session: Session,
142 tx: oneshot::Sender<Response<ExecuteResponse>>,
143 },
144
145 CancelRequest {
146 conn_id: ConnectionIdType,
147 secret_key: u32,
148 },
149
150 PrivilegedCancelRequest {
151 conn_id: ConnectionId,
152 },
153
154 GetWebhook {
155 database: String,
156 schema: String,
157 name: String,
158 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
159 },
160
161 GetSystemVars {
162 tx: oneshot::Sender<SystemVars>,
163 },
164
165 SetSystemVars {
166 vars: BTreeMap<String, String>,
167 conn_id: ConnectionId,
168 tx: oneshot::Sender<Result<(), AdapterError>>,
169 },
170
171 InjectAuditEvents {
172 events: Vec<crate::catalog::InjectedAuditEvent>,
173 conn_id: ConnectionId,
174 tx: oneshot::Sender<Result<(), AdapterError>>,
175 },
176
177 Terminate {
178 conn_id: ConnectionId,
179 tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
180 },
181
182 StartCopyFromStdin {
186 target_id: CatalogItemId,
187 target_name: String,
188 columns: Vec<ColumnIndex>,
189 row_desc: mz_repr::RelationDesc,
191 params: mz_pgcopy::CopyFormatParams<'static>,
193 session: Session,
194 tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
195 },
196
197 RetireExecute {
204 data: ExecuteContextExtra,
205 reason: StatementEndedExecutionReason,
206 },
207
208 CheckConsistency {
209 tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
210 },
211
212 Dump {
213 tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
214 },
215
216 GetComputeInstanceClient {
217 instance_id: ComputeInstanceId,
218 tx: oneshot::Sender<
219 Result<
220 mz_compute_client::controller::instance_client::InstanceClient,
221 mz_compute_client::controller::error::InstanceMissing,
222 >,
223 >,
224 },
225
226 GetOracle {
227 timeline: Timeline,
228 tx: oneshot::Sender<
229 Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
230 >,
231 },
232
233 DetermineRealTimeRecentTimestamp {
234 source_ids: BTreeSet<GlobalId>,
235 real_time_recency_timeout: Duration,
236 tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
237 },
238
239 GetTransactionReadHoldsBundle {
240 conn_id: ConnectionId,
241 tx: oneshot::Sender<Option<ReadHolds>>,
242 },
243
244 StoreTransactionReadHolds {
246 conn_id: ConnectionId,
247 read_holds: ReadHolds,
248 tx: oneshot::Sender<()>,
249 },
250
251 ExecuteSlowPathPeek {
252 dataflow_plan: Box<PeekDataflowPlan>,
253 determination: TimestampDetermination,
254 finishing: RowSetFinishing,
255 compute_instance: ComputeInstanceId,
256 target_replica: Option<ReplicaId>,
257 intermediate_result_type: SqlRelationType,
258 source_ids: BTreeSet<GlobalId>,
259 conn_id: ConnectionId,
260 max_result_size: u64,
261 max_query_result_size: Option<u64>,
262 watch_set: Option<WatchSetCreation>,
265 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
266 },
267
268 ExecuteSubscribe {
269 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
270 dependency_ids: BTreeSet<GlobalId>,
271 cluster_id: ComputeInstanceId,
272 replica_id: Option<ReplicaId>,
273 conn_id: ConnectionId,
274 session_uuid: Uuid,
275 read_holds: ReadHolds,
276 plan: plan::SubscribePlan,
277 statement_logging_id: Option<StatementLoggingId>,
278 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
279 },
280
281 CopyToPreflight {
285 s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
287 sink_id: GlobalId,
289 tx: oneshot::Sender<Result<(), AdapterError>>,
291 },
292
293 ExecuteCopyTo {
294 df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
295 compute_instance: ComputeInstanceId,
296 target_replica: Option<ReplicaId>,
297 source_ids: BTreeSet<GlobalId>,
298 conn_id: ConnectionId,
299 watch_set: Option<WatchSetCreation>,
302 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
303 },
304
305 ExecuteSideEffectingFunc {
307 plan: SideEffectingFunc,
308 conn_id: ConnectionId,
309 current_role: RoleId,
311 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
312 },
313
314 RegisterFrontendPeek {
318 uuid: Uuid,
319 conn_id: ConnectionId,
320 cluster_id: mz_controller_types::ClusterId,
321 depends_on: BTreeSet<GlobalId>,
322 is_fast_path: bool,
323 watch_set: Option<WatchSetCreation>,
326 tx: oneshot::Sender<Result<(), AdapterError>>,
327 },
328
329 UnregisterFrontendPeek {
334 uuid: Uuid,
335 tx: oneshot::Sender<()>,
336 },
337
338 ExplainTimestamp {
341 conn_id: ConnectionId,
342 session_wall_time: DateTime<Utc>,
343 cluster_id: ClusterId,
344 id_bundle: CollectionIdBundle,
345 determination: TimestampDetermination,
346 tx: oneshot::Sender<TimestampExplanation>,
347 },
348
349 FrontendStatementLogging(FrontendStatementLoggingEvent),
352}
353
354impl Command {
355 pub fn session(&self) -> Option<&Session> {
356 match self {
357 Command::Execute { session, .. }
358 | Command::Commit { session, .. }
359 | Command::StartCopyFromStdin { session, .. } => Some(session),
360 Command::CancelRequest { .. }
361 | Command::Startup { .. }
362 | Command::AuthenticatePassword { .. }
363 | Command::AuthenticateGetSASLChallenge { .. }
364 | Command::AuthenticateVerifySASLProof { .. }
365 | Command::CheckRoleCanLogin { .. }
366 | Command::CatalogSnapshot { .. }
367 | Command::PrivilegedCancelRequest { .. }
368 | Command::GetWebhook { .. }
369 | Command::Terminate { .. }
370 | Command::GetSystemVars { .. }
371 | Command::SetSystemVars { .. }
372 | Command::RetireExecute { .. }
373 | Command::CheckConsistency { .. }
374 | Command::Dump { .. }
375 | Command::GetComputeInstanceClient { .. }
376 | Command::GetOracle { .. }
377 | Command::DetermineRealTimeRecentTimestamp { .. }
378 | Command::GetTransactionReadHoldsBundle { .. }
379 | Command::StoreTransactionReadHolds { .. }
380 | Command::ExecuteSlowPathPeek { .. }
381 | Command::ExecuteSubscribe { .. }
382 | Command::CopyToPreflight { .. }
383 | Command::ExecuteCopyTo { .. }
384 | Command::ExecuteSideEffectingFunc { .. }
385 | Command::RegisterFrontendPeek { .. }
386 | Command::UnregisterFrontendPeek { .. }
387 | Command::ExplainTimestamp { .. }
388 | Command::FrontendStatementLogging(..)
389 | Command::InjectAuditEvents { .. } => None,
390 }
391 }
392
393 pub fn session_mut(&mut self) -> Option<&mut Session> {
394 match self {
395 Command::Execute { session, .. }
396 | Command::Commit { session, .. }
397 | Command::StartCopyFromStdin { session, .. } => Some(session),
398 Command::CancelRequest { .. }
399 | Command::Startup { .. }
400 | Command::AuthenticatePassword { .. }
401 | Command::AuthenticateGetSASLChallenge { .. }
402 | Command::AuthenticateVerifySASLProof { .. }
403 | Command::CheckRoleCanLogin { .. }
404 | Command::CatalogSnapshot { .. }
405 | Command::PrivilegedCancelRequest { .. }
406 | Command::GetWebhook { .. }
407 | Command::Terminate { .. }
408 | Command::GetSystemVars { .. }
409 | Command::SetSystemVars { .. }
410 | Command::RetireExecute { .. }
411 | Command::CheckConsistency { .. }
412 | Command::Dump { .. }
413 | Command::GetComputeInstanceClient { .. }
414 | Command::GetOracle { .. }
415 | Command::DetermineRealTimeRecentTimestamp { .. }
416 | Command::GetTransactionReadHoldsBundle { .. }
417 | Command::StoreTransactionReadHolds { .. }
418 | Command::ExecuteSlowPathPeek { .. }
419 | Command::ExecuteSubscribe { .. }
420 | Command::CopyToPreflight { .. }
421 | Command::ExecuteCopyTo { .. }
422 | Command::ExecuteSideEffectingFunc { .. }
423 | Command::RegisterFrontendPeek { .. }
424 | Command::UnregisterFrontendPeek { .. }
425 | Command::ExplainTimestamp { .. }
426 | Command::FrontendStatementLogging(..)
427 | Command::InjectAuditEvents { .. } => None,
428 }
429 }
430}
431
432#[derive(Debug)]
433pub struct Response<T> {
434 pub result: Result<T, AdapterError>,
435 pub session: Session,
436 pub otel_ctx: OpenTelemetryContext,
437}
438
439#[derive(Debug, Clone, Copy)]
440pub struct SuperuserAttribute(pub Option<bool>);
441
442#[derive(Derivative)]
444#[derivative(Debug)]
445pub struct StartupResponse {
446 pub role_id: RoleId,
448 pub superuser_attribute: SuperuserAttribute,
453 #[derivative(Debug = "ignore")]
455 pub write_notify: BuiltinTableAppendNotify,
456 pub session_defaults: BTreeMap<String, OwnedVarInput>,
458 pub catalog: Arc<Catalog>,
459 pub storage_collections:
460 Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
461 pub transient_id_gen: Arc<TransientIdGen>,
462 pub optimizer_metrics: OptimizerMetrics,
463 pub persist_client: PersistClient,
464 pub statement_logging_frontend: StatementLoggingFrontend,
465}
466
467#[derive(Derivative)]
468#[derivative(Debug)]
469pub struct SASLChallengeResponse {
470 pub iteration_count: usize,
471 pub salt: String,
473 pub nonce: String,
474}
475
476#[derive(Derivative)]
477#[derivative(Debug)]
478pub struct SASLVerifyProofResponse {
479 pub verifier: String,
480}
481
482impl Transmittable for StartupResponse {
485 type Allowed = bool;
486 fn to_allowed(&self) -> Self::Allowed {
487 true
488 }
489}
490
491#[derive(Debug, Clone)]
493pub struct CatalogDump(String);
494
495impl CatalogDump {
496 pub fn new(raw: String) -> Self {
497 CatalogDump(raw)
498 }
499
500 pub fn into_string(self) -> String {
501 self.0
502 }
503}
504
505impl Transmittable for CatalogDump {
506 type Allowed = bool;
507 fn to_allowed(&self) -> Self::Allowed {
508 true
509 }
510}
511
512impl Transmittable for SystemVars {
513 type Allowed = bool;
514 fn to_allowed(&self) -> Self::Allowed {
515 true
516 }
517}
518
519#[derive(EnumKind, Derivative)]
521#[derivative(Debug)]
522#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
523pub enum ExecuteResponse {
524 AlteredDefaultPrivileges,
526 AlteredObject(ObjectType),
528 AlteredRole,
530 AlteredSystemConfiguration,
532 ClosedCursor,
534 Comment,
536 Copied(usize),
538 CopyTo {
540 format: mz_sql::plan::CopyFormat,
541 resp: Box<ExecuteResponse>,
542 },
543 CopyFrom {
544 target_id: CatalogItemId,
546 target_name: String,
548 columns: Vec<ColumnIndex>,
549 params: CopyFormatParams<'static>,
550 ctx_extra: ExecuteContextGuard,
551 },
552 CreatedConnection,
554 CreatedDatabase,
556 CreatedSchema,
558 CreatedRole,
560 CreatedCluster,
562 CreatedClusterReplica,
564 CreatedIndex,
566 CreatedIntrospectionSubscribe,
568 CreatedSecret,
570 CreatedSink,
572 CreatedSource,
574 CreatedTable,
576 CreatedView,
578 CreatedViews,
580 CreatedMaterializedView,
582 CreatedType,
584 CreatedNetworkPolicy,
586 Deallocate { all: bool },
588 DeclaredCursor,
590 Deleted(usize),
592 DiscardedTemp,
594 DiscardedAll,
596 DroppedObject(ObjectType),
598 DroppedOwned,
600 EmptyQuery,
602 Fetch {
604 name: String,
606 count: Option<FetchDirection>,
608 timeout: ExecuteTimeout,
610 ctx_extra: ExecuteContextGuard,
611 },
612 GrantedPrivilege,
614 GrantedRole,
616 Inserted(usize),
618 Prepare,
620 Raised,
622 ReassignOwned,
624 RevokedPrivilege,
626 RevokedRole,
628 SendingRowsStreaming {
630 #[derivative(Debug = "ignore")]
631 rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
632 instance_id: ComputeInstanceId,
633 strategy: StatementExecutionStrategy,
634 },
635 SendingRowsImmediate {
638 #[derivative(Debug = "ignore")]
639 rows: Box<dyn RowIterator + Send + Sync>,
640 },
641 SetVariable {
643 name: String,
644 reset: bool,
646 },
647 StartedTransaction,
649 Subscribing {
652 rx: RowBatchStream,
653 ctx_extra: ExecuteContextGuard,
654 instance_id: ComputeInstanceId,
655 },
656 TransactionCommitted {
658 params: BTreeMap<&'static str, String>,
660 },
661 TransactionRolledBack {
663 params: BTreeMap<&'static str, String>,
665 },
666 Updated(usize),
668 ValidatedConnection,
670}
671
672impl TryFrom<&Statement<Raw>> for ExecuteResponse {
673 type Error = ();
674
675 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
677 let resp_kinds = Plan::generated_from(&stmt.into())
678 .iter()
679 .map(ExecuteResponse::generated_from)
680 .flatten()
681 .cloned()
682 .collect::<BTreeSet<ExecuteResponseKind>>();
683 let resps = resp_kinds
684 .iter()
685 .map(|r| (*r).try_into())
686 .collect::<Result<Vec<ExecuteResponse>, _>>();
687 if let Ok(resps) = resps {
689 if resps.len() == 1 {
690 return Ok(resps.into_element());
691 }
692 }
693 let resp = match stmt {
694 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
695 ExecuteResponse::DroppedObject((*object_type).into())
696 }
697 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
698 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
699 ExecuteResponse::AlteredObject((*object_type).into())
700 }
701 _ => return Err(()),
702 };
703 soft_assert_no_log!(
705 resp_kinds.len() == 1
706 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
707 "ExecuteResponses out of sync with planner"
708 );
709 Ok(resp)
710 }
711}
712
713impl TryInto<ExecuteResponse> for ExecuteResponseKind {
714 type Error = ();
715
716 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
719 match self {
720 ExecuteResponseKind::AlteredDefaultPrivileges => {
721 Ok(ExecuteResponse::AlteredDefaultPrivileges)
722 }
723 ExecuteResponseKind::AlteredObject => Err(()),
724 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
725 ExecuteResponseKind::AlteredSystemConfiguration => {
726 Ok(ExecuteResponse::AlteredSystemConfiguration)
727 }
728 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
729 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
730 ExecuteResponseKind::Copied => Err(()),
731 ExecuteResponseKind::CopyTo => Err(()),
732 ExecuteResponseKind::CopyFrom => Err(()),
733 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
734 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
735 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
736 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
737 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
738 ExecuteResponseKind::CreatedClusterReplica => {
739 Ok(ExecuteResponse::CreatedClusterReplica)
740 }
741 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
742 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
743 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
744 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
745 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
746 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
747 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
748 ExecuteResponseKind::CreatedMaterializedView => {
749 Ok(ExecuteResponse::CreatedMaterializedView)
750 }
751 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
752 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
753 ExecuteResponseKind::Deallocate => Err(()),
754 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
755 ExecuteResponseKind::Deleted => Err(()),
756 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
757 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
758 ExecuteResponseKind::DroppedObject => Err(()),
759 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
760 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
761 ExecuteResponseKind::Fetch => Err(()),
762 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
763 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
764 ExecuteResponseKind::Inserted => Err(()),
765 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
766 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
767 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
768 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
769 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
770 ExecuteResponseKind::SetVariable => Err(()),
771 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
772 ExecuteResponseKind::Subscribing => Err(()),
773 ExecuteResponseKind::TransactionCommitted => Err(()),
774 ExecuteResponseKind::TransactionRolledBack => Err(()),
775 ExecuteResponseKind::Updated => Err(()),
776 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
777 ExecuteResponseKind::SendingRowsStreaming => Err(()),
778 ExecuteResponseKind::SendingRowsImmediate => Err(()),
779 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
780 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
781 }
782 }
783 }
784}
785
786impl ExecuteResponse {
787 pub fn tag(&self) -> Option<String> {
788 use ExecuteResponse::*;
789 match self {
790 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
791 AlteredObject(o) => Some(format!("ALTER {}", o)),
792 AlteredRole => Some("ALTER ROLE".into()),
793 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
794 ClosedCursor => Some("CLOSE CURSOR".into()),
795 Comment => Some("COMMENT".into()),
796 Copied(n) => Some(format!("COPY {}", n)),
797 CopyTo { .. } => None,
798 CopyFrom { .. } => None,
799 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
800 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
801 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
802 CreatedRole => Some("CREATE ROLE".into()),
803 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
804 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
805 CreatedIndex { .. } => Some("CREATE INDEX".into()),
806 CreatedSecret { .. } => Some("CREATE SECRET".into()),
807 CreatedSink { .. } => Some("CREATE SINK".into()),
808 CreatedSource { .. } => Some("CREATE SOURCE".into()),
809 CreatedTable { .. } => Some("CREATE TABLE".into()),
810 CreatedView { .. } => Some("CREATE VIEW".into()),
811 CreatedViews { .. } => Some("CREATE VIEWS".into()),
812 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
813 CreatedType => Some("CREATE TYPE".into()),
814 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
815 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
816 DeclaredCursor => Some("DECLARE CURSOR".into()),
817 Deleted(n) => Some(format!("DELETE {}", n)),
818 DiscardedTemp => Some("DISCARD TEMP".into()),
819 DiscardedAll => Some("DISCARD ALL".into()),
820 DroppedObject(o) => Some(format!("DROP {o}")),
821 DroppedOwned => Some("DROP OWNED".into()),
822 EmptyQuery => None,
823 Fetch { .. } => None,
824 GrantedPrivilege => Some("GRANT".into()),
825 GrantedRole => Some("GRANT ROLE".into()),
826 Inserted(n) => {
827 Some(format!("INSERT 0 {}", n))
835 }
836 Prepare => Some("PREPARE".into()),
837 Raised => Some("RAISE".into()),
838 ReassignOwned => Some("REASSIGN OWNED".into()),
839 RevokedPrivilege => Some("REVOKE".into()),
840 RevokedRole => Some("REVOKE ROLE".into()),
841 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
842 SetVariable { reset: true, .. } => Some("RESET".into()),
843 SetVariable { reset: false, .. } => Some("SET".into()),
844 StartedTransaction { .. } => Some("BEGIN".into()),
845 Subscribing { .. } => None,
846 TransactionCommitted { .. } => Some("COMMIT".into()),
847 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
848 Updated(n) => Some(format!("UPDATE {}", n)),
849 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
850 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
851 }
852 }
853
854 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
858 use ExecuteResponseKind::*;
859 use PlanKind::*;
860
861 match plan {
862 AbortTransaction => &[TransactionRolledBack],
863 AlterClusterRename
864 | AlterClusterSwap
865 | AlterCluster
866 | AlterClusterReplicaRename
867 | AlterOwner
868 | AlterItemRename
869 | AlterRetainHistory
870 | AlterSourceTimestampInterval
871 | AlterNoop
872 | AlterSchemaRename
873 | AlterSchemaSwap
874 | AlterSecret
875 | AlterConnection
876 | AlterSource
877 | AlterSink
878 | AlterTableAddColumn
879 | AlterMaterializedViewApplyReplacement
880 | AlterNetworkPolicy => &[AlteredObject],
881 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
882 AlterSetCluster => &[AlteredObject],
883 AlterRole => &[AlteredRole],
884 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
885 &[AlteredSystemConfiguration]
886 }
887 Close => &[ClosedCursor],
888 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
889 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
890 PlanKind::Comment => &[ExecuteResponseKind::Comment],
891 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
892 CreateConnection => &[CreatedConnection],
893 CreateDatabase => &[CreatedDatabase],
894 CreateSchema => &[CreatedSchema],
895 CreateRole => &[CreatedRole],
896 CreateCluster => &[CreatedCluster],
897 CreateClusterReplica => &[CreatedClusterReplica],
898 CreateSource | CreateSources => &[CreatedSource],
899 CreateSecret => &[CreatedSecret],
900 CreateSink => &[CreatedSink],
901 CreateTable => &[CreatedTable],
902 CreateView => &[CreatedView],
903 CreateMaterializedView => &[CreatedMaterializedView],
904 CreateIndex => &[CreatedIndex],
905 CreateType => &[CreatedType],
906 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
907 CreateNetworkPolicy => &[CreatedNetworkPolicy],
908 Declare => &[DeclaredCursor],
909 DiscardTemp => &[DiscardedTemp],
910 DiscardAll => &[DiscardedAll],
911 DropObjects => &[DroppedObject],
912 DropOwned => &[DroppedOwned],
913 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
914 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
915 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
916 ExecuteResponseKind::CopyTo,
917 SendingRowsStreaming,
918 SendingRowsImmediate,
919 ],
920 Execute | ReadThenWrite => &[
921 Deleted,
922 Inserted,
923 SendingRowsStreaming,
924 SendingRowsImmediate,
925 Updated,
926 ],
927 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
928 GrantPrivileges => &[GrantedPrivilege],
929 GrantRole => &[GrantedRole],
930 Insert => &[Inserted, SendingRowsImmediate],
931 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
932 PlanKind::Raise => &[ExecuteResponseKind::Raised],
933 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
934 RevokePrivileges => &[RevokedPrivilege],
935 RevokeRole => &[RevokedRole],
936 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
937 &[ExecuteResponseKind::SetVariable]
938 }
939 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
940 StartTransaction => &[StartedTransaction],
941 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
942 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
943 }
944 }
945}
946
947impl Transmittable for ExecuteResponse {
951 type Allowed = ExecuteResponseKind;
952 fn to_allowed(&self) -> Self::Allowed {
953 ExecuteResponseKind::from(self)
954 }
955}