mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use derivative::Derivative;
16use enum_kinds::EnumKind;
17use futures::Stream;
18use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
19use mz_auth::password::Password;
20use mz_compute_types::ComputeInstanceId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_no_log;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_pgcopy::CopyFormatParams;
25use mz_repr::role_id::RoleId;
26use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
27use mz_sql::ast::{FetchDirection, Raw, Statement};
28use mz_sql::catalog::ObjectType;
29use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
30use mz_sql::session::user::User;
31use mz_sql::session::vars::{OwnedVarInput, SystemVars};
32use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
33use tokio::sync::{mpsc, oneshot};
34use uuid::Uuid;
35
36use crate::catalog::Catalog;
37use crate::coord::ExecuteContextExtra;
38use crate::coord::appends::BuiltinTableAppendNotify;
39use crate::coord::consistency::CoordinatorInconsistencies;
40use crate::coord::peek::PeekResponseUnary;
41use crate::error::AdapterError;
42use crate::session::{EndTransactionAction, RowBatchStream, Session};
43use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
44use crate::util::Transmittable;
45use crate::webhook::AppendWebhookResponse;
46use crate::{AdapterNotice, AppendWebhookError};
47
48#[derive(Debug)]
49pub struct CatalogSnapshot {
50    pub catalog: Arc<Catalog>,
51}
52
53#[derive(Debug)]
54pub enum Command {
55    CatalogSnapshot {
56        tx: oneshot::Sender<CatalogSnapshot>,
57    },
58
59    Startup {
60        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
61        user: User,
62        conn_id: ConnectionId,
63        client_ip: Option<IpAddr>,
64        secret_key: u32,
65        uuid: Uuid,
66        application_name: String,
67        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
68    },
69
70    AuthenticatePassword {
71        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
72        role_name: String,
73        password: Option<Password>,
74    },
75
76    AuthenticateGetSASLChallenge {
77        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
78        role_name: String,
79        nonce: String,
80    },
81
82    AuthenticateVerifySASLProof {
83        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
84        role_name: String,
85        proof: String,
86        auth_message: String,
87        mock_hash: String,
88    },
89
90    Execute {
91        portal_name: String,
92        session: Session,
93        tx: oneshot::Sender<Response<ExecuteResponse>>,
94        outer_ctx_extra: Option<ExecuteContextExtra>,
95    },
96
97    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
98    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
99    /// happen, for example, if the session's role id has been dropped which will prevent
100    /// sequence_end_transaction from running.)
101    Commit {
102        action: EndTransactionAction,
103        session: Session,
104        tx: oneshot::Sender<Response<ExecuteResponse>>,
105    },
106
107    CancelRequest {
108        conn_id: ConnectionIdType,
109        secret_key: u32,
110    },
111
112    PrivilegedCancelRequest {
113        conn_id: ConnectionId,
114    },
115
116    GetWebhook {
117        database: String,
118        schema: String,
119        name: String,
120        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
121    },
122
123    GetSystemVars {
124        tx: oneshot::Sender<SystemVars>,
125    },
126
127    SetSystemVars {
128        vars: BTreeMap<String, String>,
129        conn_id: ConnectionId,
130        tx: oneshot::Sender<Result<(), AdapterError>>,
131    },
132
133    Terminate {
134        conn_id: ConnectionId,
135        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
136    },
137
138    /// Performs any cleanup and logging actions necessary for
139    /// finalizing a statement execution.
140    ///
141    /// Only used for cases that terminate in the protocol layer and
142    /// otherwise have no reason to hand control back to the coordinator.
143    /// In other cases, we piggy-back on another command.
144    RetireExecute {
145        data: ExecuteContextExtra,
146        reason: StatementEndedExecutionReason,
147    },
148
149    CheckConsistency {
150        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
151    },
152
153    Dump {
154        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
155    },
156}
157
158impl Command {
159    pub fn session(&self) -> Option<&Session> {
160        match self {
161            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
162            Command::CancelRequest { .. }
163            | Command::Startup { .. }
164            | Command::AuthenticatePassword { .. }
165            | Command::AuthenticateGetSASLChallenge { .. }
166            | Command::AuthenticateVerifySASLProof { .. }
167            | Command::CatalogSnapshot { .. }
168            | Command::PrivilegedCancelRequest { .. }
169            | Command::GetWebhook { .. }
170            | Command::Terminate { .. }
171            | Command::GetSystemVars { .. }
172            | Command::SetSystemVars { .. }
173            | Command::RetireExecute { .. }
174            | Command::CheckConsistency { .. }
175            | Command::Dump { .. } => None,
176        }
177    }
178
179    pub fn session_mut(&mut self) -> Option<&mut Session> {
180        match self {
181            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
182            Command::CancelRequest { .. }
183            | Command::Startup { .. }
184            | Command::AuthenticatePassword { .. }
185            | Command::AuthenticateGetSASLChallenge { .. }
186            | Command::AuthenticateVerifySASLProof { .. }
187            | Command::CatalogSnapshot { .. }
188            | Command::PrivilegedCancelRequest { .. }
189            | Command::GetWebhook { .. }
190            | Command::Terminate { .. }
191            | Command::GetSystemVars { .. }
192            | Command::SetSystemVars { .. }
193            | Command::RetireExecute { .. }
194            | Command::CheckConsistency { .. }
195            | Command::Dump { .. } => None,
196        }
197    }
198}
199
200#[derive(Debug)]
201pub struct Response<T> {
202    pub result: Result<T, AdapterError>,
203    pub session: Session,
204    pub otel_ctx: OpenTelemetryContext,
205}
206
207/// The response to [`Client::startup`](crate::Client::startup).
208#[derive(Derivative)]
209#[derivative(Debug)]
210pub struct StartupResponse {
211    /// RoleId for the user.
212    pub role_id: RoleId,
213    /// A future that completes when all necessary Builtin Table writes have completed.
214    #[derivative(Debug = "ignore")]
215    pub write_notify: BuiltinTableAppendNotify,
216    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
217    pub session_defaults: BTreeMap<String, OwnedVarInput>,
218    pub catalog: Arc<Catalog>,
219}
220
221/// The response to [`Client::authenticate`](crate::Client::authenticate).
222#[derive(Derivative)]
223#[derivative(Debug)]
224pub struct AuthResponse {
225    /// RoleId for the user.
226    pub role_id: RoleId,
227    /// If the user is a superuser.
228    pub superuser: bool,
229}
230
231#[derive(Derivative)]
232#[derivative(Debug)]
233pub struct SASLChallengeResponse {
234    pub iteration_count: usize,
235    /// Base64-encoded salt for the SASL challenge.
236    pub salt: String,
237    pub nonce: String,
238}
239
240#[derive(Derivative)]
241#[derivative(Debug)]
242pub struct SASLVerifyProofResponse {
243    pub verifier: String,
244    pub auth_resp: AuthResponse,
245}
246
247// Facile implementation for `StartupResponse`, which does not use the `allowed`
248// feature of `ClientTransmitter`.
249impl Transmittable for StartupResponse {
250    type Allowed = bool;
251    fn to_allowed(&self) -> Self::Allowed {
252        true
253    }
254}
255
256/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
257#[derive(Debug, Clone)]
258pub struct CatalogDump(String);
259
260impl CatalogDump {
261    pub fn new(raw: String) -> Self {
262        CatalogDump(raw)
263    }
264
265    pub fn into_string(self) -> String {
266        self.0
267    }
268}
269
270impl Transmittable for CatalogDump {
271    type Allowed = bool;
272    fn to_allowed(&self) -> Self::Allowed {
273        true
274    }
275}
276
277impl Transmittable for SystemVars {
278    type Allowed = bool;
279    fn to_allowed(&self) -> Self::Allowed {
280        true
281    }
282}
283
284/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
285#[derive(EnumKind, Derivative)]
286#[derivative(Debug)]
287#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
288pub enum ExecuteResponse {
289    /// The default privileges were altered.
290    AlteredDefaultPrivileges,
291    /// The requested object was altered.
292    AlteredObject(ObjectType),
293    /// The role was altered.
294    AlteredRole,
295    /// The system configuration was altered.
296    AlteredSystemConfiguration,
297    /// The requested cursor was closed.
298    ClosedCursor,
299    /// The provided comment was created.
300    Comment,
301    /// The specified number of rows were copied into the requested output.
302    Copied(usize),
303    /// The response for a COPY TO STDOUT query.
304    CopyTo {
305        format: mz_sql::plan::CopyFormat,
306        resp: Box<ExecuteResponse>,
307    },
308    CopyFrom {
309        /// Table we're copying into.
310        target_id: CatalogItemId,
311        /// Human-readable full name of the target table.
312        target_name: String,
313        columns: Vec<ColumnIndex>,
314        params: CopyFormatParams<'static>,
315        ctx_extra: ExecuteContextExtra,
316    },
317    /// The requested connection was created.
318    CreatedConnection,
319    /// The requested database was created.
320    CreatedDatabase,
321    /// The requested schema was created.
322    CreatedSchema,
323    /// The requested role was created.
324    CreatedRole,
325    /// The requested cluster was created.
326    CreatedCluster,
327    /// The requested cluster replica was created.
328    CreatedClusterReplica,
329    /// The requested index was created.
330    CreatedIndex,
331    /// The requested introspection subscribe was created.
332    CreatedIntrospectionSubscribe,
333    /// The requested secret was created.
334    CreatedSecret,
335    /// The requested sink was created.
336    CreatedSink,
337    /// The requested source was created.
338    CreatedSource,
339    /// The requested table was created.
340    CreatedTable,
341    /// The requested view was created.
342    CreatedView,
343    /// The requested views were created.
344    CreatedViews,
345    /// The requested materialized view was created.
346    CreatedMaterializedView,
347    /// The requested continual task was created.
348    CreatedContinualTask,
349    /// The requested type was created.
350    CreatedType,
351    /// The requested network policy was created.
352    CreatedNetworkPolicy,
353    /// The requested prepared statement was removed.
354    Deallocate { all: bool },
355    /// The requested cursor was declared.
356    DeclaredCursor,
357    /// The specified number of rows were deleted from the requested table.
358    Deleted(usize),
359    /// The temporary objects associated with the session have been discarded.
360    DiscardedTemp,
361    /// All state associated with the session has been discarded.
362    DiscardedAll,
363    /// The requested object was dropped.
364    DroppedObject(ObjectType),
365    /// The requested objects were dropped.
366    DroppedOwned,
367    /// The provided query was empty.
368    EmptyQuery,
369    /// Fetch results from a cursor.
370    Fetch {
371        /// The name of the cursor from which to fetch results.
372        name: String,
373        /// The number of results to fetch.
374        count: Option<FetchDirection>,
375        /// How long to wait for results to arrive.
376        timeout: ExecuteTimeout,
377        ctx_extra: ExecuteContextExtra,
378    },
379    /// The requested privilege was granted.
380    GrantedPrivilege,
381    /// The requested role was granted.
382    GrantedRole,
383    /// The specified number of rows were inserted into the requested table.
384    Inserted(usize),
385    /// The specified prepared statement was created.
386    Prepare,
387    /// A user-requested warning was raised.
388    Raised,
389    /// The requested objects were reassigned.
390    ReassignOwned,
391    /// The requested privilege was revoked.
392    RevokedPrivilege,
393    /// The requested role was revoked.
394    RevokedRole,
395    /// Rows will be delivered via the specified stream.
396    SendingRowsStreaming {
397        #[derivative(Debug = "ignore")]
398        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
399        instance_id: ComputeInstanceId,
400        strategy: StatementExecutionStrategy,
401    },
402    /// Rows are known to be available immediately, and thus the execution is
403    /// considered ended in the coordinator.
404    SendingRowsImmediate {
405        #[derivative(Debug = "ignore")]
406        rows: Box<dyn RowIterator + Send + Sync>,
407    },
408    /// The specified variable was set to a new value.
409    SetVariable {
410        name: String,
411        /// Whether the operation was a `RESET` rather than a set.
412        reset: bool,
413    },
414    /// A new transaction was started.
415    StartedTransaction,
416    /// Updates to the requested source or view will be streamed to the
417    /// contained receiver.
418    Subscribing {
419        rx: RowBatchStream,
420        ctx_extra: ExecuteContextExtra,
421        instance_id: ComputeInstanceId,
422    },
423    /// The active transaction committed.
424    TransactionCommitted {
425        /// Session parameters that changed because the transaction ended.
426        params: BTreeMap<&'static str, String>,
427    },
428    /// The active transaction rolled back.
429    TransactionRolledBack {
430        /// Session parameters that changed because the transaction ended.
431        params: BTreeMap<&'static str, String>,
432    },
433    /// The specified number of rows were updated in the requested table.
434    Updated(usize),
435    /// A connection was validated.
436    ValidatedConnection,
437}
438
439impl TryFrom<&Statement<Raw>> for ExecuteResponse {
440    type Error = ();
441
442    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
443    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
444        let resp_kinds = Plan::generated_from(&stmt.into())
445            .iter()
446            .map(ExecuteResponse::generated_from)
447            .flatten()
448            .cloned()
449            .collect::<BTreeSet<ExecuteResponseKind>>();
450        let resps = resp_kinds
451            .iter()
452            .map(|r| (*r).try_into())
453            .collect::<Result<Vec<ExecuteResponse>, _>>();
454        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
455        if let Ok(resps) = resps {
456            if resps.len() == 1 {
457                return Ok(resps.into_element());
458            }
459        }
460        let resp = match stmt {
461            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
462                ExecuteResponse::DroppedObject((*object_type).into())
463            }
464            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
465            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
466                ExecuteResponse::AlteredObject((*object_type).into())
467            }
468            _ => return Err(()),
469        };
470        // Ensure that if the planner ever adds possible plans we complain here.
471        soft_assert_no_log!(
472            resp_kinds.len() == 1
473                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
474            "ExecuteResponses out of sync with planner"
475        );
476        Ok(resp)
477    }
478}
479
480impl TryInto<ExecuteResponse> for ExecuteResponseKind {
481    type Error = ();
482
483    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
484    /// actually executing a statement.
485    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
486        match self {
487            ExecuteResponseKind::AlteredDefaultPrivileges => {
488                Ok(ExecuteResponse::AlteredDefaultPrivileges)
489            }
490            ExecuteResponseKind::AlteredObject => Err(()),
491            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
492            ExecuteResponseKind::AlteredSystemConfiguration => {
493                Ok(ExecuteResponse::AlteredSystemConfiguration)
494            }
495            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
496            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
497            ExecuteResponseKind::Copied => Err(()),
498            ExecuteResponseKind::CopyTo => Err(()),
499            ExecuteResponseKind::CopyFrom => Err(()),
500            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
501            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
502            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
503            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
504            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
505            ExecuteResponseKind::CreatedClusterReplica => {
506                Ok(ExecuteResponse::CreatedClusterReplica)
507            }
508            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
509            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
510            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
511            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
512            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
513            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
514            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
515            ExecuteResponseKind::CreatedMaterializedView => {
516                Ok(ExecuteResponse::CreatedMaterializedView)
517            }
518            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
519            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
520            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
521            ExecuteResponseKind::Deallocate => Err(()),
522            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
523            ExecuteResponseKind::Deleted => Err(()),
524            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
525            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
526            ExecuteResponseKind::DroppedObject => Err(()),
527            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
528            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
529            ExecuteResponseKind::Fetch => Err(()),
530            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
531            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
532            ExecuteResponseKind::Inserted => Err(()),
533            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
534            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
535            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
536            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
537            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
538            ExecuteResponseKind::SetVariable => Err(()),
539            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
540            ExecuteResponseKind::Subscribing => Err(()),
541            ExecuteResponseKind::TransactionCommitted => Err(()),
542            ExecuteResponseKind::TransactionRolledBack => Err(()),
543            ExecuteResponseKind::Updated => Err(()),
544            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
545            ExecuteResponseKind::SendingRowsStreaming => Err(()),
546            ExecuteResponseKind::SendingRowsImmediate => Err(()),
547            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
548                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
549            }
550        }
551    }
552}
553
554impl ExecuteResponse {
555    pub fn tag(&self) -> Option<String> {
556        use ExecuteResponse::*;
557        match self {
558            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
559            AlteredObject(o) => Some(format!("ALTER {}", o)),
560            AlteredRole => Some("ALTER ROLE".into()),
561            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
562            ClosedCursor => Some("CLOSE CURSOR".into()),
563            Comment => Some("COMMENT".into()),
564            Copied(n) => Some(format!("COPY {}", n)),
565            CopyTo { .. } => None,
566            CopyFrom { .. } => None,
567            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
568            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
569            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
570            CreatedRole => Some("CREATE ROLE".into()),
571            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
572            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
573            CreatedIndex { .. } => Some("CREATE INDEX".into()),
574            CreatedSecret { .. } => Some("CREATE SECRET".into()),
575            CreatedSink { .. } => Some("CREATE SINK".into()),
576            CreatedSource { .. } => Some("CREATE SOURCE".into()),
577            CreatedTable { .. } => Some("CREATE TABLE".into()),
578            CreatedView { .. } => Some("CREATE VIEW".into()),
579            CreatedViews { .. } => Some("CREATE VIEWS".into()),
580            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
581            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
582            CreatedType => Some("CREATE TYPE".into()),
583            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
584            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
585            DeclaredCursor => Some("DECLARE CURSOR".into()),
586            Deleted(n) => Some(format!("DELETE {}", n)),
587            DiscardedTemp => Some("DISCARD TEMP".into()),
588            DiscardedAll => Some("DISCARD ALL".into()),
589            DroppedObject(o) => Some(format!("DROP {o}")),
590            DroppedOwned => Some("DROP OWNED".into()),
591            EmptyQuery => None,
592            Fetch { .. } => None,
593            GrantedPrivilege => Some("GRANT".into()),
594            GrantedRole => Some("GRANT ROLE".into()),
595            Inserted(n) => {
596                // "On successful completion, an INSERT command returns a
597                // command tag of the form `INSERT <oid> <count>`."
598                //     -- https://www.postgresql.org/docs/11/sql-insert.html
599                //
600                // OIDs are a PostgreSQL-specific historical quirk, but we
601                // can return a 0 OID to indicate that the table does not
602                // have OIDs.
603                Some(format!("INSERT 0 {}", n))
604            }
605            Prepare => Some("PREPARE".into()),
606            Raised => Some("RAISE".into()),
607            ReassignOwned => Some("REASSIGN OWNED".into()),
608            RevokedPrivilege => Some("REVOKE".into()),
609            RevokedRole => Some("REVOKE ROLE".into()),
610            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
611            SetVariable { reset: true, .. } => Some("RESET".into()),
612            SetVariable { reset: false, .. } => Some("SET".into()),
613            StartedTransaction { .. } => Some("BEGIN".into()),
614            Subscribing { .. } => None,
615            TransactionCommitted { .. } => Some("COMMIT".into()),
616            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
617            Updated(n) => Some(format!("UPDATE {}", n)),
618            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
619            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
620        }
621    }
622
623    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
624    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
625    /// excluded from this function.
626    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
627        use ExecuteResponseKind::*;
628        use PlanKind::*;
629
630        match plan {
631            AbortTransaction => &[TransactionRolledBack],
632            AlterClusterRename
633            | AlterClusterSwap
634            | AlterCluster
635            | AlterClusterReplicaRename
636            | AlterOwner
637            | AlterItemRename
638            | AlterRetainHistory
639            | AlterNoop
640            | AlterSchemaRename
641            | AlterSchemaSwap
642            | AlterSecret
643            | AlterConnection
644            | AlterSource
645            | AlterSink
646            | AlterTableAddColumn
647            | AlterNetworkPolicy => &[AlteredObject],
648            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
649            AlterSetCluster => &[AlteredObject],
650            AlterRole => &[AlteredRole],
651            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
652                &[AlteredSystemConfiguration]
653            }
654            Close => &[ClosedCursor],
655            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
656            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
657            PlanKind::Comment => &[ExecuteResponseKind::Comment],
658            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
659            CreateConnection => &[CreatedConnection],
660            CreateDatabase => &[CreatedDatabase],
661            CreateSchema => &[CreatedSchema],
662            CreateRole => &[CreatedRole],
663            CreateCluster => &[CreatedCluster],
664            CreateClusterReplica => &[CreatedClusterReplica],
665            CreateSource | CreateSources => &[CreatedSource],
666            CreateSecret => &[CreatedSecret],
667            CreateSink => &[CreatedSink],
668            CreateTable => &[CreatedTable],
669            CreateView => &[CreatedView],
670            CreateMaterializedView => &[CreatedMaterializedView],
671            CreateContinualTask => &[CreatedContinualTask],
672            CreateIndex => &[CreatedIndex],
673            CreateType => &[CreatedType],
674            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
675            CreateNetworkPolicy => &[CreatedNetworkPolicy],
676            Declare => &[DeclaredCursor],
677            DiscardTemp => &[DiscardedTemp],
678            DiscardAll => &[DiscardedAll],
679            DropObjects => &[DroppedObject],
680            DropOwned => &[DroppedOwned],
681            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
682            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
683            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
684                ExecuteResponseKind::CopyTo,
685                SendingRowsStreaming,
686                SendingRowsImmediate,
687            ],
688            Execute | ReadThenWrite => &[
689                Deleted,
690                Inserted,
691                SendingRowsStreaming,
692                SendingRowsImmediate,
693                Updated,
694            ],
695            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
696            GrantPrivileges => &[GrantedPrivilege],
697            GrantRole => &[GrantedRole],
698            Insert => &[Inserted, SendingRowsImmediate],
699            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
700            PlanKind::Raise => &[ExecuteResponseKind::Raised],
701            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
702            RevokePrivileges => &[RevokedPrivilege],
703            RevokeRole => &[RevokedRole],
704            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
705                &[ExecuteResponseKind::SetVariable]
706            }
707            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
708            StartTransaction => &[StartedTransaction],
709            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
710            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
711        }
712    }
713}
714
715/// This implementation is meant to ensure that we maintain updated information
716/// about which types of `ExecuteResponse`s are permitted to be sent, which will
717/// be a function of which plan we're executing.
718impl Transmittable for ExecuteResponse {
719    type Allowed = ExecuteResponseKind;
720    fn to_allowed(&self) -> Self::Allowed {
721        ExecuteResponseKind::from(self)
722    }
723}