mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use derivative::Derivative;
16use enum_kinds::EnumKind;
17use futures::Stream;
18use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
19use mz_auth::password::Password;
20use mz_compute_types::ComputeInstanceId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_no_log;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_pgcopy::CopyFormatParams;
25use mz_repr::role_id::RoleId;
26use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
27use mz_sql::ast::{FetchDirection, Raw, Statement};
28use mz_sql::catalog::ObjectType;
29use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
30use mz_sql::session::user::User;
31use mz_sql::session::vars::{OwnedVarInput, SystemVars};
32use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
33use tokio::sync::{mpsc, oneshot};
34use uuid::Uuid;
35
36use crate::catalog::Catalog;
37use crate::coord::ExecuteContextExtra;
38use crate::coord::appends::BuiltinTableAppendNotify;
39use crate::coord::consistency::CoordinatorInconsistencies;
40use crate::coord::peek::PeekResponseUnary;
41use crate::error::AdapterError;
42use crate::session::{EndTransactionAction, RowBatchStream, Session};
43use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
44use crate::util::Transmittable;
45use crate::webhook::AppendWebhookResponse;
46use crate::{AdapterNotice, AppendWebhookError};
47
/// Payload delivered over the `tx` channel of [`Command::CatalogSnapshot`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Shared, reference-counted handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
}
52
/// A request message; each variant bundles the inputs for one kind of request.
///
/// Variants that expect a reply carry a `tx` sender on which the reply value is delivered.
/// [`Command::CancelRequest`], [`Command::PrivilegedCancelRequest`] and
/// [`Command::RetireExecute`] carry no reply channel, and the channel is optional for
/// [`Command::Terminate`].
///
/// NOTE(review): the docs on `Commit` and `RetireExecute` indicate that these messages are
/// handled by the coordinator; the receiving side is not part of this file — confirm there.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`], delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Startup request for a connection; the reply is a [`StartupResponse`] or an
    /// [`AdapterError`].
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        /// Address of the client, when known.
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Sender half of an unbounded channel of [`AdapterNotice`]s.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for `role_name`; the reply is an [`AuthResponse`] or
    /// an [`AdapterError`]. The password itself is optional.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Execution request naming a portal. The [`Session`] is moved into the command; the
    /// reply is a [`Response`], which itself carries a [`Session`].
    ///
    /// NOTE(review): that `portal_name` selects the portal to run is inferred from the field
    /// name — confirm against the handler.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextExtra>,
    },

    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
    /// happen, for example, if the session's role id has been dropped which will prevent
    /// sequence_end_transaction from running.)
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a raw connection id plus a secret key. Carries no
    /// reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request identified by connection id only (no secret key is supplied).
    /// Carries no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Webhook lookup keyed by `database`, `schema` and `name`; the reply is an
    /// [`AppendWebhookResponse`] or an [`AppendWebhookError`].
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests a [`SystemVars`] value, delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Request to set system variables on behalf of `conn_id`; the reply is `Ok(())` or an
    /// [`AdapterError`].
    SetSystemVars {
        /// String-to-string map of the variables to set (presumably name to value — confirm).
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Termination request for `conn_id`.
    Terminate {
        conn_id: ConnectionId,
        /// Optional reply channel; `None` means the sender does not receive a result.
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    ///
    /// Only used for cases that terminate in the protocol layer and
    /// otherwise have no reason to hand control back to the coordinator.
    /// In other cases, we piggy-back on another command.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Requests a consistency check; the reply is `Ok(())` or a
    /// [`CoordinatorInconsistencies`] value.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump, delivered as a JSON value or an error.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },
}
143
144impl Command {
145    pub fn session(&self) -> Option<&Session> {
146        match self {
147            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
148            Command::CancelRequest { .. }
149            | Command::Startup { .. }
150            | Command::AuthenticatePassword { .. }
151            | Command::CatalogSnapshot { .. }
152            | Command::PrivilegedCancelRequest { .. }
153            | Command::GetWebhook { .. }
154            | Command::Terminate { .. }
155            | Command::GetSystemVars { .. }
156            | Command::SetSystemVars { .. }
157            | Command::RetireExecute { .. }
158            | Command::CheckConsistency { .. }
159            | Command::Dump { .. } => None,
160        }
161    }
162
163    pub fn session_mut(&mut self) -> Option<&mut Session> {
164        match self {
165            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
166            Command::CancelRequest { .. }
167            | Command::Startup { .. }
168            | Command::AuthenticatePassword { .. }
169            | Command::CatalogSnapshot { .. }
170            | Command::PrivilegedCancelRequest { .. }
171            | Command::GetWebhook { .. }
172            | Command::Terminate { .. }
173            | Command::GetSystemVars { .. }
174            | Command::SetSystemVars { .. }
175            | Command::RetireExecute { .. }
176            | Command::CheckConsistency { .. }
177            | Command::Dump { .. } => None,
178        }
179    }
180}
181
/// Reply envelope used as the `tx` payload of [`Command::Execute`] and [`Command::Commit`],
/// the two commands that take ownership of a [`Session`].
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the command: the value on success, or an [`AdapterError`].
    pub result: Result<T, AdapterError>,
    /// A [`Session`] handed to the recipient together with the result.
    pub session: Session,
    /// OpenTelemetry context accompanying the response.
    pub otel_ctx: OpenTelemetryContext,
}
188
/// The response to [`Client::startup`](crate::Client::startup).
///
/// `Debug` is derived through `derivative` so that `write_notify` can be left out of the
/// debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// A future that completes when all necessary Builtin Table writes have completed.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared, reference-counted handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
}
202
203/// The response to [`Client::authenticate`](crate::Client::authenticate).
204#[derive(Derivative)]
205#[derivative(Debug)]
206pub struct AuthResponse {
207    /// RoleId for the user.
208    pub role_id: RoleId,
209    /// If the user is a superuser.
210    pub superuser: bool,
211}
212
213// Facile implementation for `StartupResponse`, which does not use the `allowed`
214// feature of `ClientTransmitter`.
215impl Transmittable for StartupResponse {
216    type Allowed = bool;
217    fn to_allowed(&self) -> Self::Allowed {
218        true
219    }
220}
221
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// A thin newtype around the dump's `String` contents.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and hands back the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
235
236impl Transmittable for CatalogDump {
237    type Allowed = bool;
238    fn to_allowed(&self) -> Self::Allowed {
239        true
240    }
241}
242
243impl Transmittable for SystemVars {
244    type Allowed = bool;
245    fn to_allowed(&self) -> Self::Allowed {
246        true
247    }
248}
249
/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
///
/// The `EnumKind` derive generates a companion enum, `ExecuteResponseKind`, which additionally
/// derives `PartialOrd` and `Ord`. `Debug` comes from `derivative` so that the row-carrying
/// fields of `SendingRowsStreaming` and `SendingRowsImmediate` can be left out of debug output.
///
/// NOTE(review): if the generated kind enum keeps declaration order, reordering variants here
/// changes the ordering of `ExecuteResponseKind` values — confirm before reordering.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// The default privileges were altered.
    AlteredDefaultPrivileges,
    /// The requested object was altered.
    AlteredObject(ObjectType),
    /// The role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// The requested cursor was closed.
    ClosedCursor,
    /// The provided comment was created.
    Comment,
    /// The specified number of rows were copied into the requested output.
    Copied(usize),
    /// The response for a COPY TO STDOUT query.
    CopyTo {
        /// The copy format.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response (boxed because the type is recursive).
        resp: Box<ExecuteResponse>,
    },
    /// NOTE(review): undocumented in the original; judging by the name, the response for a
    /// `COPY ... FROM` statement — confirm against the code that constructs it.
    CopyFrom {
        id: CatalogItemId,
        columns: Vec<ColumnIndex>,
        params: CopyFormatParams<'static>,
        ctx_extra: ExecuteContextExtra,
    },
    /// The requested connection was created.
    CreatedConnection,
    /// The requested database was created.
    CreatedDatabase,
    /// The requested schema was created.
    CreatedSchema,
    /// The requested role was created.
    CreatedRole,
    /// The requested cluster was created.
    CreatedCluster,
    /// The requested cluster replica was created.
    CreatedClusterReplica,
    /// The requested index was created.
    CreatedIndex,
    /// The requested introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// The requested secret was created.
    CreatedSecret,
    /// The requested sink was created.
    CreatedSink,
    /// The requested source was created.
    CreatedSource,
    /// The requested table was created.
    CreatedTable,
    /// The requested view was created.
    CreatedView,
    /// The requested views were created.
    CreatedViews,
    /// The requested materialized view was created.
    CreatedMaterializedView,
    /// The requested continual task was created.
    CreatedContinualTask,
    /// The requested type was created.
    CreatedType,
    /// The requested network policy was created.
    CreatedNetworkPolicy,
    /// The requested prepared statement was removed.
    Deallocate { all: bool },
    /// The requested cursor was declared.
    DeclaredCursor,
    /// The specified number of rows were deleted from the requested table.
    Deleted(usize),
    /// The temporary objects associated with the session have been discarded.
    DiscardedTemp,
    /// All state associated with the session has been discarded.
    DiscardedAll,
    /// The requested object was dropped.
    DroppedObject(ObjectType),
    /// The requested objects were dropped.
    DroppedOwned,
    /// The provided query was empty.
    EmptyQuery,
    /// Fetch results from a cursor.
    Fetch {
        /// The name of the cursor from which to fetch results.
        name: String,
        /// The number of results to fetch.
        count: Option<FetchDirection>,
        /// How long to wait for results to arrive.
        timeout: ExecuteTimeout,
        ctx_extra: ExecuteContextExtra,
    },
    /// The requested privilege was granted.
    GrantedPrivilege,
    /// The requested role was granted.
    GrantedRole,
    /// The specified number of rows were inserted into the requested table.
    Inserted(usize),
    /// The specified prepared statement was created.
    Prepare,
    /// A user-requested warning was raised.
    Raised,
    /// The requested objects were reassigned.
    ReassignOwned,
    /// The requested privilege was revoked.
    RevokedPrivilege,
    /// The requested role was revoked.
    RevokedRole,
    /// Rows will be delivered via the specified stream.
    SendingRowsStreaming {
        /// Stream of [`PeekResponseUnary`] items; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        instance_id: ComputeInstanceId,
        strategy: StatementExecutionStrategy,
    },
    /// Rows are known to be available immediately, and thus the execution is
    /// considered ended in the coordinator.
    SendingRowsImmediate {
        /// Iterator over the rows; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// The specified variable was set to a new value.
    SetVariable {
        name: String,
        /// Whether the operation was a `RESET` rather than a set.
        reset: bool,
    },
    /// A new transaction was started.
    StartedTransaction,
    /// Updates to the requested source or view will be streamed to the
    /// contained receiver.
    Subscribing {
        rx: RowBatchStream,
        ctx_extra: ExecuteContextExtra,
        instance_id: ComputeInstanceId,
    },
    /// The active transaction committed.
    TransactionCommitted {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction rolled back.
    TransactionRolledBack {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The specified number of rows were updated in the requested table.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
401
402impl TryFrom<&Statement<Raw>> for ExecuteResponse {
403    type Error = ();
404
405    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
406    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
407        let resp_kinds = Plan::generated_from(&stmt.into())
408            .iter()
409            .map(ExecuteResponse::generated_from)
410            .flatten()
411            .cloned()
412            .collect::<BTreeSet<ExecuteResponseKind>>();
413        let resps = resp_kinds
414            .iter()
415            .map(|r| (*r).try_into())
416            .collect::<Result<Vec<ExecuteResponse>, _>>();
417        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
418        if let Ok(resps) = resps {
419            if resps.len() == 1 {
420                return Ok(resps.into_element());
421            }
422        }
423        let resp = match stmt {
424            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
425                ExecuteResponse::DroppedObject((*object_type).into())
426            }
427            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
428            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
429                ExecuteResponse::AlteredObject((*object_type).into())
430            }
431            _ => return Err(()),
432        };
433        // Ensure that if the planner ever adds possible plans we complain here.
434        soft_assert_no_log!(
435            resp_kinds.len() == 1
436                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
437            "ExecuteResponses out of sync with planner"
438        );
439        Ok(resp)
440    }
441}
442
443impl TryInto<ExecuteResponse> for ExecuteResponseKind {
444    type Error = ();
445
446    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
447    /// actually executing a statement.
448    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
449        match self {
450            ExecuteResponseKind::AlteredDefaultPrivileges => {
451                Ok(ExecuteResponse::AlteredDefaultPrivileges)
452            }
453            ExecuteResponseKind::AlteredObject => Err(()),
454            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
455            ExecuteResponseKind::AlteredSystemConfiguration => {
456                Ok(ExecuteResponse::AlteredSystemConfiguration)
457            }
458            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
459            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
460            ExecuteResponseKind::Copied => Err(()),
461            ExecuteResponseKind::CopyTo => Err(()),
462            ExecuteResponseKind::CopyFrom => Err(()),
463            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
464            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
465            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
466            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
467            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
468            ExecuteResponseKind::CreatedClusterReplica => {
469                Ok(ExecuteResponse::CreatedClusterReplica)
470            }
471            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
472            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
473            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
474            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
475            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
476            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
477            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
478            ExecuteResponseKind::CreatedMaterializedView => {
479                Ok(ExecuteResponse::CreatedMaterializedView)
480            }
481            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
482            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
483            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
484            ExecuteResponseKind::Deallocate => Err(()),
485            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
486            ExecuteResponseKind::Deleted => Err(()),
487            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
488            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
489            ExecuteResponseKind::DroppedObject => Err(()),
490            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
491            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
492            ExecuteResponseKind::Fetch => Err(()),
493            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
494            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
495            ExecuteResponseKind::Inserted => Err(()),
496            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
497            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
498            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
499            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
500            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
501            ExecuteResponseKind::SetVariable => Err(()),
502            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
503            ExecuteResponseKind::Subscribing => Err(()),
504            ExecuteResponseKind::TransactionCommitted => Err(()),
505            ExecuteResponseKind::TransactionRolledBack => Err(()),
506            ExecuteResponseKind::Updated => Err(()),
507            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
508            ExecuteResponseKind::SendingRowsStreaming => Err(()),
509            ExecuteResponseKind::SendingRowsImmediate => Err(()),
510            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
511                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
512            }
513        }
514    }
515}
516
517impl ExecuteResponse {
518    pub fn tag(&self) -> Option<String> {
519        use ExecuteResponse::*;
520        match self {
521            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
522            AlteredObject(o) => Some(format!("ALTER {}", o)),
523            AlteredRole => Some("ALTER ROLE".into()),
524            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
525            ClosedCursor => Some("CLOSE CURSOR".into()),
526            Comment => Some("COMMENT".into()),
527            Copied(n) => Some(format!("COPY {}", n)),
528            CopyTo { .. } => None,
529            CopyFrom { .. } => None,
530            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
531            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
532            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
533            CreatedRole => Some("CREATE ROLE".into()),
534            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
535            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
536            CreatedIndex { .. } => Some("CREATE INDEX".into()),
537            CreatedSecret { .. } => Some("CREATE SECRET".into()),
538            CreatedSink { .. } => Some("CREATE SINK".into()),
539            CreatedSource { .. } => Some("CREATE SOURCE".into()),
540            CreatedTable { .. } => Some("CREATE TABLE".into()),
541            CreatedView { .. } => Some("CREATE VIEW".into()),
542            CreatedViews { .. } => Some("CREATE VIEWS".into()),
543            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
544            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
545            CreatedType => Some("CREATE TYPE".into()),
546            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
547            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
548            DeclaredCursor => Some("DECLARE CURSOR".into()),
549            Deleted(n) => Some(format!("DELETE {}", n)),
550            DiscardedTemp => Some("DISCARD TEMP".into()),
551            DiscardedAll => Some("DISCARD ALL".into()),
552            DroppedObject(o) => Some(format!("DROP {o}")),
553            DroppedOwned => Some("DROP OWNED".into()),
554            EmptyQuery => None,
555            Fetch { .. } => None,
556            GrantedPrivilege => Some("GRANT".into()),
557            GrantedRole => Some("GRANT ROLE".into()),
558            Inserted(n) => {
559                // "On successful completion, an INSERT command returns a
560                // command tag of the form `INSERT <oid> <count>`."
561                //     -- https://www.postgresql.org/docs/11/sql-insert.html
562                //
563                // OIDs are a PostgreSQL-specific historical quirk, but we
564                // can return a 0 OID to indicate that the table does not
565                // have OIDs.
566                Some(format!("INSERT 0 {}", n))
567            }
568            Prepare => Some("PREPARE".into()),
569            Raised => Some("RAISE".into()),
570            ReassignOwned => Some("REASSIGN OWNED".into()),
571            RevokedPrivilege => Some("REVOKE".into()),
572            RevokedRole => Some("REVOKE ROLE".into()),
573            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
574            SetVariable { reset: true, .. } => Some("RESET".into()),
575            SetVariable { reset: false, .. } => Some("SET".into()),
576            StartedTransaction { .. } => Some("BEGIN".into()),
577            Subscribing { .. } => None,
578            TransactionCommitted { .. } => Some("COMMIT".into()),
579            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
580            Updated(n) => Some(format!("UPDATE {}", n)),
581            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
582            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
583        }
584    }
585
586    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
587    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
588    /// excluded from this function.
589    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
590        use ExecuteResponseKind::*;
591        use PlanKind::*;
592
593        match plan {
594            AbortTransaction => &[TransactionRolledBack],
595            AlterClusterRename
596            | AlterClusterSwap
597            | AlterCluster
598            | AlterClusterReplicaRename
599            | AlterOwner
600            | AlterItemRename
601            | AlterRetainHistory
602            | AlterNoop
603            | AlterSchemaRename
604            | AlterSchemaSwap
605            | AlterSecret
606            | AlterConnection
607            | AlterSource
608            | AlterSink
609            | AlterTableAddColumn
610            | AlterNetworkPolicy => &[AlteredObject],
611            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
612            AlterSetCluster => &[AlteredObject],
613            AlterRole => &[AlteredRole],
614            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
615                &[AlteredSystemConfiguration]
616            }
617            Close => &[ClosedCursor],
618            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
619            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
620            PlanKind::Comment => &[ExecuteResponseKind::Comment],
621            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
622            CreateConnection => &[CreatedConnection],
623            CreateDatabase => &[CreatedDatabase],
624            CreateSchema => &[CreatedSchema],
625            CreateRole => &[CreatedRole],
626            CreateCluster => &[CreatedCluster],
627            CreateClusterReplica => &[CreatedClusterReplica],
628            CreateSource | CreateSources => &[CreatedSource],
629            CreateSecret => &[CreatedSecret],
630            CreateSink => &[CreatedSink],
631            CreateTable => &[CreatedTable],
632            CreateView => &[CreatedView],
633            CreateMaterializedView => &[CreatedMaterializedView],
634            CreateContinualTask => &[CreatedContinualTask],
635            CreateIndex => &[CreatedIndex],
636            CreateType => &[CreatedType],
637            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
638            CreateNetworkPolicy => &[CreatedNetworkPolicy],
639            Declare => &[DeclaredCursor],
640            DiscardTemp => &[DiscardedTemp],
641            DiscardAll => &[DiscardedAll],
642            DropObjects => &[DroppedObject],
643            DropOwned => &[DroppedOwned],
644            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
645            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
646            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
647                ExecuteResponseKind::CopyTo,
648                SendingRowsStreaming,
649                SendingRowsImmediate,
650            ],
651            Execute | ReadThenWrite => &[
652                Deleted,
653                Inserted,
654                SendingRowsStreaming,
655                SendingRowsImmediate,
656                Updated,
657            ],
658            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
659            GrantPrivileges => &[GrantedPrivilege],
660            GrantRole => &[GrantedRole],
661            Insert => &[Inserted, SendingRowsImmediate],
662            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
663            PlanKind::Raise => &[ExecuteResponseKind::Raised],
664            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
665            RevokePrivileges => &[RevokedPrivilege],
666            RevokeRole => &[RevokedRole],
667            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
668                &[ExecuteResponseKind::SetVariable]
669            }
670            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
671            StartTransaction => &[StartedTransaction],
672            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
673            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
674        }
675    }
676}
677
678/// This implementation is meant to ensure that we maintain updated information
679/// about which types of `ExecuteResponse`s are permitted to be sent, which will
680/// be a function of which plan we're executing.
681impl Transmittable for ExecuteResponse {
682    type Allowed = ExecuteResponseKind;
683    fn to_allowed(&self) -> Self::Allowed {
684        ExecuteResponseKind::from(self)
685    }
686}