1use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use derivative::Derivative;
16use enum_kinds::EnumKind;
17use futures::Stream;
18use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
19use mz_auth::password::Password;
20use mz_compute_types::ComputeInstanceId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_no_log;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_pgcopy::CopyFormatParams;
25use mz_repr::role_id::RoleId;
26use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
27use mz_sql::ast::{FetchDirection, Raw, Statement};
28use mz_sql::catalog::ObjectType;
29use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
30use mz_sql::session::user::User;
31use mz_sql::session::vars::{OwnedVarInput, SystemVars};
32use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
33use tokio::sync::{mpsc, oneshot};
34use uuid::Uuid;
35
36use crate::catalog::Catalog;
37use crate::coord::ExecuteContextExtra;
38use crate::coord::appends::BuiltinTableAppendNotify;
39use crate::coord::consistency::CoordinatorInconsistencies;
40use crate::coord::peek::PeekResponseUnary;
41use crate::error::AdapterError;
42use crate::session::{EndTransactionAction, RowBatchStream, Session};
43use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
44use crate::util::Transmittable;
45use crate::webhook::AppendWebhookResponse;
46use crate::{AdapterNotice, AppendWebhookError};
47
/// Payload sent in answer to [`Command::CatalogSnapshot`]: a shared handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Reference-counted handle to the catalog.
    pub catalog: Arc<Catalog>,
}
52
/// The requests defined by this module.
///
/// Variants that expect an answer carry a `tx` one-shot sender typed with that answer.
/// `Execute` and `Commit` additionally own the [`Session`] they operate on; their
/// [`Response`] type carries a `Session` field as well.
#[derive(Debug)]
pub enum Command {
    /// Request for a [`CatalogSnapshot`].
    CatalogSnapshot {
        /// Sender for the snapshot.
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Start-up request for a connection.
    Startup {
        /// Sender for the [`StartupResponse`], or the error that prevented start-up.
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        /// The user the connection runs as.
        user: User,
        /// Identifier of the connection.
        conn_id: ConnectionId,
        /// IP address of the client, when known.
        client_ip: Option<IpAddr>,
        /// Secret associated with the connection; `CancelRequest` carries a field of the
        /// same name and type.
        secret_key: u32,
        /// UUID supplied with the start-up request.
        // NOTE(review): presumably the session UUID -- confirm against the sender.
        uuid: Uuid,
        /// Application name reported for the connection.
        application_name: String,
        /// Unbounded channel on which [`AdapterNotice`]s can be sent.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for a role.
    AuthenticatePassword {
        /// Sender for the [`AuthResponse`], or the authentication error.
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        /// Name of the role to authenticate.
        role_name: String,
        /// The supplied password, if any.
        password: Option<Password>,
    },

    /// Request for a SASL challenge for a role.
    AuthenticateGetSASLChallenge {
        /// Sender for the [`SASLChallengeResponse`], or the error.
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        /// Name of the role to authenticate.
        role_name: String,
        /// Nonce supplied with the request.
        nonce: String,
    },

    /// Request to verify a SASL proof for a role.
    AuthenticateVerifySASLProof {
        /// Sender for the [`SASLVerifyProofResponse`], or the error.
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        /// Name of the role to authenticate.
        role_name: String,
        /// The proof to verify.
        proof: String,
        /// The authentication message the proof refers to.
        auth_message: String,
        /// Mock hash supplied with the request.
        // NOTE(review): presumably used when the role has no stored credentials, so that
        // verification work is still performed -- confirm against the handler.
        mock_hash: String,
    },

    /// Request to execute a portal.
    Execute {
        /// Name of the portal to execute.
        portal_name: String,
        /// The session, moved into the command.
        session: Session,
        /// Sender for the [`Response`] wrapping the [`ExecuteResponse`].
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        /// Optional execution context supplied by the sender.
        outer_ctx_extra: Option<ExecuteContextExtra>,
    },

    /// Request to end a transaction.
    Commit {
        /// How the transaction should be ended.
        action: EndTransactionAction,
        /// The session, moved into the command.
        session: Session,
        /// Sender for the [`Response`] wrapping the [`ExecuteResponse`].
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by connection id and secret key.
    CancelRequest {
        /// Identifier of the connection to cancel.
        conn_id: ConnectionIdType,
        /// Secret key accompanying the request.
        secret_key: u32,
    },

    /// Cancellation request that carries only a connection id (no secret key).
    PrivilegedCancelRequest {
        /// Identifier of the connection to cancel.
        conn_id: ConnectionId,
    },

    /// Lookup of a webhook by its fully qualified name.
    GetWebhook {
        /// Database component of the name.
        database: String,
        /// Schema component of the name.
        schema: String,
        /// Item name.
        name: String,
        /// Sender for the [`AppendWebhookResponse`], or the lookup error.
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Request for the current [`SystemVars`].
    GetSystemVars {
        /// Sender for the system variables.
        tx: oneshot::Sender<SystemVars>,
    },

    /// Request to set system variables.
    SetSystemVars {
        /// Variable names mapped to their new values.
        vars: BTreeMap<String, String>,
        /// Connection on whose behalf the variables are set.
        conn_id: ConnectionId,
        /// Sender for the outcome.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Request to terminate a connection.
    Terminate {
        /// Identifier of the connection to terminate.
        conn_id: ConnectionId,
        /// Optional sender for the outcome; `None` when no acknowledgement is wanted.
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Notification that an execution has ended.
    RetireExecute {
        /// Execution context of the statement being retired.
        data: ExecuteContextExtra,
        /// Why the execution ended.
        reason: StatementEndedExecutionReason,
    },

    /// Request for a consistency check.
    CheckConsistency {
        /// Sender for the outcome: `Ok(())` or the inconsistencies found.
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Request for a JSON dump.
    Dump {
        /// Sender for the dump as a JSON value, or the error.
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },
}
157
158impl Command {
159    pub fn session(&self) -> Option<&Session> {
160        match self {
161            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
162            Command::CancelRequest { .. }
163            | Command::Startup { .. }
164            | Command::AuthenticatePassword { .. }
165            | Command::AuthenticateGetSASLChallenge { .. }
166            | Command::AuthenticateVerifySASLProof { .. }
167            | Command::CatalogSnapshot { .. }
168            | Command::PrivilegedCancelRequest { .. }
169            | Command::GetWebhook { .. }
170            | Command::Terminate { .. }
171            | Command::GetSystemVars { .. }
172            | Command::SetSystemVars { .. }
173            | Command::RetireExecute { .. }
174            | Command::CheckConsistency { .. }
175            | Command::Dump { .. } => None,
176        }
177    }
178
179    pub fn session_mut(&mut self) -> Option<&mut Session> {
180        match self {
181            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
182            Command::CancelRequest { .. }
183            | Command::Startup { .. }
184            | Command::AuthenticatePassword { .. }
185            | Command::AuthenticateGetSASLChallenge { .. }
186            | Command::AuthenticateVerifySASLProof { .. }
187            | Command::CatalogSnapshot { .. }
188            | Command::PrivilegedCancelRequest { .. }
189            | Command::GetWebhook { .. }
190            | Command::Terminate { .. }
191            | Command::GetSystemVars { .. }
192            | Command::SetSystemVars { .. }
193            | Command::RetireExecute { .. }
194            | Command::CheckConsistency { .. }
195            | Command::Dump { .. } => None,
196        }
197    }
198}
199
/// Envelope for the answer to a command that took ownership of a [`Session`].
///
/// Used as the `tx` payload of [`Command::Execute`] and [`Command::Commit`].
#[derive(Debug)]
pub struct Response<T> {
    /// The outcome of the command: the payload on success, an [`AdapterError`] otherwise.
    pub result: Result<T, AdapterError>,
    /// The session travelling with the response.
    pub session: Session,
    /// OpenTelemetry context accompanying the response.
    pub otel_ctx: OpenTelemetryContext,
}
206
/// Successful answer to [`Command::Startup`].
///
/// `Debug` is derived through `Derivative` so that `write_notify` can be left out of
/// the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// Id of the role the connection runs as.
    pub role_id: RoleId,
    /// Notification handle for a builtin-table append; skipped by `Debug`.
    // NOTE(review): presumably resolves once the start-up related builtin table writes
    // have completed -- confirm against `BuiltinTableAppendNotify`.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Default values for session variables, keyed by variable name.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
}
220
221#[derive(Derivative)]
223#[derivative(Debug)]
224pub struct AuthResponse {
225    pub role_id: RoleId,
227    pub superuser: bool,
229}
230
/// Successful answer to [`Command::AuthenticateGetSASLChallenge`].
// No field needs custom `Debug` handling, so the built-in derive is used instead of
// `Derivative`.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count to use for the challenge.
    pub iteration_count: usize,
    /// Salt to use for the challenge.
    // NOTE(review): encoding of the salt is not visible here (presumably base64) -- confirm.
    pub salt: String,
    /// Nonce returned with the challenge.
    pub nonce: String,
}
239
240#[derive(Derivative)]
241#[derivative(Debug)]
242pub struct SASLVerifyProofResponse {
243    pub verifier: String,
244    pub auth_resp: AuthResponse,
245}
246
247impl Transmittable for StartupResponse {
250    type Allowed = bool;
251    fn to_allowed(&self) -> Self::Allowed {
252        true
253    }
254}
255
/// Newtype around a raw `String`.
// NOTE(review): presumably holds a serialized catalog dump -- confirm against the producer.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and hands back the string it was built from.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
269
270impl Transmittable for CatalogDump {
271    type Allowed = bool;
272    fn to_allowed(&self) -> Self::Allowed {
273        true
274    }
275}
276
277impl Transmittable for SystemVars {
278    type Allowed = bool;
279    fn to_allowed(&self) -> Self::Allowed {
280        true
281    }
282}
283
/// The possible outcomes of executing a statement.
///
/// `EnumKind` generates the payload-free companion enum `ExecuteResponseKind`, which
/// additionally derives `PartialOrd` and `Ord`. `Debug` comes from `Derivative` so that
/// the row-carrying fields can be skipped in debug output.
// NOTE(review): the ordering of `ExecuteResponseKind` presumably follows the variant
// order below, so reordering variants may change `BTreeSet` iteration order -- confirm.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// Default privileges were altered.
    AlteredDefaultPrivileges,
    /// An object of the given type was altered.
    AlteredObject(ObjectType),
    /// A role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// A cursor was closed.
    ClosedCursor,
    /// A comment statement completed.
    Comment,
    /// A copy completed; the number is reported in the `COPY <n>` tag.
    Copied(usize),
    /// A copy-to response wrapping another response.
    CopyTo {
        /// Copy format to use.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response.
        resp: Box<ExecuteResponse>,
    },
    /// A copy-from response describing the target of the copy.
    CopyFrom {
        /// Catalog id of the target item.
        target_id: CatalogItemId,
        /// Name of the target item.
        target_name: String,
        /// Indexes of the target columns.
        columns: Vec<ColumnIndex>,
        /// Format parameters for the copied data.
        params: CopyFormatParams<'static>,
        /// Execution context associated with the statement.
        ctx_extra: ExecuteContextExtra,
    },
    /// A connection was created.
    CreatedConnection,
    /// A database was created.
    CreatedDatabase,
    /// A schema was created.
    CreatedSchema,
    /// A role was created.
    CreatedRole,
    /// A cluster was created.
    CreatedCluster,
    /// A cluster replica was created.
    CreatedClusterReplica,
    /// An index was created.
    CreatedIndex,
    /// An introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// A secret was created.
    CreatedSecret,
    /// A sink was created.
    CreatedSink,
    /// A source was created.
    CreatedSource,
    /// A table was created.
    CreatedTable,
    /// A view was created.
    CreatedView,
    /// Views were created.
    CreatedViews,
    /// A materialized view was created.
    CreatedMaterializedView,
    /// A continual task was created.
    CreatedContinualTask,
    /// A type was created.
    CreatedType,
    /// A network policy was created.
    CreatedNetworkPolicy,
    /// A deallocate statement completed; `all` selects the `DEALLOCATE ALL` tag.
    Deallocate { all: bool },
    /// A cursor was declared.
    DeclaredCursor,
    /// A delete completed; the number is reported in the `DELETE <n>` tag.
    Deleted(usize),
    /// A `DISCARD TEMP` completed.
    DiscardedTemp,
    /// A `DISCARD ALL` completed.
    DiscardedAll,
    /// An object of the given type was dropped.
    DroppedObject(ObjectType),
    /// A `DROP OWNED` completed.
    DroppedOwned,
    /// The query was empty.
    EmptyQuery,
    /// A fetch from a cursor was requested.
    Fetch {
        /// Name of the cursor to fetch from.
        name: String,
        /// Optional fetch direction / count.
        count: Option<FetchDirection>,
        /// Timeout to apply to the fetch.
        timeout: ExecuteTimeout,
        /// Execution context associated with the statement.
        ctx_extra: ExecuteContextExtra,
    },
    /// Privileges were granted.
    GrantedPrivilege,
    /// A role was granted.
    GrantedRole,
    /// An insert completed; the number is reported in the `INSERT 0 <n>` tag.
    Inserted(usize),
    /// A statement was prepared.
    Prepare,
    /// A `RAISE` completed.
    Raised,
    /// A `REASSIGN OWNED` completed.
    ReassignOwned,
    /// Privileges were revoked.
    RevokedPrivilege,
    /// A role was revoked.
    RevokedRole,
    /// Rows are delivered as a stream.
    SendingRowsStreaming {
        /// Stream of `PeekResponseUnary` items; skipped by `Debug`.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance associated with the rows.
        instance_id: ComputeInstanceId,
        /// Execution strategy recorded for the statement.
        strategy: StatementExecutionStrategy,
    },
    /// Rows are available through an iterator.
    SendingRowsImmediate {
        /// The row iterator; skipped by `Debug`.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// A variable was set or reset.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// `true` selects the `RESET` tag, `false` the `SET` tag.
        reset: bool,
    },
    /// A transaction was started.
    StartedTransaction,
    /// A subscribe is in progress; batches arrive on `rx`.
    Subscribing {
        /// Stream of row batches.
        rx: RowBatchStream,
        /// Execution context associated with the statement.
        ctx_extra: ExecuteContextExtra,
        /// Compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// The transaction was committed.
    TransactionCommitted {
        /// Named string parameters accompanying the response.
        // NOTE(review): presumably session parameters that changed -- confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction was rolled back.
    TransactionRolledBack {
        /// Named string parameters accompanying the response.
        // NOTE(review): presumably session parameters that changed -- confirm.
        params: BTreeMap<&'static str, String>,
    },
    /// An update completed; the number is reported in the `UPDATE <n>` tag.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
438
439impl TryFrom<&Statement<Raw>> for ExecuteResponse {
440    type Error = ();
441
442    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
444        let resp_kinds = Plan::generated_from(&stmt.into())
445            .iter()
446            .map(ExecuteResponse::generated_from)
447            .flatten()
448            .cloned()
449            .collect::<BTreeSet<ExecuteResponseKind>>();
450        let resps = resp_kinds
451            .iter()
452            .map(|r| (*r).try_into())
453            .collect::<Result<Vec<ExecuteResponse>, _>>();
454        if let Ok(resps) = resps {
456            if resps.len() == 1 {
457                return Ok(resps.into_element());
458            }
459        }
460        let resp = match stmt {
461            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
462                ExecuteResponse::DroppedObject((*object_type).into())
463            }
464            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
465            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
466                ExecuteResponse::AlteredObject((*object_type).into())
467            }
468            _ => return Err(()),
469        };
470        soft_assert_no_log!(
472            resp_kinds.len() == 1
473                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
474            "ExecuteResponses out of sync with planner"
475        );
476        Ok(resp)
477    }
478}
479
480impl TryInto<ExecuteResponse> for ExecuteResponseKind {
481    type Error = ();
482
483    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
486        match self {
487            ExecuteResponseKind::AlteredDefaultPrivileges => {
488                Ok(ExecuteResponse::AlteredDefaultPrivileges)
489            }
490            ExecuteResponseKind::AlteredObject => Err(()),
491            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
492            ExecuteResponseKind::AlteredSystemConfiguration => {
493                Ok(ExecuteResponse::AlteredSystemConfiguration)
494            }
495            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
496            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
497            ExecuteResponseKind::Copied => Err(()),
498            ExecuteResponseKind::CopyTo => Err(()),
499            ExecuteResponseKind::CopyFrom => Err(()),
500            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
501            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
502            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
503            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
504            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
505            ExecuteResponseKind::CreatedClusterReplica => {
506                Ok(ExecuteResponse::CreatedClusterReplica)
507            }
508            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
509            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
510            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
511            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
512            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
513            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
514            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
515            ExecuteResponseKind::CreatedMaterializedView => {
516                Ok(ExecuteResponse::CreatedMaterializedView)
517            }
518            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
519            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
520            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
521            ExecuteResponseKind::Deallocate => Err(()),
522            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
523            ExecuteResponseKind::Deleted => Err(()),
524            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
525            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
526            ExecuteResponseKind::DroppedObject => Err(()),
527            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
528            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
529            ExecuteResponseKind::Fetch => Err(()),
530            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
531            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
532            ExecuteResponseKind::Inserted => Err(()),
533            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
534            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
535            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
536            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
537            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
538            ExecuteResponseKind::SetVariable => Err(()),
539            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
540            ExecuteResponseKind::Subscribing => Err(()),
541            ExecuteResponseKind::TransactionCommitted => Err(()),
542            ExecuteResponseKind::TransactionRolledBack => Err(()),
543            ExecuteResponseKind::Updated => Err(()),
544            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
545            ExecuteResponseKind::SendingRowsStreaming => Err(()),
546            ExecuteResponseKind::SendingRowsImmediate => Err(()),
547            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
548                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
549            }
550        }
551    }
552}
553
554impl ExecuteResponse {
555    pub fn tag(&self) -> Option<String> {
556        use ExecuteResponse::*;
557        match self {
558            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
559            AlteredObject(o) => Some(format!("ALTER {}", o)),
560            AlteredRole => Some("ALTER ROLE".into()),
561            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
562            ClosedCursor => Some("CLOSE CURSOR".into()),
563            Comment => Some("COMMENT".into()),
564            Copied(n) => Some(format!("COPY {}", n)),
565            CopyTo { .. } => None,
566            CopyFrom { .. } => None,
567            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
568            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
569            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
570            CreatedRole => Some("CREATE ROLE".into()),
571            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
572            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
573            CreatedIndex { .. } => Some("CREATE INDEX".into()),
574            CreatedSecret { .. } => Some("CREATE SECRET".into()),
575            CreatedSink { .. } => Some("CREATE SINK".into()),
576            CreatedSource { .. } => Some("CREATE SOURCE".into()),
577            CreatedTable { .. } => Some("CREATE TABLE".into()),
578            CreatedView { .. } => Some("CREATE VIEW".into()),
579            CreatedViews { .. } => Some("CREATE VIEWS".into()),
580            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
581            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
582            CreatedType => Some("CREATE TYPE".into()),
583            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
584            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
585            DeclaredCursor => Some("DECLARE CURSOR".into()),
586            Deleted(n) => Some(format!("DELETE {}", n)),
587            DiscardedTemp => Some("DISCARD TEMP".into()),
588            DiscardedAll => Some("DISCARD ALL".into()),
589            DroppedObject(o) => Some(format!("DROP {o}")),
590            DroppedOwned => Some("DROP OWNED".into()),
591            EmptyQuery => None,
592            Fetch { .. } => None,
593            GrantedPrivilege => Some("GRANT".into()),
594            GrantedRole => Some("GRANT ROLE".into()),
595            Inserted(n) => {
596                Some(format!("INSERT 0 {}", n))
604            }
605            Prepare => Some("PREPARE".into()),
606            Raised => Some("RAISE".into()),
607            ReassignOwned => Some("REASSIGN OWNED".into()),
608            RevokedPrivilege => Some("REVOKE".into()),
609            RevokedRole => Some("REVOKE ROLE".into()),
610            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
611            SetVariable { reset: true, .. } => Some("RESET".into()),
612            SetVariable { reset: false, .. } => Some("SET".into()),
613            StartedTransaction { .. } => Some("BEGIN".into()),
614            Subscribing { .. } => None,
615            TransactionCommitted { .. } => Some("COMMIT".into()),
616            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
617            Updated(n) => Some(format!("UPDATE {}", n)),
618            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
619            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
620        }
621    }
622
623    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
627        use ExecuteResponseKind::*;
628        use PlanKind::*;
629
630        match plan {
631            AbortTransaction => &[TransactionRolledBack],
632            AlterClusterRename
633            | AlterClusterSwap
634            | AlterCluster
635            | AlterClusterReplicaRename
636            | AlterOwner
637            | AlterItemRename
638            | AlterRetainHistory
639            | AlterNoop
640            | AlterSchemaRename
641            | AlterSchemaSwap
642            | AlterSecret
643            | AlterConnection
644            | AlterSource
645            | AlterSink
646            | AlterTableAddColumn
647            | AlterNetworkPolicy => &[AlteredObject],
648            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
649            AlterSetCluster => &[AlteredObject],
650            AlterRole => &[AlteredRole],
651            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
652                &[AlteredSystemConfiguration]
653            }
654            Close => &[ClosedCursor],
655            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
656            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
657            PlanKind::Comment => &[ExecuteResponseKind::Comment],
658            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
659            CreateConnection => &[CreatedConnection],
660            CreateDatabase => &[CreatedDatabase],
661            CreateSchema => &[CreatedSchema],
662            CreateRole => &[CreatedRole],
663            CreateCluster => &[CreatedCluster],
664            CreateClusterReplica => &[CreatedClusterReplica],
665            CreateSource | CreateSources => &[CreatedSource],
666            CreateSecret => &[CreatedSecret],
667            CreateSink => &[CreatedSink],
668            CreateTable => &[CreatedTable],
669            CreateView => &[CreatedView],
670            CreateMaterializedView => &[CreatedMaterializedView],
671            CreateContinualTask => &[CreatedContinualTask],
672            CreateIndex => &[CreatedIndex],
673            CreateType => &[CreatedType],
674            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
675            CreateNetworkPolicy => &[CreatedNetworkPolicy],
676            Declare => &[DeclaredCursor],
677            DiscardTemp => &[DiscardedTemp],
678            DiscardAll => &[DiscardedAll],
679            DropObjects => &[DroppedObject],
680            DropOwned => &[DroppedOwned],
681            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
682            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
683            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
684                ExecuteResponseKind::CopyTo,
685                SendingRowsStreaming,
686                SendingRowsImmediate,
687            ],
688            Execute | ReadThenWrite => &[
689                Deleted,
690                Inserted,
691                SendingRowsStreaming,
692                SendingRowsImmediate,
693                Updated,
694            ],
695            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
696            GrantPrivileges => &[GrantedPrivilege],
697            GrantRole => &[GrantedRole],
698            Insert => &[Inserted, SendingRowsImmediate],
699            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
700            PlanKind::Raise => &[ExecuteResponseKind::Raised],
701            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
702            RevokePrivileges => &[RevokedPrivilege],
703            RevokeRole => &[RevokedRole],
704            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
705                &[ExecuteResponseKind::SetVariable]
706            }
707            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
708            StartTransaction => &[StartedTransaction],
709            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
710            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
711        }
712    }
713}
714
715impl Transmittable for ExecuteResponse {
719    type Allowed = ExecuteResponseKind;
720    fn to_allowed(&self) -> Self::Allowed {
721        ExecuteResponseKind::from(self)
722    }
723}