mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::future::Future;
12use std::net::IpAddr;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use derivative::Derivative;
17use enum_kinds::EnumKind;
18use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
19use mz_auth::password::Password;
20use mz_compute_types::ComputeInstanceId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_no_log;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_pgcopy::CopyFormatParams;
25use mz_repr::role_id::RoleId;
26use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
27use mz_sql::ast::{FetchDirection, Raw, Statement};
28use mz_sql::catalog::ObjectType;
29use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
30use mz_sql::session::user::User;
31use mz_sql::session::vars::{OwnedVarInput, SystemVars};
32use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
33use tokio::sync::{mpsc, oneshot};
34use uuid::Uuid;
35
36use crate::catalog::Catalog;
37use crate::coord::ExecuteContextExtra;
38use crate::coord::appends::BuiltinTableAppendNotify;
39use crate::coord::consistency::CoordinatorInconsistencies;
40use crate::coord::peek::PeekResponseUnary;
41use crate::error::AdapterError;
42use crate::session::{EndTransactionAction, RowBatchStream, Session};
43use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
44use crate::util::Transmittable;
45use crate::webhook::AppendWebhookResponse;
46use crate::{AdapterNotice, AppendWebhookError};
47
/// A shared handle to a [`Catalog`].
///
/// This is the payload type of the channel carried by
/// [`Command::CatalogSnapshot`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Reference-counted pointer to the catalog.
    pub catalog: Arc<Catalog>,
}
52
/// A request message together with, for most variants, a channel (`tx`) on
/// which the outcome of the request is sent back.
///
/// NOTE(review): presumably these are sent by clients to the coordinator;
/// confirm against the code that consumes `Command`.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`], which is delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Requests startup for the connection `conn_id` on behalf of `user`.
    ///
    /// The outcome, a [`StartupResponse`] or an [`AdapterError`], is
    /// delivered on `tx`.
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        // NOTE(review): presumably the same key that `CancelRequest` carries;
        // confirm against the cancellation handling.
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Channel on which [`AdapterNotice`]s can be sent.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Requests password authentication for `role_name`.
    ///
    /// The outcome, an [`AuthResponse`] or an [`AdapterError`], is delivered
    /// on `tx`.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests execution of the portal named `portal_name` using `session`.
    ///
    /// The [`Response`] sent on `tx` carries a [`Session`] alongside the
    /// result.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextExtra>,
    },

    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
    /// happen, for example, if the session's role id has been dropped which will prevent
    /// sequence_end_transaction from running.)
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// A cancellation request identified by a connection id and a secret key.
    ///
    /// Carries no response channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// A cancellation request that carries only a connection id, with no
    /// secret key and no response channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Requests the webhook named by `database`, `schema` and `name`.
    ///
    /// The outcome, an [`AppendWebhookResponse`] or an
    /// [`AppendWebhookError`], is delivered on `tx`.
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests the [`SystemVars`], which are delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Requests that the system variables in `vars` be set.
    ///
    /// NOTE(review): `vars` presumably maps variable name to new value;
    /// confirm against the handler.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Requests termination of the connection `conn_id`.
    ///
    /// `tx` is optional; when present, the outcome is delivered on it.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    ///
    /// Only used for cases that terminate in the protocol layer and
    /// otherwise have no reason to hand control back to the coordinator.
    /// In other cases, we piggy-back on another command.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Requests a consistency check.
    ///
    /// `tx` receives `Ok(())` or the [`CoordinatorInconsistencies`] found.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump as a JSON value, delivered on `tx`.
    ///
    /// NOTE(review): what exactly is dumped is not visible from this
    /// definition; confirm against the handler.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },
}
143
144impl Command {
145    pub fn session(&self) -> Option<&Session> {
146        match self {
147            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
148            Command::CancelRequest { .. }
149            | Command::Startup { .. }
150            | Command::AuthenticatePassword { .. }
151            | Command::CatalogSnapshot { .. }
152            | Command::PrivilegedCancelRequest { .. }
153            | Command::GetWebhook { .. }
154            | Command::Terminate { .. }
155            | Command::GetSystemVars { .. }
156            | Command::SetSystemVars { .. }
157            | Command::RetireExecute { .. }
158            | Command::CheckConsistency { .. }
159            | Command::Dump { .. } => None,
160        }
161    }
162
163    pub fn session_mut(&mut self) -> Option<&mut Session> {
164        match self {
165            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
166            Command::CancelRequest { .. }
167            | Command::Startup { .. }
168            | Command::AuthenticatePassword { .. }
169            | Command::CatalogSnapshot { .. }
170            | Command::PrivilegedCancelRequest { .. }
171            | Command::GetWebhook { .. }
172            | Command::Terminate { .. }
173            | Command::GetSystemVars { .. }
174            | Command::SetSystemVars { .. }
175            | Command::RetireExecute { .. }
176            | Command::CheckConsistency { .. }
177            | Command::Dump { .. } => None,
178        }
179    }
180}
181
/// The reply to a command that carried a [`Session`]: the command's result
/// bundled with a session and an OpenTelemetry context.
///
/// Used as the payload of the `tx` channels of [`Command::Execute`] and
/// [`Command::Commit`].
#[derive(Debug)]
pub struct Response<T> {
    /// The outcome of the command.
    pub result: Result<T, AdapterError>,
    /// NOTE(review): presumably the session that was passed in with the
    /// command, handed back to the caller; confirm.
    pub session: Session,
    /// OpenTelemetry context accompanying the response.
    pub otel_ctx: OpenTelemetryContext,
}
188
/// A pinned, boxed, `Send` future that resolves to a [`PeekResponseUnary`].
///
/// Used by [`ExecuteResponse::SendingRows`] to deliver rows.
pub type RowsFuture = Pin<Box<dyn Future<Output = PeekResponseUnary> + Send>>;
190
/// The response to [`Client::startup`](crate::Client::startup).
///
/// `Debug` is derived through `derivative` so that `write_notify` can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// A future that completes when all necessary Builtin Table writes have completed.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to a [`Catalog`].
    ///
    /// NOTE(review): presumably the catalog state as of startup; confirm.
    pub catalog: Arc<Catalog>,
}
204
/// The response to [`Client::authenticate`](crate::Client::authenticate).
///
/// This is the success payload of the channel carried by
/// [`Command::AuthenticatePassword`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct AuthResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// If the user is a superuser.
    pub superuser: bool,
}
214
215// Facile implementation for `StartupResponse`, which does not use the `allowed`
216// feature of `ClientTransmitter`.
217impl Transmittable for StartupResponse {
218    type Allowed = bool;
219    fn to_allowed(&self) -> Self::Allowed {
220        true
221    }
222}
223
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// A thin newtype around the dump's `String` representation.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` in a `CatalogDump` without inspecting or altering it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and returns the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
237
238impl Transmittable for CatalogDump {
239    type Allowed = bool;
240    fn to_allowed(&self) -> Self::Allowed {
241        true
242    }
243}
244
245impl Transmittable for SystemVars {
246    type Allowed = bool;
247    fn to_allowed(&self) -> Self::Allowed {
248        true
249    }
250}
251
/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
///
/// The `EnumKind` derive also generates the payload-free companion enum
/// `ExecuteResponseKind`, with one variant per variant here and `PartialOrd`
/// and `Ord` derived on it. Derived ordering follows declaration order, so
/// reordering variants changes how `ExecuteResponseKind` values compare.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// The default privileges were altered.
    AlteredDefaultPrivileges,
    /// The requested object was altered.
    AlteredObject(ObjectType),
    /// The role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// The requested cursor was closed.
    ClosedCursor,
    /// The provided comment was created.
    Comment,
    /// The specified number of rows were copied into the requested output.
    Copied(usize),
    /// The response for a COPY TO STDOUT query.
    CopyTo {
        /// The copy format.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response.
        // NOTE(review): presumably the response that produces the rows to be
        // copied out; confirm against the code that constructs `CopyTo`.
        resp: Box<ExecuteResponse>,
    },
    /// The response for a copy-from operation.
    // NOTE(review): presumably `COPY ... FROM STDIN`; confirm.
    CopyFrom {
        /// Catalog item id.
        // NOTE(review): presumably the id of the target of the copy; confirm.
        id: CatalogItemId,
        /// Column indexes.
        // NOTE(review): presumably the target columns, in input order; confirm.
        columns: Vec<ColumnIndex>,
        /// Parameters of the copy format.
        params: CopyFormatParams<'static>,
        ctx_extra: ExecuteContextExtra,
    },
    /// The requested connection was created.
    CreatedConnection,
    /// The requested database was created.
    CreatedDatabase,
    /// The requested schema was created.
    CreatedSchema,
    /// The requested role was created.
    CreatedRole,
    /// The requested cluster was created.
    CreatedCluster,
    /// The requested cluster replica was created.
    CreatedClusterReplica,
    /// The requested index was created.
    CreatedIndex,
    /// The requested introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// The requested secret was created.
    CreatedSecret,
    /// The requested sink was created.
    CreatedSink,
    /// The requested source was created.
    CreatedSource,
    /// The requested table was created.
    CreatedTable,
    /// The requested view was created.
    CreatedView,
    /// The requested views were created.
    CreatedViews,
    /// The requested materialized view was created.
    CreatedMaterializedView,
    /// The requested continual task was created.
    CreatedContinualTask,
    /// The requested type was created.
    CreatedType,
    /// The requested network policy was created.
    CreatedNetworkPolicy,
    /// The requested prepared statement was removed.
    ///
    /// When `all` is true the command tag is `DEALLOCATE ALL` (see `tag`).
    Deallocate { all: bool },
    /// The requested cursor was declared.
    DeclaredCursor,
    /// The specified number of rows were deleted from the requested table.
    Deleted(usize),
    /// The temporary objects associated with the session have been discarded.
    DiscardedTemp,
    /// All state associated with the session has been discarded.
    DiscardedAll,
    /// The requested object was dropped.
    DroppedObject(ObjectType),
    /// The requested objects were dropped.
    DroppedOwned,
    /// The provided query was empty.
    EmptyQuery,
    /// Fetch results from a cursor.
    Fetch {
        /// The name of the cursor from which to fetch results.
        name: String,
        /// The number of results to fetch.
        count: Option<FetchDirection>,
        /// How long to wait for results to arrive.
        timeout: ExecuteTimeout,
        ctx_extra: ExecuteContextExtra,
    },
    /// The requested privilege was granted.
    GrantedPrivilege,
    /// The requested role was granted.
    GrantedRole,
    /// The specified number of rows were inserted into the requested table.
    Inserted(usize),
    /// The specified prepared statement was created.
    Prepare,
    /// A user-requested warning was raised.
    Raised,
    /// The requested objects were reassigned.
    ReassignOwned,
    /// The requested privilege was revoked.
    RevokedPrivilege,
    /// The requested role was revoked.
    RevokedRole,
    /// Rows will be delivered via the specified future.
    SendingRows {
        // Left out of the `Debug` output.
        #[derivative(Debug = "ignore")]
        future: RowsFuture,
        instance_id: ComputeInstanceId,
        strategy: StatementExecutionStrategy,
    },
    /// Like `SendingRows`, but the rows are known to be available
    /// immediately, and thus the execution is considered ended in the coordinator.
    SendingRowsImmediate {
        // Left out of the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// The specified variable was set to a new value.
    SetVariable {
        name: String,
        /// Whether the operation was a `RESET` rather than a set.
        reset: bool,
    },
    /// A new transaction was started.
    StartedTransaction,
    /// Updates to the requested source or view will be streamed to the
    /// contained receiver.
    Subscribing {
        rx: RowBatchStream,
        ctx_extra: ExecuteContextExtra,
        instance_id: ComputeInstanceId,
    },
    /// The active transaction committed.
    TransactionCommitted {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction rolled back.
    TransactionRolledBack {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The specified number of rows were updated in the requested table.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
403
404impl TryFrom<&Statement<Raw>> for ExecuteResponse {
405    type Error = ();
406
407    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
408    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
409        let resp_kinds = Plan::generated_from(&stmt.into())
410            .iter()
411            .map(ExecuteResponse::generated_from)
412            .flatten()
413            .cloned()
414            .collect::<BTreeSet<ExecuteResponseKind>>();
415        let resps = resp_kinds
416            .iter()
417            .map(|r| (*r).try_into())
418            .collect::<Result<Vec<ExecuteResponse>, _>>();
419        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
420        if let Ok(resps) = resps {
421            if resps.len() == 1 {
422                return Ok(resps.into_element());
423            }
424        }
425        let resp = match stmt {
426            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
427                ExecuteResponse::DroppedObject((*object_type).into())
428            }
429            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
430            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
431                ExecuteResponse::AlteredObject((*object_type).into())
432            }
433            _ => return Err(()),
434        };
435        // Ensure that if the planner ever adds possible plans we complain here.
436        soft_assert_no_log!(
437            resp_kinds.len() == 1
438                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
439            "ExecuteResponses out of sync with planner"
440        );
441        Ok(resp)
442    }
443}
444
445impl TryInto<ExecuteResponse> for ExecuteResponseKind {
446    type Error = ();
447
448    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
449    /// actually executing a statement.
450    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
451        match self {
452            ExecuteResponseKind::AlteredDefaultPrivileges => {
453                Ok(ExecuteResponse::AlteredDefaultPrivileges)
454            }
455            ExecuteResponseKind::AlteredObject => Err(()),
456            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
457            ExecuteResponseKind::AlteredSystemConfiguration => {
458                Ok(ExecuteResponse::AlteredSystemConfiguration)
459            }
460            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
461            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
462            ExecuteResponseKind::Copied => Err(()),
463            ExecuteResponseKind::CopyTo => Err(()),
464            ExecuteResponseKind::CopyFrom => Err(()),
465            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
466            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
467            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
468            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
469            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
470            ExecuteResponseKind::CreatedClusterReplica => {
471                Ok(ExecuteResponse::CreatedClusterReplica)
472            }
473            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
474            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
475            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
476            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
477            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
478            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
479            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
480            ExecuteResponseKind::CreatedMaterializedView => {
481                Ok(ExecuteResponse::CreatedMaterializedView)
482            }
483            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
484            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
485            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
486            ExecuteResponseKind::Deallocate => Err(()),
487            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
488            ExecuteResponseKind::Deleted => Err(()),
489            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
490            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
491            ExecuteResponseKind::DroppedObject => Err(()),
492            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
493            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
494            ExecuteResponseKind::Fetch => Err(()),
495            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
496            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
497            ExecuteResponseKind::Inserted => Err(()),
498            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
499            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
500            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
501            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
502            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
503            ExecuteResponseKind::SendingRows => Err(()),
504            ExecuteResponseKind::SetVariable => Err(()),
505            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
506            ExecuteResponseKind::Subscribing => Err(()),
507            ExecuteResponseKind::TransactionCommitted => Err(()),
508            ExecuteResponseKind::TransactionRolledBack => Err(()),
509            ExecuteResponseKind::Updated => Err(()),
510            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
511            ExecuteResponseKind::SendingRowsImmediate => Err(()),
512            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
513                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
514            }
515        }
516    }
517}
518
519impl ExecuteResponse {
520    pub fn tag(&self) -> Option<String> {
521        use ExecuteResponse::*;
522        match self {
523            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
524            AlteredObject(o) => Some(format!("ALTER {}", o)),
525            AlteredRole => Some("ALTER ROLE".into()),
526            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
527            ClosedCursor => Some("CLOSE CURSOR".into()),
528            Comment => Some("COMMENT".into()),
529            Copied(n) => Some(format!("COPY {}", n)),
530            CopyTo { .. } => None,
531            CopyFrom { .. } => None,
532            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
533            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
534            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
535            CreatedRole => Some("CREATE ROLE".into()),
536            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
537            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
538            CreatedIndex { .. } => Some("CREATE INDEX".into()),
539            CreatedSecret { .. } => Some("CREATE SECRET".into()),
540            CreatedSink { .. } => Some("CREATE SINK".into()),
541            CreatedSource { .. } => Some("CREATE SOURCE".into()),
542            CreatedTable { .. } => Some("CREATE TABLE".into()),
543            CreatedView { .. } => Some("CREATE VIEW".into()),
544            CreatedViews { .. } => Some("CREATE VIEWS".into()),
545            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
546            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
547            CreatedType => Some("CREATE TYPE".into()),
548            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
549            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
550            DeclaredCursor => Some("DECLARE CURSOR".into()),
551            Deleted(n) => Some(format!("DELETE {}", n)),
552            DiscardedTemp => Some("DISCARD TEMP".into()),
553            DiscardedAll => Some("DISCARD ALL".into()),
554            DroppedObject(o) => Some(format!("DROP {o}")),
555            DroppedOwned => Some("DROP OWNED".into()),
556            EmptyQuery => None,
557            Fetch { .. } => None,
558            GrantedPrivilege => Some("GRANT".into()),
559            GrantedRole => Some("GRANT ROLE".into()),
560            Inserted(n) => {
561                // "On successful completion, an INSERT command returns a
562                // command tag of the form `INSERT <oid> <count>`."
563                //     -- https://www.postgresql.org/docs/11/sql-insert.html
564                //
565                // OIDs are a PostgreSQL-specific historical quirk, but we
566                // can return a 0 OID to indicate that the table does not
567                // have OIDs.
568                Some(format!("INSERT 0 {}", n))
569            }
570            Prepare => Some("PREPARE".into()),
571            Raised => Some("RAISE".into()),
572            ReassignOwned => Some("REASSIGN OWNED".into()),
573            RevokedPrivilege => Some("REVOKE".into()),
574            RevokedRole => Some("REVOKE ROLE".into()),
575            SendingRows { .. } | SendingRowsImmediate { .. } => None,
576            SetVariable { reset: true, .. } => Some("RESET".into()),
577            SetVariable { reset: false, .. } => Some("SET".into()),
578            StartedTransaction { .. } => Some("BEGIN".into()),
579            Subscribing { .. } => None,
580            TransactionCommitted { .. } => Some("COMMIT".into()),
581            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
582            Updated(n) => Some(format!("UPDATE {}", n)),
583            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
584            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
585        }
586    }
587
588    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
589    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
590    /// excluded from this function.
591    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
592        use ExecuteResponseKind::*;
593        use PlanKind::*;
594
595        match plan {
596            AbortTransaction => &[TransactionRolledBack],
597            AlterClusterRename
598            | AlterClusterSwap
599            | AlterCluster
600            | AlterClusterReplicaRename
601            | AlterOwner
602            | AlterItemRename
603            | AlterRetainHistory
604            | AlterNoop
605            | AlterSchemaRename
606            | AlterSchemaSwap
607            | AlterSecret
608            | AlterConnection
609            | AlterSource
610            | AlterSink
611            | AlterTableAddColumn
612            | AlterNetworkPolicy => &[AlteredObject],
613            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
614            AlterSetCluster => &[AlteredObject],
615            AlterRole => &[AlteredRole],
616            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
617                &[AlteredSystemConfiguration]
618            }
619            Close => &[ClosedCursor],
620            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
621            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
622            PlanKind::Comment => &[ExecuteResponseKind::Comment],
623            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
624            CreateConnection => &[CreatedConnection],
625            CreateDatabase => &[CreatedDatabase],
626            CreateSchema => &[CreatedSchema],
627            CreateRole => &[CreatedRole],
628            CreateCluster => &[CreatedCluster],
629            CreateClusterReplica => &[CreatedClusterReplica],
630            CreateSource | CreateSources => &[CreatedSource],
631            CreateSecret => &[CreatedSecret],
632            CreateSink => &[CreatedSink],
633            CreateTable => &[CreatedTable],
634            CreateView => &[CreatedView],
635            CreateMaterializedView => &[CreatedMaterializedView],
636            CreateContinualTask => &[CreatedContinualTask],
637            CreateIndex => &[CreatedIndex],
638            CreateType => &[CreatedType],
639            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
640            CreateNetworkPolicy => &[CreatedNetworkPolicy],
641            Declare => &[DeclaredCursor],
642            DiscardTemp => &[DiscardedTemp],
643            DiscardAll => &[DiscardedAll],
644            DropObjects => &[DroppedObject],
645            DropOwned => &[DroppedOwned],
646            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
647            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
648            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
649                ExecuteResponseKind::CopyTo,
650                SendingRows,
651                SendingRowsImmediate,
652            ],
653            Execute | ReadThenWrite => &[
654                Deleted,
655                Inserted,
656                SendingRows,
657                SendingRowsImmediate,
658                Updated,
659            ],
660            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
661            GrantPrivileges => &[GrantedPrivilege],
662            GrantRole => &[GrantedRole],
663            Insert => &[Inserted, SendingRowsImmediate],
664            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
665            PlanKind::Raise => &[ExecuteResponseKind::Raised],
666            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
667            RevokePrivileges => &[RevokedPrivilege],
668            RevokeRole => &[RevokedRole],
669            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
670                &[ExecuteResponseKind::SetVariable]
671            }
672            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
673            StartTransaction => &[StartedTransaction],
674            SideEffectingFunc => &[SendingRows, SendingRowsImmediate],
675            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
676        }
677    }
678}
679
680/// This implementation is meant to ensure that we maintain updated information
681/// about which types of `ExecuteResponse`s are permitted to be sent, which will
682/// be a function of which plan we're executing.
683impl Transmittable for ExecuteResponse {
684    type Allowed = ExecuteResponseKind;
685    fn to_allowed(&self) -> Self::Allowed {
686        ExecuteResponseKind::from(self)
687    }
688}