1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58 StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
/// Channel endpoints handed back in reply to [`Command::StartCopyFromStdin`].
///
/// NOTE(review): the roles described on the fields are read off their names and types only;
/// confirm against the code that constructs and consumes this struct.
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Senders of raw byte chunks (`Vec<u8>`); there may be several, presumably one per
    /// parallel consumer of the copied data — TODO confirm.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// One-shot receiver for the final outcome: on success a list of persist `ProtoBatch`es
    /// paired with a `u64` (presumably a row count — TODO confirm), otherwise an
    /// `AdapterError`.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
80
/// Reply payload for [`Command::CatalogSnapshot`]: a shared (`Arc`) handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// The shared catalog handle.
    pub catalog: Arc<Catalog>,
}
85
/// A request message.
///
/// Most variants carry a `tx` one-shot sender; its payload type describes what the requester
/// gets back. Variants that own a [`Session`] (`Execute`, `Commit`, `StartCopyFromStdin`) get
/// it back inside a [`Response`].
///
/// NOTE(review): the per-variant descriptions below are derived from variant/field names and
/// types visible in this file; the handling logic lives elsewhere — confirm before relying on
/// any behavioral detail.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`].
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Starts up a connection; the reply is a [`StartupResponse`] or an [`AdapterError`].
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        /// Presumably the secret later matched by `CancelRequest::secret_key` — TODO confirm.
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Unbounded channel for [`AdapterNotice`]s associated with this connection.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication for `role_name`; the password itself is optional.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` given a `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Requests verification of a SASL proof; the reply is a [`SASLVerifyProofResponse`].
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        /// NOTE(review): purpose not visible here — presumably used when the role has no
        /// stored credentials; confirm against the handler.
        mock_hash: String,
    },

    /// Asks whether `role_name` may log in; `Ok(())` or an [`AdapterError`].
    CheckRoleCanLogin {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    },

    /// Executes the portal named `portal_name` in `session`.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        /// Optional execution-context guard supplied by the caller.
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Ends the session's transaction with the given [`EndTransactionAction`].
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a raw connection id plus `secret_key`. Carries no
    /// reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request that carries no secret key. Carries no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Looks up a webhook by `database`/`schema`/`name`.
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests the current [`SystemVars`].
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Sets system variables from a name-to-value map on behalf of `conn_id`.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Submits a list of `InjectedAuditEvent`s on behalf of `conn_id`.
    InjectAuditEvents {
        events: Vec<crate::catalog::InjectedAuditEvent>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminates the connection `conn_id`; the reply channel is optional.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Starts a `COPY ... FROM STDIN` into `target_id`; the reply is a
    /// [`CopyFromStdinWriter`] wrapped in a [`Response`].
    StartCopyFromStdin {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: mz_repr::RelationDesc,
        params: mz_pgcopy::CopyFormatParams<'static>,
        session: Session,
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Retires an execution context with the reason the statement ended. Carries no reply
    /// channel.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Runs a consistency check; failures are reported as [`CoordinatorInconsistencies`].
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a JSON dump (`serde_json::Value`).
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests a client for the compute instance `instance_id`; fails with
    /// `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests the [`TimestampOracle`] for `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recent timestamp for `source_ids`, subject to a timeout; the
    /// successful reply is optional.
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the transaction read holds for `conn_id`, if there are any.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
    },

    /// Stores `read_holds` for the transaction of `conn_id`; `tx` carries only `()`.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        tx: oneshot::Sender<()>,
    },

    /// Executes a peek described by an already-built dataflow plan.
    ExecuteSlowPathPeek {
        /// Boxed, presumably to keep this variant (and thus the enum) small.
        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a subscribe from an already-built dataflow description and plan.
    ExecuteSubscribe {
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        plan: plan::SubscribePlan,
        statement_logging_id: Option<StatementLoggingId>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight step for a `COPY TO` S3 one-shot sink; `Ok(())` or an [`AdapterError`].
    CopyToPreflight {
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        sink_id: GlobalId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a `COPY TO` from an already-built dataflow description.
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a [`SideEffectingFunc`] plan for `conn_id` acting as `current_role`.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Registers a frontend peek under `uuid`.
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregisters the frontend peek identified by `uuid`; `tx` carries only `()`.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Requests a [`TimestampExplanation`] for an existing timestamp determination.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
    },

    /// Wraps a [`FrontendStatementLoggingEvent`]. Carries no reply channel.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
353
354impl Command {
355 pub fn session(&self) -> Option<&Session> {
356 match self {
357 Command::Execute { session, .. }
358 | Command::Commit { session, .. }
359 | Command::StartCopyFromStdin { session, .. } => Some(session),
360 Command::CancelRequest { .. }
361 | Command::Startup { .. }
362 | Command::AuthenticatePassword { .. }
363 | Command::AuthenticateGetSASLChallenge { .. }
364 | Command::AuthenticateVerifySASLProof { .. }
365 | Command::CheckRoleCanLogin { .. }
366 | Command::CatalogSnapshot { .. }
367 | Command::PrivilegedCancelRequest { .. }
368 | Command::GetWebhook { .. }
369 | Command::Terminate { .. }
370 | Command::GetSystemVars { .. }
371 | Command::SetSystemVars { .. }
372 | Command::RetireExecute { .. }
373 | Command::CheckConsistency { .. }
374 | Command::Dump { .. }
375 | Command::GetComputeInstanceClient { .. }
376 | Command::GetOracle { .. }
377 | Command::DetermineRealTimeRecentTimestamp { .. }
378 | Command::GetTransactionReadHoldsBundle { .. }
379 | Command::StoreTransactionReadHolds { .. }
380 | Command::ExecuteSlowPathPeek { .. }
381 | Command::ExecuteSubscribe { .. }
382 | Command::CopyToPreflight { .. }
383 | Command::ExecuteCopyTo { .. }
384 | Command::ExecuteSideEffectingFunc { .. }
385 | Command::RegisterFrontendPeek { .. }
386 | Command::UnregisterFrontendPeek { .. }
387 | Command::ExplainTimestamp { .. }
388 | Command::FrontendStatementLogging(..)
389 | Command::InjectAuditEvents { .. } => None,
390 }
391 }
392
393 pub fn session_mut(&mut self) -> Option<&mut Session> {
394 match self {
395 Command::Execute { session, .. }
396 | Command::Commit { session, .. }
397 | Command::StartCopyFromStdin { session, .. } => Some(session),
398 Command::CancelRequest { .. }
399 | Command::Startup { .. }
400 | Command::AuthenticatePassword { .. }
401 | Command::AuthenticateGetSASLChallenge { .. }
402 | Command::AuthenticateVerifySASLProof { .. }
403 | Command::CheckRoleCanLogin { .. }
404 | Command::CatalogSnapshot { .. }
405 | Command::PrivilegedCancelRequest { .. }
406 | Command::GetWebhook { .. }
407 | Command::Terminate { .. }
408 | Command::GetSystemVars { .. }
409 | Command::SetSystemVars { .. }
410 | Command::RetireExecute { .. }
411 | Command::CheckConsistency { .. }
412 | Command::Dump { .. }
413 | Command::GetComputeInstanceClient { .. }
414 | Command::GetOracle { .. }
415 | Command::DetermineRealTimeRecentTimestamp { .. }
416 | Command::GetTransactionReadHoldsBundle { .. }
417 | Command::StoreTransactionReadHolds { .. }
418 | Command::ExecuteSlowPathPeek { .. }
419 | Command::ExecuteSubscribe { .. }
420 | Command::CopyToPreflight { .. }
421 | Command::ExecuteCopyTo { .. }
422 | Command::ExecuteSideEffectingFunc { .. }
423 | Command::RegisterFrontendPeek { .. }
424 | Command::UnregisterFrontendPeek { .. }
425 | Command::ExplainTimestamp { .. }
426 | Command::FrontendStatementLogging(..)
427 | Command::InjectAuditEvents { .. } => None,
428 }
429 }
430}
431
/// Reply envelope for commands that own a [`Session`]: it bundles the outcome with the
/// session and an OpenTelemetry context.
#[derive(Debug)]
pub struct Response<T> {
    /// The command's outcome.
    pub result: Result<T, AdapterError>,
    /// A session, presumably the one the command carried being handed back — TODO confirm.
    pub session: Session,
    /// OpenTelemetry context travelling with the reply.
    pub otel_ctx: OpenTelemetryContext,
}
438
/// Newtype around an optional boolean superuser flag.
///
/// NOTE(review): the meaning of `None` (presumably "attribute not set") is not visible in
/// this file — confirm against the code that produces it.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
441
/// Successful reply to [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that `write_notify` can be left out of the
/// debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// Role id for the started session.
    pub role_id: RoleId,
    /// Optional superuser flag; see [`SuperuserAttribute`].
    pub superuser_attribute: SuperuserAttribute,
    /// Notification handle for builtin-table appends (omitted from `Debug` output).
    /// NOTE(review): presumably resolves once startup-related writes are done — confirm.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Default values for session variables, keyed by variable name.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to the catalog.
    pub catalog: Arc<Catalog>,
    /// Shared handle to the storage collections, keyed on `mz_repr::Timestamp`.
    pub storage_collections: Arc<
        dyn mz_storage_client::storage_collections::StorageCollections<
                Timestamp = mz_repr::Timestamp,
            > + Send
            + Sync,
    >,
    /// Shared generator for transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics handle for the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    /// Client for persist.
    pub persist_client: PersistClient,
    /// Statement-logging handle for the frontend.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
470
/// Successful reply to [`Command::AuthenticateGetSASLChallenge`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge.
    pub iteration_count: usize,
    /// Salt for the challenge. NOTE(review): encoding (e.g. base64) is not visible here.
    pub salt: String,
    /// Nonce for the challenge.
    pub nonce: String,
}
479
/// Successful reply to [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned after the proof check.
    /// NOTE(review): presumably the server signature sent back to the client — confirm.
    pub verifier: String,
}
485
486impl Transmittable for StartupResponse {
489 type Allowed = bool;
490 fn to_allowed(&self) -> Self::Allowed {
491 true
492 }
493}
494
/// Opaque wrapper around the raw string form of a catalog dump.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or altering it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and hands back the string it was built from.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
508
509impl Transmittable for CatalogDump {
510 type Allowed = bool;
511 fn to_allowed(&self) -> Self::Allowed {
512 true
513 }
514}
515
516impl Transmittable for SystemVars {
517 type Allowed = bool;
518 fn to_allowed(&self) -> Self::Allowed {
519 true
520 }
521}
522
/// The possible outcomes of executing a statement.
///
/// `EnumKind` generates the field-less companion enum `ExecuteResponseKind` (with
/// `PartialOrd`/`Ord` added so kinds can live in a `BTreeSet`). `Debug` comes from
/// `derivative` so that the row-carrying fields can be left out of debug output.
///
/// The command tag associated with each variant, where one exists, is defined by
/// `ExecuteResponse::tag`.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// Tagged `ALTER DEFAULT PRIVILEGES`.
    AlteredDefaultPrivileges,
    /// Tagged `ALTER <object type>`.
    AlteredObject(ObjectType),
    /// Tagged `ALTER ROLE`.
    AlteredRole,
    /// Tagged `ALTER SYSTEM`.
    AlteredSystemConfiguration,
    /// Tagged `CLOSE CURSOR`.
    ClosedCursor,
    /// Tagged `COMMENT`.
    Comment,
    /// Tagged `COPY <n>`, where `n` is the carried count.
    Copied(usize),
    /// A `COPY ... TO` outcome wrapping another response; it has no tag of its own.
    CopyTo {
        /// Output format for the copy.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response.
        resp: Box<ExecuteResponse>,
    },
    /// A `COPY ... FROM` outcome describing the copy target; it has no tag of its own.
    CopyFrom {
        /// Catalog id of the copy target.
        target_id: CatalogItemId,
        /// Name of the copy target.
        target_name: String,
        /// Target columns.
        columns: Vec<ColumnIndex>,
        /// Format parameters for the incoming data.
        params: CopyFormatParams<'static>,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Tagged `CREATE CONNECTION`.
    CreatedConnection,
    /// Tagged `CREATE DATABASE`.
    CreatedDatabase,
    /// Tagged `CREATE SCHEMA`.
    CreatedSchema,
    /// Tagged `CREATE ROLE`.
    CreatedRole,
    /// Tagged `CREATE CLUSTER`.
    CreatedCluster,
    /// Tagged `CREATE CLUSTER REPLICA`.
    CreatedClusterReplica,
    /// Tagged `CREATE INDEX`.
    CreatedIndex,
    /// Tagged `CREATE INTROSPECTION SUBSCRIBE`.
    CreatedIntrospectionSubscribe,
    /// Tagged `CREATE SECRET`.
    CreatedSecret,
    /// Tagged `CREATE SINK`.
    CreatedSink,
    /// Tagged `CREATE SOURCE`.
    CreatedSource,
    /// Tagged `CREATE TABLE`.
    CreatedTable,
    /// Tagged `CREATE VIEW`.
    CreatedView,
    /// Tagged `CREATE VIEWS`.
    CreatedViews,
    /// Tagged `CREATE MATERIALIZED VIEW`.
    CreatedMaterializedView,
    /// Tagged `CREATE CONTINUAL TASK`.
    CreatedContinualTask,
    /// Tagged `CREATE TYPE`.
    CreatedType,
    /// Tagged `CREATE NETWORKPOLICY`.
    CreatedNetworkPolicy,
    /// Tagged `DEALLOCATE`, or `DEALLOCATE ALL` when `all` is set.
    Deallocate { all: bool },
    /// Tagged `DECLARE CURSOR`.
    DeclaredCursor,
    /// Tagged `DELETE <n>`, where `n` is the carried count.
    Deleted(usize),
    /// Tagged `DISCARD TEMP`.
    DiscardedTemp,
    /// Tagged `DISCARD ALL`.
    DiscardedAll,
    /// Tagged `DROP <object type>`.
    DroppedObject(ObjectType),
    /// Tagged `DROP OWNED`.
    DroppedOwned,
    /// Outcome of an empty query; it has no tag.
    EmptyQuery,
    /// A fetch request against a named cursor; it has no tag of its own.
    Fetch {
        /// Name of the cursor to fetch from.
        name: String,
        /// Optional fetch direction. NOTE(review): handling of `None` is not visible here.
        count: Option<FetchDirection>,
        /// Timeout governing the fetch.
        timeout: ExecuteTimeout,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Tagged `GRANT`.
    GrantedPrivilege,
    /// Tagged `GRANT ROLE`.
    GrantedRole,
    /// Tagged `INSERT 0 <n>`, where `n` is the carried count.
    Inserted(usize),
    /// Tagged `PREPARE`.
    Prepare,
    /// Tagged `RAISE`.
    Raised,
    /// Tagged `REASSIGN OWNED`.
    ReassignOwned,
    /// Tagged `REVOKE`.
    RevokedPrivilege,
    /// Tagged `REVOKE ROLE`.
    RevokedRole,
    /// Rows delivered as a stream of `PeekResponseUnary` items; it has no tag.
    SendingRowsStreaming {
        /// The row stream (omitted from `Debug` output).
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance associated with the rows.
        instance_id: ComputeInstanceId,
        /// Execution strategy recorded for statement logging.
        strategy: StatementExecutionStrategy,
    },
    /// Rows delivered through a `RowIterator`; it has no tag.
    SendingRowsImmediate {
        /// The row iterator (omitted from `Debug` output).
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// Tagged `RESET` when `reset` is set, `SET` otherwise.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// Selects the `RESET` tag instead of `SET`.
        reset: bool,
    },
    /// Tagged `BEGIN`.
    StartedTransaction,
    /// A subscribe whose updates arrive on `rx`; it has no tag.
    Subscribing {
        /// Stream of row batches.
        rx: RowBatchStream,
        /// Execution-context guard travelling with the response.
        ctx_extra: ExecuteContextGuard,
        /// Compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// Tagged `COMMIT`.
    TransactionCommitted {
        /// Named string parameters accompanying the commit.
        /// NOTE(review): presumably session parameters that changed — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// Tagged `ROLLBACK`.
    TransactionRolledBack {
        /// Named string parameters accompanying the rollback.
        /// NOTE(review): presumably session parameters that changed — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// Tagged `UPDATE <n>`, where `n` is the carried count.
    Updated(usize),
    /// Tagged `VALIDATE CONNECTION`.
    ValidatedConnection,
}
677
678impl TryFrom<&Statement<Raw>> for ExecuteResponse {
679 type Error = ();
680
681 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
683 let resp_kinds = Plan::generated_from(&stmt.into())
684 .iter()
685 .map(ExecuteResponse::generated_from)
686 .flatten()
687 .cloned()
688 .collect::<BTreeSet<ExecuteResponseKind>>();
689 let resps = resp_kinds
690 .iter()
691 .map(|r| (*r).try_into())
692 .collect::<Result<Vec<ExecuteResponse>, _>>();
693 if let Ok(resps) = resps {
695 if resps.len() == 1 {
696 return Ok(resps.into_element());
697 }
698 }
699 let resp = match stmt {
700 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
701 ExecuteResponse::DroppedObject((*object_type).into())
702 }
703 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
704 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
705 ExecuteResponse::AlteredObject((*object_type).into())
706 }
707 _ => return Err(()),
708 };
709 soft_assert_no_log!(
711 resp_kinds.len() == 1
712 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
713 "ExecuteResponses out of sync with planner"
714 );
715 Ok(resp)
716 }
717}
718
719impl TryInto<ExecuteResponse> for ExecuteResponseKind {
720 type Error = ();
721
722 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
725 match self {
726 ExecuteResponseKind::AlteredDefaultPrivileges => {
727 Ok(ExecuteResponse::AlteredDefaultPrivileges)
728 }
729 ExecuteResponseKind::AlteredObject => Err(()),
730 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
731 ExecuteResponseKind::AlteredSystemConfiguration => {
732 Ok(ExecuteResponse::AlteredSystemConfiguration)
733 }
734 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
735 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
736 ExecuteResponseKind::Copied => Err(()),
737 ExecuteResponseKind::CopyTo => Err(()),
738 ExecuteResponseKind::CopyFrom => Err(()),
739 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
740 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
741 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
742 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
743 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
744 ExecuteResponseKind::CreatedClusterReplica => {
745 Ok(ExecuteResponse::CreatedClusterReplica)
746 }
747 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
748 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
749 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
750 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
751 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
752 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
753 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
754 ExecuteResponseKind::CreatedMaterializedView => {
755 Ok(ExecuteResponse::CreatedMaterializedView)
756 }
757 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
758 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
759 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
760 ExecuteResponseKind::Deallocate => Err(()),
761 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
762 ExecuteResponseKind::Deleted => Err(()),
763 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
764 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
765 ExecuteResponseKind::DroppedObject => Err(()),
766 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
767 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
768 ExecuteResponseKind::Fetch => Err(()),
769 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
770 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
771 ExecuteResponseKind::Inserted => Err(()),
772 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
773 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
774 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
775 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
776 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
777 ExecuteResponseKind::SetVariable => Err(()),
778 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
779 ExecuteResponseKind::Subscribing => Err(()),
780 ExecuteResponseKind::TransactionCommitted => Err(()),
781 ExecuteResponseKind::TransactionRolledBack => Err(()),
782 ExecuteResponseKind::Updated => Err(()),
783 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
784 ExecuteResponseKind::SendingRowsStreaming => Err(()),
785 ExecuteResponseKind::SendingRowsImmediate => Err(()),
786 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
787 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
788 }
789 }
790 }
791}
792
793impl ExecuteResponse {
794 pub fn tag(&self) -> Option<String> {
795 use ExecuteResponse::*;
796 match self {
797 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
798 AlteredObject(o) => Some(format!("ALTER {}", o)),
799 AlteredRole => Some("ALTER ROLE".into()),
800 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
801 ClosedCursor => Some("CLOSE CURSOR".into()),
802 Comment => Some("COMMENT".into()),
803 Copied(n) => Some(format!("COPY {}", n)),
804 CopyTo { .. } => None,
805 CopyFrom { .. } => None,
806 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
807 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
808 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
809 CreatedRole => Some("CREATE ROLE".into()),
810 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
811 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
812 CreatedIndex { .. } => Some("CREATE INDEX".into()),
813 CreatedSecret { .. } => Some("CREATE SECRET".into()),
814 CreatedSink { .. } => Some("CREATE SINK".into()),
815 CreatedSource { .. } => Some("CREATE SOURCE".into()),
816 CreatedTable { .. } => Some("CREATE TABLE".into()),
817 CreatedView { .. } => Some("CREATE VIEW".into()),
818 CreatedViews { .. } => Some("CREATE VIEWS".into()),
819 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
820 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
821 CreatedType => Some("CREATE TYPE".into()),
822 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
823 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
824 DeclaredCursor => Some("DECLARE CURSOR".into()),
825 Deleted(n) => Some(format!("DELETE {}", n)),
826 DiscardedTemp => Some("DISCARD TEMP".into()),
827 DiscardedAll => Some("DISCARD ALL".into()),
828 DroppedObject(o) => Some(format!("DROP {o}")),
829 DroppedOwned => Some("DROP OWNED".into()),
830 EmptyQuery => None,
831 Fetch { .. } => None,
832 GrantedPrivilege => Some("GRANT".into()),
833 GrantedRole => Some("GRANT ROLE".into()),
834 Inserted(n) => {
835 Some(format!("INSERT 0 {}", n))
843 }
844 Prepare => Some("PREPARE".into()),
845 Raised => Some("RAISE".into()),
846 ReassignOwned => Some("REASSIGN OWNED".into()),
847 RevokedPrivilege => Some("REVOKE".into()),
848 RevokedRole => Some("REVOKE ROLE".into()),
849 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
850 SetVariable { reset: true, .. } => Some("RESET".into()),
851 SetVariable { reset: false, .. } => Some("SET".into()),
852 StartedTransaction { .. } => Some("BEGIN".into()),
853 Subscribing { .. } => None,
854 TransactionCommitted { .. } => Some("COMMIT".into()),
855 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
856 Updated(n) => Some(format!("UPDATE {}", n)),
857 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
858 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
859 }
860 }
861
862 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
866 use ExecuteResponseKind::*;
867 use PlanKind::*;
868
869 match plan {
870 AbortTransaction => &[TransactionRolledBack],
871 AlterClusterRename
872 | AlterClusterSwap
873 | AlterCluster
874 | AlterClusterReplicaRename
875 | AlterOwner
876 | AlterItemRename
877 | AlterRetainHistory
878 | AlterSourceTimestampInterval
879 | AlterNoop
880 | AlterSchemaRename
881 | AlterSchemaSwap
882 | AlterSecret
883 | AlterConnection
884 | AlterSource
885 | AlterSink
886 | AlterTableAddColumn
887 | AlterMaterializedViewApplyReplacement
888 | AlterNetworkPolicy => &[AlteredObject],
889 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
890 AlterSetCluster => &[AlteredObject],
891 AlterRole => &[AlteredRole],
892 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
893 &[AlteredSystemConfiguration]
894 }
895 Close => &[ClosedCursor],
896 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
897 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
898 PlanKind::Comment => &[ExecuteResponseKind::Comment],
899 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
900 CreateConnection => &[CreatedConnection],
901 CreateDatabase => &[CreatedDatabase],
902 CreateSchema => &[CreatedSchema],
903 CreateRole => &[CreatedRole],
904 CreateCluster => &[CreatedCluster],
905 CreateClusterReplica => &[CreatedClusterReplica],
906 CreateSource | CreateSources => &[CreatedSource],
907 CreateSecret => &[CreatedSecret],
908 CreateSink => &[CreatedSink],
909 CreateTable => &[CreatedTable],
910 CreateView => &[CreatedView],
911 CreateMaterializedView => &[CreatedMaterializedView],
912 CreateContinualTask => &[CreatedContinualTask],
913 CreateIndex => &[CreatedIndex],
914 CreateType => &[CreatedType],
915 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
916 CreateNetworkPolicy => &[CreatedNetworkPolicy],
917 Declare => &[DeclaredCursor],
918 DiscardTemp => &[DiscardedTemp],
919 DiscardAll => &[DiscardedAll],
920 DropObjects => &[DroppedObject],
921 DropOwned => &[DroppedOwned],
922 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
923 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
924 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
925 ExecuteResponseKind::CopyTo,
926 SendingRowsStreaming,
927 SendingRowsImmediate,
928 ],
929 Execute | ReadThenWrite => &[
930 Deleted,
931 Inserted,
932 SendingRowsStreaming,
933 SendingRowsImmediate,
934 Updated,
935 ],
936 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
937 GrantPrivileges => &[GrantedPrivilege],
938 GrantRole => &[GrantedRole],
939 Insert => &[Inserted, SendingRowsImmediate],
940 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
941 PlanKind::Raise => &[ExecuteResponseKind::Raised],
942 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
943 RevokePrivileges => &[RevokedPrivilege],
944 RevokeRole => &[RevokedRole],
945 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
946 &[ExecuteResponseKind::SetVariable]
947 }
948 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
949 StartTransaction => &[StartedTransaction],
950 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
951 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
952 }
953 }
954}
955
956impl Transmittable for ExecuteResponse {
960 type Allowed = ExecuteResponseKind;
961 fn to_allowed(&self) -> Self::Allowed {
962 ExecuteResponseKind::from(self)
963 }
964}