1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use derivative::Derivative;
16use enum_kinds::EnumKind;
17use futures::Stream;
18use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
19use mz_auth::password::Password;
20use mz_compute_types::ComputeInstanceId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_no_log;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_pgcopy::CopyFormatParams;
25use mz_repr::role_id::RoleId;
26use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
27use mz_sql::ast::{FetchDirection, Raw, Statement};
28use mz_sql::catalog::ObjectType;
29use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
30use mz_sql::session::user::User;
31use mz_sql::session::vars::{OwnedVarInput, SystemVars};
32use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
33use tokio::sync::{mpsc, oneshot};
34use uuid::Uuid;
35
36use crate::catalog::Catalog;
37use crate::coord::ExecuteContextExtra;
38use crate::coord::appends::BuiltinTableAppendNotify;
39use crate::coord::consistency::CoordinatorInconsistencies;
40use crate::coord::peek::PeekResponseUnary;
41use crate::error::AdapterError;
42use crate::session::{EndTransactionAction, RowBatchStream, Session};
43use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
44use crate::util::Transmittable;
45use crate::webhook::AppendWebhookResponse;
46use crate::{AdapterNotice, AppendWebhookError};
47
48#[derive(Debug)]
49pub struct CatalogSnapshot {
50 pub catalog: Arc<Catalog>,
51}
52
53#[derive(Debug)]
54pub enum Command {
55 CatalogSnapshot {
56 tx: oneshot::Sender<CatalogSnapshot>,
57 },
58
59 Startup {
60 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
61 user: User,
62 conn_id: ConnectionId,
63 client_ip: Option<IpAddr>,
64 secret_key: u32,
65 uuid: Uuid,
66 application_name: String,
67 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
68 },
69
70 AuthenticatePassword {
71 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
72 role_name: String,
73 password: Option<Password>,
74 },
75
76 AuthenticateGetSASLChallenge {
77 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
78 role_name: String,
79 nonce: String,
80 },
81
82 AuthenticateVerifySASLProof {
83 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
84 role_name: String,
85 proof: String,
86 auth_message: String,
87 mock_hash: String,
88 },
89
90 Execute {
91 portal_name: String,
92 session: Session,
93 tx: oneshot::Sender<Response<ExecuteResponse>>,
94 outer_ctx_extra: Option<ExecuteContextExtra>,
95 },
96
97 Commit {
102 action: EndTransactionAction,
103 session: Session,
104 tx: oneshot::Sender<Response<ExecuteResponse>>,
105 },
106
107 CancelRequest {
108 conn_id: ConnectionIdType,
109 secret_key: u32,
110 },
111
112 PrivilegedCancelRequest {
113 conn_id: ConnectionId,
114 },
115
116 GetWebhook {
117 database: String,
118 schema: String,
119 name: String,
120 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
121 },
122
123 GetSystemVars {
124 tx: oneshot::Sender<SystemVars>,
125 },
126
127 SetSystemVars {
128 vars: BTreeMap<String, String>,
129 conn_id: ConnectionId,
130 tx: oneshot::Sender<Result<(), AdapterError>>,
131 },
132
133 Terminate {
134 conn_id: ConnectionId,
135 tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
136 },
137
138 RetireExecute {
145 data: ExecuteContextExtra,
146 reason: StatementEndedExecutionReason,
147 },
148
149 CheckConsistency {
150 tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
151 },
152
153 Dump {
154 tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
155 },
156}
157
158impl Command {
159 pub fn session(&self) -> Option<&Session> {
160 match self {
161 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
162 Command::CancelRequest { .. }
163 | Command::Startup { .. }
164 | Command::AuthenticatePassword { .. }
165 | Command::AuthenticateGetSASLChallenge { .. }
166 | Command::AuthenticateVerifySASLProof { .. }
167 | Command::CatalogSnapshot { .. }
168 | Command::PrivilegedCancelRequest { .. }
169 | Command::GetWebhook { .. }
170 | Command::Terminate { .. }
171 | Command::GetSystemVars { .. }
172 | Command::SetSystemVars { .. }
173 | Command::RetireExecute { .. }
174 | Command::CheckConsistency { .. }
175 | Command::Dump { .. } => None,
176 }
177 }
178
179 pub fn session_mut(&mut self) -> Option<&mut Session> {
180 match self {
181 Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
182 Command::CancelRequest { .. }
183 | Command::Startup { .. }
184 | Command::AuthenticatePassword { .. }
185 | Command::AuthenticateGetSASLChallenge { .. }
186 | Command::AuthenticateVerifySASLProof { .. }
187 | Command::CatalogSnapshot { .. }
188 | Command::PrivilegedCancelRequest { .. }
189 | Command::GetWebhook { .. }
190 | Command::Terminate { .. }
191 | Command::GetSystemVars { .. }
192 | Command::SetSystemVars { .. }
193 | Command::RetireExecute { .. }
194 | Command::CheckConsistency { .. }
195 | Command::Dump { .. } => None,
196 }
197 }
198}
199
200#[derive(Debug)]
201pub struct Response<T> {
202 pub result: Result<T, AdapterError>,
203 pub session: Session,
204 pub otel_ctx: OpenTelemetryContext,
205}
206
207#[derive(Derivative)]
209#[derivative(Debug)]
210pub struct StartupResponse {
211 pub role_id: RoleId,
213 #[derivative(Debug = "ignore")]
215 pub write_notify: BuiltinTableAppendNotify,
216 pub session_defaults: BTreeMap<String, OwnedVarInput>,
218 pub catalog: Arc<Catalog>,
219}
220
221#[derive(Derivative)]
223#[derivative(Debug)]
224pub struct AuthResponse {
225 pub role_id: RoleId,
227 pub superuser: bool,
229}
230
231#[derive(Derivative)]
232#[derivative(Debug)]
233pub struct SASLChallengeResponse {
234 pub iteration_count: usize,
235 pub salt: String,
237 pub nonce: String,
238}
239
240#[derive(Derivative)]
241#[derivative(Debug)]
242pub struct SASLVerifyProofResponse {
243 pub verifier: String,
244 pub auth_resp: AuthResponse,
245}
246
247impl Transmittable for StartupResponse {
250 type Allowed = bool;
251 fn to_allowed(&self) -> Self::Allowed {
252 true
253 }
254}
255
256#[derive(Debug, Clone)]
258pub struct CatalogDump(String);
259
260impl CatalogDump {
261 pub fn new(raw: String) -> Self {
262 CatalogDump(raw)
263 }
264
265 pub fn into_string(self) -> String {
266 self.0
267 }
268}
269
270impl Transmittable for CatalogDump {
271 type Allowed = bool;
272 fn to_allowed(&self) -> Self::Allowed {
273 true
274 }
275}
276
277impl Transmittable for SystemVars {
278 type Allowed = bool;
279 fn to_allowed(&self) -> Self::Allowed {
280 true
281 }
282}
283
284#[derive(EnumKind, Derivative)]
286#[derivative(Debug)]
287#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
288pub enum ExecuteResponse {
289 AlteredDefaultPrivileges,
291 AlteredObject(ObjectType),
293 AlteredRole,
295 AlteredSystemConfiguration,
297 ClosedCursor,
299 Comment,
301 Copied(usize),
303 CopyTo {
305 format: mz_sql::plan::CopyFormat,
306 resp: Box<ExecuteResponse>,
307 },
308 CopyFrom {
309 target_id: CatalogItemId,
311 target_name: String,
313 columns: Vec<ColumnIndex>,
314 params: CopyFormatParams<'static>,
315 ctx_extra: ExecuteContextExtra,
316 },
317 CreatedConnection,
319 CreatedDatabase,
321 CreatedSchema,
323 CreatedRole,
325 CreatedCluster,
327 CreatedClusterReplica,
329 CreatedIndex,
331 CreatedIntrospectionSubscribe,
333 CreatedSecret,
335 CreatedSink,
337 CreatedSource,
339 CreatedTable,
341 CreatedView,
343 CreatedViews,
345 CreatedMaterializedView,
347 CreatedContinualTask,
349 CreatedType,
351 CreatedNetworkPolicy,
353 Deallocate { all: bool },
355 DeclaredCursor,
357 Deleted(usize),
359 DiscardedTemp,
361 DiscardedAll,
363 DroppedObject(ObjectType),
365 DroppedOwned,
367 EmptyQuery,
369 Fetch {
371 name: String,
373 count: Option<FetchDirection>,
375 timeout: ExecuteTimeout,
377 ctx_extra: ExecuteContextExtra,
378 },
379 GrantedPrivilege,
381 GrantedRole,
383 Inserted(usize),
385 Prepare,
387 Raised,
389 ReassignOwned,
391 RevokedPrivilege,
393 RevokedRole,
395 SendingRowsStreaming {
397 #[derivative(Debug = "ignore")]
398 rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
399 instance_id: ComputeInstanceId,
400 strategy: StatementExecutionStrategy,
401 },
402 SendingRowsImmediate {
405 #[derivative(Debug = "ignore")]
406 rows: Box<dyn RowIterator + Send + Sync>,
407 },
408 SetVariable {
410 name: String,
411 reset: bool,
413 },
414 StartedTransaction,
416 Subscribing {
419 rx: RowBatchStream,
420 ctx_extra: ExecuteContextExtra,
421 instance_id: ComputeInstanceId,
422 },
423 TransactionCommitted {
425 params: BTreeMap<&'static str, String>,
427 },
428 TransactionRolledBack {
430 params: BTreeMap<&'static str, String>,
432 },
433 Updated(usize),
435 ValidatedConnection,
437}
438
439impl TryFrom<&Statement<Raw>> for ExecuteResponse {
440 type Error = ();
441
442 fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
444 let resp_kinds = Plan::generated_from(&stmt.into())
445 .iter()
446 .map(ExecuteResponse::generated_from)
447 .flatten()
448 .cloned()
449 .collect::<BTreeSet<ExecuteResponseKind>>();
450 let resps = resp_kinds
451 .iter()
452 .map(|r| (*r).try_into())
453 .collect::<Result<Vec<ExecuteResponse>, _>>();
454 if let Ok(resps) = resps {
456 if resps.len() == 1 {
457 return Ok(resps.into_element());
458 }
459 }
460 let resp = match stmt {
461 Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
462 ExecuteResponse::DroppedObject((*object_type).into())
463 }
464 Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
465 | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
466 ExecuteResponse::AlteredObject((*object_type).into())
467 }
468 _ => return Err(()),
469 };
470 soft_assert_no_log!(
472 resp_kinds.len() == 1
473 && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
474 "ExecuteResponses out of sync with planner"
475 );
476 Ok(resp)
477 }
478}
479
480impl TryInto<ExecuteResponse> for ExecuteResponseKind {
481 type Error = ();
482
483 fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
486 match self {
487 ExecuteResponseKind::AlteredDefaultPrivileges => {
488 Ok(ExecuteResponse::AlteredDefaultPrivileges)
489 }
490 ExecuteResponseKind::AlteredObject => Err(()),
491 ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
492 ExecuteResponseKind::AlteredSystemConfiguration => {
493 Ok(ExecuteResponse::AlteredSystemConfiguration)
494 }
495 ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
496 ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
497 ExecuteResponseKind::Copied => Err(()),
498 ExecuteResponseKind::CopyTo => Err(()),
499 ExecuteResponseKind::CopyFrom => Err(()),
500 ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
501 ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
502 ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
503 ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
504 ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
505 ExecuteResponseKind::CreatedClusterReplica => {
506 Ok(ExecuteResponse::CreatedClusterReplica)
507 }
508 ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
509 ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
510 ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
511 ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
512 ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
513 ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
514 ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
515 ExecuteResponseKind::CreatedMaterializedView => {
516 Ok(ExecuteResponse::CreatedMaterializedView)
517 }
518 ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
519 ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
520 ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
521 ExecuteResponseKind::Deallocate => Err(()),
522 ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
523 ExecuteResponseKind::Deleted => Err(()),
524 ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
525 ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
526 ExecuteResponseKind::DroppedObject => Err(()),
527 ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
528 ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
529 ExecuteResponseKind::Fetch => Err(()),
530 ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
531 ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
532 ExecuteResponseKind::Inserted => Err(()),
533 ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
534 ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
535 ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
536 ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
537 ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
538 ExecuteResponseKind::SetVariable => Err(()),
539 ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
540 ExecuteResponseKind::Subscribing => Err(()),
541 ExecuteResponseKind::TransactionCommitted => Err(()),
542 ExecuteResponseKind::TransactionRolledBack => Err(()),
543 ExecuteResponseKind::Updated => Err(()),
544 ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
545 ExecuteResponseKind::SendingRowsStreaming => Err(()),
546 ExecuteResponseKind::SendingRowsImmediate => Err(()),
547 ExecuteResponseKind::CreatedIntrospectionSubscribe => {
548 Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
549 }
550 }
551 }
552}
553
554impl ExecuteResponse {
555 pub fn tag(&self) -> Option<String> {
556 use ExecuteResponse::*;
557 match self {
558 AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
559 AlteredObject(o) => Some(format!("ALTER {}", o)),
560 AlteredRole => Some("ALTER ROLE".into()),
561 AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
562 ClosedCursor => Some("CLOSE CURSOR".into()),
563 Comment => Some("COMMENT".into()),
564 Copied(n) => Some(format!("COPY {}", n)),
565 CopyTo { .. } => None,
566 CopyFrom { .. } => None,
567 CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
568 CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
569 CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
570 CreatedRole => Some("CREATE ROLE".into()),
571 CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
572 CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
573 CreatedIndex { .. } => Some("CREATE INDEX".into()),
574 CreatedSecret { .. } => Some("CREATE SECRET".into()),
575 CreatedSink { .. } => Some("CREATE SINK".into()),
576 CreatedSource { .. } => Some("CREATE SOURCE".into()),
577 CreatedTable { .. } => Some("CREATE TABLE".into()),
578 CreatedView { .. } => Some("CREATE VIEW".into()),
579 CreatedViews { .. } => Some("CREATE VIEWS".into()),
580 CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
581 CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
582 CreatedType => Some("CREATE TYPE".into()),
583 CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
584 Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
585 DeclaredCursor => Some("DECLARE CURSOR".into()),
586 Deleted(n) => Some(format!("DELETE {}", n)),
587 DiscardedTemp => Some("DISCARD TEMP".into()),
588 DiscardedAll => Some("DISCARD ALL".into()),
589 DroppedObject(o) => Some(format!("DROP {o}")),
590 DroppedOwned => Some("DROP OWNED".into()),
591 EmptyQuery => None,
592 Fetch { .. } => None,
593 GrantedPrivilege => Some("GRANT".into()),
594 GrantedRole => Some("GRANT ROLE".into()),
595 Inserted(n) => {
596 Some(format!("INSERT 0 {}", n))
604 }
605 Prepare => Some("PREPARE".into()),
606 Raised => Some("RAISE".into()),
607 ReassignOwned => Some("REASSIGN OWNED".into()),
608 RevokedPrivilege => Some("REVOKE".into()),
609 RevokedRole => Some("REVOKE ROLE".into()),
610 SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
611 SetVariable { reset: true, .. } => Some("RESET".into()),
612 SetVariable { reset: false, .. } => Some("SET".into()),
613 StartedTransaction { .. } => Some("BEGIN".into()),
614 Subscribing { .. } => None,
615 TransactionCommitted { .. } => Some("COMMIT".into()),
616 TransactionRolledBack { .. } => Some("ROLLBACK".into()),
617 Updated(n) => Some(format!("UPDATE {}", n)),
618 ValidatedConnection => Some("VALIDATE CONNECTION".into()),
619 CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
620 }
621 }
622
623 pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
627 use ExecuteResponseKind::*;
628 use PlanKind::*;
629
630 match plan {
631 AbortTransaction => &[TransactionRolledBack],
632 AlterClusterRename
633 | AlterClusterSwap
634 | AlterCluster
635 | AlterClusterReplicaRename
636 | AlterOwner
637 | AlterItemRename
638 | AlterRetainHistory
639 | AlterNoop
640 | AlterSchemaRename
641 | AlterSchemaSwap
642 | AlterSecret
643 | AlterConnection
644 | AlterSource
645 | AlterSink
646 | AlterTableAddColumn
647 | AlterNetworkPolicy => &[AlteredObject],
648 AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
649 AlterSetCluster => &[AlteredObject],
650 AlterRole => &[AlteredRole],
651 AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
652 &[AlteredSystemConfiguration]
653 }
654 Close => &[ClosedCursor],
655 PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
656 PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
657 PlanKind::Comment => &[ExecuteResponseKind::Comment],
658 CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
659 CreateConnection => &[CreatedConnection],
660 CreateDatabase => &[CreatedDatabase],
661 CreateSchema => &[CreatedSchema],
662 CreateRole => &[CreatedRole],
663 CreateCluster => &[CreatedCluster],
664 CreateClusterReplica => &[CreatedClusterReplica],
665 CreateSource | CreateSources => &[CreatedSource],
666 CreateSecret => &[CreatedSecret],
667 CreateSink => &[CreatedSink],
668 CreateTable => &[CreatedTable],
669 CreateView => &[CreatedView],
670 CreateMaterializedView => &[CreatedMaterializedView],
671 CreateContinualTask => &[CreatedContinualTask],
672 CreateIndex => &[CreatedIndex],
673 CreateType => &[CreatedType],
674 PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
675 CreateNetworkPolicy => &[CreatedNetworkPolicy],
676 Declare => &[DeclaredCursor],
677 DiscardTemp => &[DiscardedTemp],
678 DiscardAll => &[DiscardedAll],
679 DropObjects => &[DroppedObject],
680 DropOwned => &[DroppedOwned],
681 PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
682 ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
683 | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
684 ExecuteResponseKind::CopyTo,
685 SendingRowsStreaming,
686 SendingRowsImmediate,
687 ],
688 Execute | ReadThenWrite => &[
689 Deleted,
690 Inserted,
691 SendingRowsStreaming,
692 SendingRowsImmediate,
693 Updated,
694 ],
695 PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
696 GrantPrivileges => &[GrantedPrivilege],
697 GrantRole => &[GrantedRole],
698 Insert => &[Inserted, SendingRowsImmediate],
699 PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
700 PlanKind::Raise => &[ExecuteResponseKind::Raised],
701 PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
702 RevokePrivileges => &[RevokedPrivilege],
703 RevokeRole => &[RevokedRole],
704 PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
705 &[ExecuteResponseKind::SetVariable]
706 }
707 PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
708 StartTransaction => &[StartedTransaction],
709 SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
710 ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
711 }
712 }
713}
714
715impl Transmittable for ExecuteResponse {
719 type Allowed = ExecuteResponseKind;
720 fn to_allowed(&self) -> Self::Allowed {
721 ExecuteResponseKind::from(self)
722 }
723}