1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::config::{ScopedParameters, ScopedParametersScope, SystemParameterFrontend};
50use crate::coord::appends::BuiltinTableAppendNotify;
51use crate::coord::consistency::CoordinatorInconsistencies;
52use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
53use crate::coord::timestamp_selection::TimestampDetermination;
54use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
55use crate::error::AdapterError;
56use crate::session::{EndTransactionAction, RowBatchStream, Session};
57use crate::statement_logging::{
58 FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
59 StatementLoggingFrontend,
60};
61use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
62use crate::util::Transmittable;
63use crate::webhook::AppendWebhookResponse;
64use crate::{
65 AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
66};
67
/// Handle sent back in reply to [`Command::StartCopyFromStdin`].
///
/// Bundles the sending side of the raw-byte channels with the receiver on
/// which the final outcome is reported.
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Senders for raw byte chunks, one per consumer.
    // NOTE(review): presumably one sender per parallel batch-building worker
    // — confirm against the code that constructs this struct.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// Receives the final result: on success a list of persist `ProtoBatch`es
    /// together with a `u64`, otherwise an `AdapterError`.
    // NOTE(review): the `u64` is presumably a row count — confirm.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
81
/// Reply payload for [`Command::CatalogSnapshot`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Shared handle to the catalog.
    pub catalog: Arc<Catalog>,
}
86
/// A request message and, for most variants, the `tx` channel on which the
/// reply to that request is delivered.
///
/// NOTE(review): presumably these are sent by client/frontend code to the
/// coordinator in `crate::coord` — confirm against the channel's endpoints.
#[derive(Debug)]
pub enum Command {
    /// Asks for a [`CatalogSnapshot`]; the reply is sent on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Startup request for connection `conn_id`; replies with a
    /// [`StartupResponse`] or an [`AdapterError`].
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Sending half of an unbounded channel of [`AdapterNotice`]s.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for `role_name`; replies with `Ok(())`
    /// or an [`AdapterError`].
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` and `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Submits a SASL proof for `role_name`; replies with a
    /// [`SASLVerifyProofResponse`] or an [`AdapterError`].
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        // NOTE(review): presumably a stand-in hash used when the role has no
        // stored credentials — confirm against the handler.
        mock_hash: String,
    },

    /// Login check for `role_name`; replies with `Ok(())` or an
    /// [`AdapterError`].
    CheckRoleCanLogin {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    },

    /// Execution request for the portal named `portal_name`.
    ///
    /// The [`Session`] is moved into the command; the [`Response`] sent on
    /// `tx` carries a session back alongside the result.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Ends a transaction with the given [`EndTransactionAction`].
    ///
    /// Like [`Command::Execute`], the session travels with the command and a
    /// session is handed back in the [`Response`].
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a raw connection id and a secret
    /// key. Carries no reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request identified only by a [`ConnectionId`] (no secret
    /// key). Carries no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Looks up the webhook named by `database`, `schema` and `name`; replies
    /// with an [`AppendWebhookResponse`] or an [`AppendWebhookError`].
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests the current [`SystemVars`].
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Sets system variables from a name-to-value map on behalf of `conn_id`.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Supplies scoped system parameter overrides; `tx` receives `()` as an
    /// acknowledgement.
    UpdateScopedSystemParameters {
        overrides: ScopedParameters,
        // NOTE(review): presumably selects a scope whose stale overrides are
        // dropped — confirm against the handler.
        prune_scope: Option<ScopedParametersScope>,
        tx: oneshot::Sender<()>,
    },

    /// Hands over a shared [`SystemParameterFrontend`]. Carries no reply
    /// channel.
    InstallScopedSystemParameterFrontend {
        frontend: Arc<SystemParameterFrontend>,
    },

    /// Injects the given audit events on behalf of `conn_id`.
    InjectAuditEvents {
        events: Vec<crate::catalog::InjectedAuditEvent>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminates connection `conn_id`.
    Terminate {
        conn_id: ConnectionId,
        /// Optional reply channel; `None` means the sender does not wait for
        /// a result.
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Starts a `COPY ... FROM STDIN` into `target_id`; replies with a
    /// [`CopyFromStdinWriter`] wrapped in a [`Response`].
    StartCopyFromStdin {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: mz_repr::RelationDesc,
        params: mz_pgcopy::CopyFormatParams<'static>,
        session: Session,
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Retires an execution context with the reason the statement ended.
    /// Carries no reply channel.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Runs a consistency check; replies with `Ok(())` or the
    /// [`CoordinatorInconsistencies`] found.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a JSON dump; replies with a `serde_json::Value` or an error.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests an `InstanceClient` for compute instance `instance_id`;
    /// replies with `InstanceMissing` if there is no such instance.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests the shared [`TimestampOracle`] for `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recent timestamp for `source_ids`, bounded by
    /// `real_time_recency_timeout`; the reply may be `None`.
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the [`ReadHolds`] associated with connection `conn_id`, if
    /// any.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds>>,
    },

    /// Stores `read_holds` for connection `conn_id`; `tx` receives `()` as an
    /// acknowledgement.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds,
        tx: oneshot::Sender<()>,
    },

    /// Executes a peek via a dataflow plan on `compute_instance`, optionally
    /// pinned to `target_replica`.
    ExecuteSlowPathPeek {
        dataflow_plan: Box<PeekDataflowPlan>,
        determination: TimestampDetermination,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a subscribe described by `df_desc` and `plan` on cluster
    /// `cluster_id`, optionally pinned to `replica_id`.
    ExecuteSubscribe {
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds,
        plan: plan::SubscribePlan,
        statement_logging_id: Option<StatementLoggingId>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight step for a `COPY TO` S3 sink identified by `sink_id`; replies
    /// with `Ok(())` or an [`AdapterError`].
    CopyToPreflight {
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        sink_id: GlobalId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a `COPY TO` dataflow on `compute_instance`, optionally pinned
    /// to `target_replica`.
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a [`SideEffectingFunc`] for connection `conn_id` acting as
    /// `current_role`.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Registers a frontend-issued peek, identified by `uuid`.
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregisters the frontend peek identified by `uuid`; `tx` receives `()`
    /// as an acknowledgement.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Requests a [`TimestampExplanation`] for the given timestamp
    /// `determination`.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination,
        tx: oneshot::Sender<TimestampExplanation>,
    },

    /// Carries a [`FrontendStatementLoggingEvent`]. Carries no reply channel.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
378
379impl Command {
380 pub fn session(&self) -> Option<&Session> {
381 match self {
382 Command::Execute { session, .. }
383 | Command::Commit { session, .. }
384 | Command::StartCopyFromStdin { session, .. } => Some(session),
385 Command::CancelRequest { .. }
386 | Command::Startup { .. }
387 | Command::AuthenticatePassword { .. }
388 | Command::AuthenticateGetSASLChallenge { .. }
389 | Command::AuthenticateVerifySASLProof { .. }
390 | Command::CheckRoleCanLogin { .. }
391 | Command::CatalogSnapshot { .. }
392 | Command::PrivilegedCancelRequest { .. }
393 | Command::GetWebhook { .. }
394 | Command::Terminate { .. }
395 | Command::GetSystemVars { .. }
396 | Command::SetSystemVars { .. }
397 | Command::UpdateScopedSystemParameters { .. }
398 | Command::InstallScopedSystemParameterFrontend { .. }
399 | Command::RetireExecute { .. }
400 | Command::CheckConsistency { .. }
401 | Command::Dump { .. }
402 | Command::GetComputeInstanceClient { .. }
403 | Command::GetOracle { .. }
404 | Command::DetermineRealTimeRecentTimestamp { .. }
405 | Command::GetTransactionReadHoldsBundle { .. }
406 | Command::StoreTransactionReadHolds { .. }
407 | Command::ExecuteSlowPathPeek { .. }
408 | Command::ExecuteSubscribe { .. }
409 | Command::CopyToPreflight { .. }
410 | Command::ExecuteCopyTo { .. }
411 | Command::ExecuteSideEffectingFunc { .. }
412 | Command::RegisterFrontendPeek { .. }
413 | Command::UnregisterFrontendPeek { .. }
414 | Command::ExplainTimestamp { .. }
415 | Command::FrontendStatementLogging(..)
416 | Command::InjectAuditEvents { .. } => None,
417 }
418 }
419
420 pub fn session_mut(&mut self) -> Option<&mut Session> {
421 match self {
422 Command::Execute { session, .. }
423 | Command::Commit { session, .. }
424 | Command::StartCopyFromStdin { session, .. } => Some(session),
425 Command::CancelRequest { .. }
426 | Command::Startup { .. }
427 | Command::AuthenticatePassword { .. }
428 | Command::AuthenticateGetSASLChallenge { .. }
429 | Command::AuthenticateVerifySASLProof { .. }
430 | Command::CheckRoleCanLogin { .. }
431 | Command::CatalogSnapshot { .. }
432 | Command::PrivilegedCancelRequest { .. }
433 | Command::GetWebhook { .. }
434 | Command::Terminate { .. }
435 | Command::GetSystemVars { .. }
436 | Command::SetSystemVars { .. }
437 | Command::UpdateScopedSystemParameters { .. }
438 | Command::InstallScopedSystemParameterFrontend { .. }
439 | Command::RetireExecute { .. }
440 | Command::CheckConsistency { .. }
441 | Command::Dump { .. }
442 | Command::GetComputeInstanceClient { .. }
443 | Command::GetOracle { .. }
444 | Command::DetermineRealTimeRecentTimestamp { .. }
445 | Command::GetTransactionReadHoldsBundle { .. }
446 | Command::StoreTransactionReadHolds { .. }
447 | Command::ExecuteSlowPathPeek { .. }
448 | Command::ExecuteSubscribe { .. }
449 | Command::CopyToPreflight { .. }
450 | Command::ExecuteCopyTo { .. }
451 | Command::ExecuteSideEffectingFunc { .. }
452 | Command::RegisterFrontendPeek { .. }
453 | Command::UnregisterFrontendPeek { .. }
454 | Command::ExplainTimestamp { .. }
455 | Command::FrontendStatementLogging(..)
456 | Command::InjectAuditEvents { .. } => None,
457 }
458 }
459}
460
/// Reply envelope for commands that take ownership of a [`Session`].
///
/// Pairs the outcome of the request with a session and a tracing context.
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the request: the payload on success, an [`AdapterError`]
    /// otherwise.
    pub result: Result<T, AdapterError>,
    /// Session handed back to the requester.
    pub session: Session,
    /// OpenTelemetry context accompanying the reply.
    pub otel_ctx: OpenTelemetryContext,
}
467
/// Newtype around an optional boolean, carried in
/// [`StartupResponse::superuser_attribute`].
// NOTE(review): `None` presumably means the attribute is unset for the role,
// as opposed to explicitly `false` — confirm against where it is produced.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
470
/// Successful reply to [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that individual fields can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// Id of the role associated with the startup request.
    pub role_id: RoleId,
    /// Superuser attribute reported alongside the role.
    pub superuser_attribute: SuperuserAttribute,
    /// Notification handle for builtin table appends; excluded from the
    /// `Debug` output.
    // NOTE(review): presumably resolves once the builtin-table writes
    // triggered by startup have completed — confirm.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Default session variable values, keyed by variable name.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to the catalog.
    pub catalog: Arc<Catalog>,
    /// Shared handle to the storage collections interface.
    pub storage_collections:
        Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
    /// Shared generator for transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics handle for the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    /// Client for persist.
    pub persist_client: PersistClient,
    /// Frontend handle for statement logging.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
495
/// Successful reply to [`Command::AuthenticateGetSASLChallenge`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count to use for the challenge.
    pub iteration_count: usize,
    /// Salt for the challenge.
    // NOTE(review): encoding (e.g. base64) is not visible here — confirm.
    pub salt: String,
    /// Nonce for the challenge.
    // NOTE(review): presumably the client nonce extended with a server part
    // — confirm.
    pub nonce: String,
}
504
/// Successful reply to [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned after the proof was checked.
    // NOTE(review): presumably the server signature the client uses to
    // authenticate the server — confirm.
    pub verifier: String,
}
510
511impl Transmittable for StartupResponse {
514 type Allowed = bool;
515 fn to_allowed(&self) -> Self::Allowed {
516 true
517 }
518}
519
/// Opaque wrapper around the textual form of a catalog dump.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps an already-rendered dump string.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and hands back the underlying string.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
533
534impl Transmittable for CatalogDump {
535 type Allowed = bool;
536 fn to_allowed(&self) -> Self::Allowed {
537 true
538 }
539}
540
541impl Transmittable for SystemVars {
542 type Allowed = bool;
543 fn to_allowed(&self) -> Self::Allowed {
544 true
545 }
546}
547
/// The response to an executed statement.
///
/// `EnumKind` generates the payload-free companion enum
/// `ExecuteResponseKind` (with `PartialOrd`/`Ord` derived on it), and `Debug`
/// is derived through `derivative` so that non-debuggable fields can be
/// skipped. The command tags quoted below are the strings produced by
/// `ExecuteResponse::tag`.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// Tag: `ALTER DEFAULT PRIVILEGES`.
    AlteredDefaultPrivileges,
    /// Tag: `ALTER <object type>`.
    AlteredObject(ObjectType),
    /// Tag: `ALTER ROLE`.
    AlteredRole,
    /// Tag: `ALTER SYSTEM`.
    AlteredSystemConfiguration,
    /// Tag: `CLOSE CURSOR`.
    ClosedCursor,
    /// Tag: `COMMENT`.
    Comment,
    /// Tag: `COPY <n>`, where `n` is the carried count.
    Copied(usize),
    /// A `COPY TO` wrapping another response. No command tag.
    CopyTo {
        /// Output format of the copy.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response.
        resp: Box<ExecuteResponse>,
    },
    /// A `COPY FROM` into `target_id`. No command tag.
    CopyFrom {
        /// Catalog id of the copy target.
        target_id: CatalogItemId,
        /// Name of the copy target.
        target_name: String,
        /// Indexes of the target columns.
        columns: Vec<ColumnIndex>,
        /// Format parameters for decoding the incoming data.
        params: CopyFormatParams<'static>,
        /// Execution context guard that travels with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Tag: `CREATE CONNECTION`.
    CreatedConnection,
    /// Tag: `CREATE DATABASE`.
    CreatedDatabase,
    /// Tag: `CREATE SCHEMA`.
    CreatedSchema,
    /// Tag: `CREATE ROLE`.
    CreatedRole,
    /// Tag: `CREATE CLUSTER`.
    CreatedCluster,
    /// Tag: `CREATE CLUSTER REPLICA`.
    CreatedClusterReplica,
    /// Tag: `CREATE INDEX`.
    CreatedIndex,
    /// Tag: `CREATE INTROSPECTION SUBSCRIBE`.
    CreatedIntrospectionSubscribe,
    /// Tag: `CREATE SECRET`.
    CreatedSecret,
    /// Tag: `CREATE SINK`.
    CreatedSink,
    /// Tag: `CREATE SOURCE`.
    CreatedSource,
    /// Tag: `CREATE TABLE`.
    CreatedTable,
    /// Tag: `CREATE VIEW`.
    CreatedView,
    /// Tag: `CREATE VIEWS`.
    CreatedViews,
    /// Tag: `CREATE MATERIALIZED VIEW`.
    CreatedMaterializedView,
    /// Tag: `CREATE TYPE`.
    CreatedType,
    /// Tag: `CREATE NETWORKPOLICY`.
    CreatedNetworkPolicy,
    /// Tag: `DEALLOCATE`, or `DEALLOCATE ALL` when `all` is set.
    Deallocate { all: bool },
    /// Tag: `DECLARE CURSOR`.
    DeclaredCursor,
    /// Tag: `DELETE <n>`, where `n` is the carried count.
    Deleted(usize),
    /// Tag: `DISCARD TEMP`.
    DiscardedTemp,
    /// Tag: `DISCARD ALL`.
    DiscardedAll,
    /// Tag: `DROP <object type>`.
    DroppedObject(ObjectType),
    /// Tag: `DROP OWNED`.
    DroppedOwned,
    /// Response to an empty query. No command tag.
    EmptyQuery,
    /// A fetch from a cursor. No command tag.
    Fetch {
        /// Name of what is being fetched from.
        // NOTE(review): presumably the cursor/portal name — confirm.
        name: String,
        /// Optional fetch direction/count.
        count: Option<FetchDirection>,
        /// Timeout applied to the fetch.
        timeout: ExecuteTimeout,
        /// Execution context guard that travels with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// Tag: `GRANT`.
    GrantedPrivilege,
    /// Tag: `GRANT ROLE`.
    GrantedRole,
    /// Tag: `INSERT 0 <n>`, where `n` is the carried count.
    Inserted(usize),
    /// Tag: `PREPARE`.
    Prepare,
    /// Tag: `RAISE`.
    Raised,
    /// Tag: `REASSIGN OWNED`.
    ReassignOwned,
    /// Tag: `REVOKE`.
    RevokedPrivilege,
    /// Tag: `REVOKE ROLE`.
    RevokedRole,
    /// Rows delivered as an asynchronous stream. No command tag.
    SendingRowsStreaming {
        /// Stream of peek responses; excluded from the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance associated with the rows.
        instance_id: ComputeInstanceId,
        /// Execution strategy recorded for statement logging.
        strategy: StatementExecutionStrategy,
    },
    /// Rows that are already available as an iterator. No command tag.
    SendingRowsImmediate {
        /// Row iterator; excluded from the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// Tag: `RESET` when `reset` is true, otherwise `SET`.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// Whether the variable was reset rather than set.
        reset: bool,
    },
    /// Tag: `BEGIN`.
    StartedTransaction,
    /// An active subscribe. No command tag.
    Subscribing {
        /// Stream of row batches produced by the subscribe.
        rx: RowBatchStream,
        /// Execution context guard that travels with the response.
        ctx_extra: ExecuteContextGuard,
        /// Compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// Tag: `COMMIT`.
    TransactionCommitted {
        /// Named string parameters attached to the response.
        // NOTE(review): presumably session parameters changed by the
        // transaction — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// Tag: `ROLLBACK`.
    TransactionRolledBack {
        /// Named string parameters attached to the response.
        // NOTE(review): presumably session parameters changed by the
        // transaction — confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// Tag: `UPDATE <n>`, where `n` is the carried count.
    Updated(usize),
    /// Tag: `VALIDATE CONNECTION`.
    ValidatedConnection,
}
700
701impl TryFrom<&Statement<Raw>> for ExecuteResponse {
702 type Error = ();
703
704 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
706 let resp_kinds = Plan::generated_from(&stmt.into())
707 .iter()
708 .map(ExecuteResponse::generated_from)
709 .flatten()
710 .cloned()
711 .collect::<BTreeSet<ExecuteResponseKind>>();
712 let resps = resp_kinds
713 .iter()
714 .map(|r| (*r).try_into())
715 .collect::<Result<Vec<ExecuteResponse>, _>>();
716 if let Ok(resps) = resps {
718 if resps.len() == 1 {
719 return Ok(resps.into_element());
720 }
721 }
722 let resp = match stmt {
723 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
724 ExecuteResponse::DroppedObject((*object_type).into())
725 }
726 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
727 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
728 ExecuteResponse::AlteredObject((*object_type).into())
729 }
730 _ => return Err(()),
731 };
732 soft_assert_no_log!(
734 resp_kinds.len() == 1
735 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
736 "ExecuteResponses out of sync with planner"
737 );
738 Ok(resp)
739 }
740}
741
742impl TryInto<ExecuteResponse> for ExecuteResponseKind {
743 type Error = ();
744
745 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
748 match self {
749 ExecuteResponseKind::AlteredDefaultPrivileges => {
750 Ok(ExecuteResponse::AlteredDefaultPrivileges)
751 }
752 ExecuteResponseKind::AlteredObject => Err(()),
753 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
754 ExecuteResponseKind::AlteredSystemConfiguration => {
755 Ok(ExecuteResponse::AlteredSystemConfiguration)
756 }
757 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
758 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
759 ExecuteResponseKind::Copied => Err(()),
760 ExecuteResponseKind::CopyTo => Err(()),
761 ExecuteResponseKind::CopyFrom => Err(()),
762 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
763 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
764 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
765 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
766 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
767 ExecuteResponseKind::CreatedClusterReplica => {
768 Ok(ExecuteResponse::CreatedClusterReplica)
769 }
770 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
771 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
772 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
773 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
774 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
775 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
776 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
777 ExecuteResponseKind::CreatedMaterializedView => {
778 Ok(ExecuteResponse::CreatedMaterializedView)
779 }
780 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
781 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
782 ExecuteResponseKind::Deallocate => Err(()),
783 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
784 ExecuteResponseKind::Deleted => Err(()),
785 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
786 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
787 ExecuteResponseKind::DroppedObject => Err(()),
788 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
789 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
790 ExecuteResponseKind::Fetch => Err(()),
791 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
792 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
793 ExecuteResponseKind::Inserted => Err(()),
794 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
795 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
796 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
797 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
798 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
799 ExecuteResponseKind::SetVariable => Err(()),
800 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
801 ExecuteResponseKind::Subscribing => Err(()),
802 ExecuteResponseKind::TransactionCommitted => Err(()),
803 ExecuteResponseKind::TransactionRolledBack => Err(()),
804 ExecuteResponseKind::Updated => Err(()),
805 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
806 ExecuteResponseKind::SendingRowsStreaming => Err(()),
807 ExecuteResponseKind::SendingRowsImmediate => Err(()),
808 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
809 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
810 }
811 }
812 }
813}
814
815impl ExecuteResponse {
816 pub fn tag(&self) -> Option<String> {
817 use ExecuteResponse::*;
818 match self {
819 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
820 AlteredObject(o) => Some(format!("ALTER {}", o)),
821 AlteredRole => Some("ALTER ROLE".into()),
822 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
823 ClosedCursor => Some("CLOSE CURSOR".into()),
824 Comment => Some("COMMENT".into()),
825 Copied(n) => Some(format!("COPY {}", n)),
826 CopyTo { .. } => None,
827 CopyFrom { .. } => None,
828 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
829 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
830 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
831 CreatedRole => Some("CREATE ROLE".into()),
832 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
833 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
834 CreatedIndex { .. } => Some("CREATE INDEX".into()),
835 CreatedSecret { .. } => Some("CREATE SECRET".into()),
836 CreatedSink { .. } => Some("CREATE SINK".into()),
837 CreatedSource { .. } => Some("CREATE SOURCE".into()),
838 CreatedTable { .. } => Some("CREATE TABLE".into()),
839 CreatedView { .. } => Some("CREATE VIEW".into()),
840 CreatedViews { .. } => Some("CREATE VIEWS".into()),
841 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
842 CreatedType => Some("CREATE TYPE".into()),
843 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
844 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
845 DeclaredCursor => Some("DECLARE CURSOR".into()),
846 Deleted(n) => Some(format!("DELETE {}", n)),
847 DiscardedTemp => Some("DISCARD TEMP".into()),
848 DiscardedAll => Some("DISCARD ALL".into()),
849 DroppedObject(o) => Some(format!("DROP {o}")),
850 DroppedOwned => Some("DROP OWNED".into()),
851 EmptyQuery => None,
852 Fetch { .. } => None,
853 GrantedPrivilege => Some("GRANT".into()),
854 GrantedRole => Some("GRANT ROLE".into()),
855 Inserted(n) => {
856 Some(format!("INSERT 0 {}", n))
864 }
865 Prepare => Some("PREPARE".into()),
866 Raised => Some("RAISE".into()),
867 ReassignOwned => Some("REASSIGN OWNED".into()),
868 RevokedPrivilege => Some("REVOKE".into()),
869 RevokedRole => Some("REVOKE ROLE".into()),
870 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
871 SetVariable { reset: true, .. } => Some("RESET".into()),
872 SetVariable { reset: false, .. } => Some("SET".into()),
873 StartedTransaction { .. } => Some("BEGIN".into()),
874 Subscribing { .. } => None,
875 TransactionCommitted { .. } => Some("COMMIT".into()),
876 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
877 Updated(n) => Some(format!("UPDATE {}", n)),
878 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
879 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
880 }
881 }
882
883 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
887 use ExecuteResponseKind::*;
888 use PlanKind::*;
889
890 match plan {
891 AbortTransaction => &[TransactionRolledBack],
892 AlterClusterRename
893 | AlterClusterSwap
894 | AlterCluster
895 | AlterClusterReplicaRename
896 | AlterOwner
897 | AlterItemRename
898 | AlterRetainHistory
899 | AlterSourceTimestampInterval
900 | AlterNoop
901 | AlterSchemaRename
902 | AlterSchemaSwap
903 | AlterSecret
904 | AlterConnection
905 | AlterSource
906 | AlterSink
907 | AlterTableAddColumn
908 | AlterMaterializedViewApplyReplacement
909 | AlterNetworkPolicy => &[AlteredObject],
910 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
911 AlterSetCluster => &[AlteredObject],
912 AlterRole => &[AlteredRole],
913 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
914 &[AlteredSystemConfiguration]
915 }
916 Close => &[ClosedCursor],
917 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
918 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
919 PlanKind::Comment => &[ExecuteResponseKind::Comment],
920 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
921 CreateConnection => &[CreatedConnection],
922 CreateDatabase => &[CreatedDatabase],
923 CreateSchema => &[CreatedSchema],
924 CreateRole => &[CreatedRole],
925 CreateCluster => &[CreatedCluster],
926 CreateClusterReplica => &[CreatedClusterReplica],
927 CreateSource | CreateSources => &[CreatedSource],
928 CreateSecret => &[CreatedSecret],
929 CreateSink => &[CreatedSink],
930 CreateTable => &[CreatedTable],
931 CreateView => &[CreatedView],
932 CreateMaterializedView => &[CreatedMaterializedView],
933 CreateIndex => &[CreatedIndex],
934 CreateType => &[CreatedType],
935 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
936 CreateNetworkPolicy => &[CreatedNetworkPolicy],
937 Declare => &[DeclaredCursor],
938 DiscardTemp => &[DiscardedTemp],
939 DiscardAll => &[DiscardedAll],
940 DropObjects => &[DroppedObject],
941 DropOwned => &[DroppedOwned],
942 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
943 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
944 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
945 ExecuteResponseKind::CopyTo,
946 SendingRowsStreaming,
947 SendingRowsImmediate,
948 ],
949 Execute | ReadThenWrite => &[
950 Deleted,
951 Inserted,
952 SendingRowsStreaming,
953 SendingRowsImmediate,
954 Updated,
955 ],
956 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
957 GrantPrivileges => &[GrantedPrivilege],
958 GrantRole => &[GrantedRole],
959 Insert => &[Inserted, SendingRowsImmediate],
960 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
961 PlanKind::Raise => &[ExecuteResponseKind::Raised],
962 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
963 RevokePrivileges => &[RevokedPrivilege],
964 RevokeRole => &[RevokedRole],
965 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
966 &[ExecuteResponseKind::SetVariable]
967 }
968 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
969 StartTransaction => &[StartedTransaction],
970 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
971 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
972 }
973 }
974}
975
976impl Transmittable for ExecuteResponse {
980 type Allowed = ExecuteResponseKind;
981 fn to_allowed(&self) -> Self::Allowed {
982 ExecuteResponseKind::from(self)
983 }
984}