mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use derivative::Derivative;
17use enum_kinds::EnumKind;
18use futures::Stream;
19use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
20use mz_auth::password::Password;
21use mz_cluster_client::ReplicaId;
22use mz_compute_types::ComputeInstanceId;
23use mz_compute_types::dataflows::DataflowDescription;
24use mz_expr::RowSetFinishing;
25use mz_ore::collections::CollectionExt;
26use mz_ore::soft_assert_no_log;
27use mz_ore::tracing::OpenTelemetryContext;
28use mz_persist_client::PersistClient;
29use mz_pgcopy::CopyFormatParams;
30use mz_repr::global_id::TransientIdGen;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
33use mz_sql::ast::{FetchDirection, Raw, Statement};
34use mz_sql::catalog::ObjectType;
35use mz_sql::optimizer_metrics::OptimizerMetrics;
36use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
37use mz_sql::session::user::User;
38use mz_sql::session::vars::{OwnedVarInput, SystemVars};
39use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
40use mz_storage_types::sources::Timeline;
41use mz_timestamp_oracle::TimestampOracle;
42use tokio::sync::{mpsc, oneshot};
43use uuid::Uuid;
44
45use crate::catalog::Catalog;
46use crate::coord::ExecuteContextExtra;
47use crate::coord::appends::BuiltinTableAppendNotify;
48use crate::coord::consistency::CoordinatorInconsistencies;
49use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
50use crate::coord::timestamp_selection::TimestampDetermination;
51use crate::error::AdapterError;
52use crate::session::{EndTransactionAction, RowBatchStream, Session};
53use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
54use crate::util::Transmittable;
55use crate::webhook::AppendWebhookResponse;
56use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
57
58#[derive(Debug)]
59pub struct CatalogSnapshot {
60    pub catalog: Arc<Catalog>,
61}
62
63#[derive(Debug)]
64pub enum Command {
65    CatalogSnapshot {
66        tx: oneshot::Sender<CatalogSnapshot>,
67    },
68
69    Startup {
70        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
71        user: User,
72        conn_id: ConnectionId,
73        client_ip: Option<IpAddr>,
74        secret_key: u32,
75        uuid: Uuid,
76        application_name: String,
77        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
78    },
79
80    AuthenticatePassword {
81        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
82        role_name: String,
83        password: Option<Password>,
84    },
85
86    AuthenticateGetSASLChallenge {
87        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
88        role_name: String,
89        nonce: String,
90    },
91
92    AuthenticateVerifySASLProof {
93        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
94        role_name: String,
95        proof: String,
96        auth_message: String,
97        mock_hash: String,
98    },
99
100    Execute {
101        portal_name: String,
102        session: Session,
103        tx: oneshot::Sender<Response<ExecuteResponse>>,
104        outer_ctx_extra: Option<ExecuteContextExtra>,
105    },
106
107    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
108    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
109    /// happen, for example, if the session's role id has been dropped which will prevent
110    /// sequence_end_transaction from running.)
111    Commit {
112        action: EndTransactionAction,
113        session: Session,
114        tx: oneshot::Sender<Response<ExecuteResponse>>,
115    },
116
117    CancelRequest {
118        conn_id: ConnectionIdType,
119        secret_key: u32,
120    },
121
122    PrivilegedCancelRequest {
123        conn_id: ConnectionId,
124    },
125
126    GetWebhook {
127        database: String,
128        schema: String,
129        name: String,
130        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
131    },
132
133    GetSystemVars {
134        tx: oneshot::Sender<SystemVars>,
135    },
136
137    SetSystemVars {
138        vars: BTreeMap<String, String>,
139        conn_id: ConnectionId,
140        tx: oneshot::Sender<Result<(), AdapterError>>,
141    },
142
143    Terminate {
144        conn_id: ConnectionId,
145        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
146    },
147
148    /// Performs any cleanup and logging actions necessary for
149    /// finalizing a statement execution.
150    ///
151    /// Only used for cases that terminate in the protocol layer and
152    /// otherwise have no reason to hand control back to the coordinator.
153    /// In other cases, we piggy-back on another command.
154    RetireExecute {
155        data: ExecuteContextExtra,
156        reason: StatementEndedExecutionReason,
157    },
158
159    CheckConsistency {
160        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
161    },
162
163    Dump {
164        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
165    },
166
167    GetComputeInstanceClient {
168        instance_id: ComputeInstanceId,
169        tx: oneshot::Sender<
170            Result<
171                mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
172                mz_compute_client::controller::error::InstanceMissing,
173            >,
174        >,
175    },
176
177    GetOracle {
178        timeline: Timeline,
179        tx: oneshot::Sender<
180            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
181        >,
182    },
183
184    DetermineRealTimeRecentTimestamp {
185        source_ids: BTreeSet<GlobalId>,
186        real_time_recency_timeout: Duration,
187        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
188    },
189
190    GetTransactionReadHoldsBundle {
191        conn_id: ConnectionId,
192        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
193    },
194
195    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
196    StoreTransactionReadHolds {
197        conn_id: ConnectionId,
198        read_holds: ReadHolds<mz_repr::Timestamp>,
199        tx: oneshot::Sender<()>,
200    },
201
202    ExecuteSlowPathPeek {
203        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
204        determination: TimestampDetermination<mz_repr::Timestamp>,
205        finishing: RowSetFinishing,
206        compute_instance: ComputeInstanceId,
207        target_replica: Option<ReplicaId>,
208        intermediate_result_type: SqlRelationType,
209        source_ids: BTreeSet<GlobalId>,
210        conn_id: ConnectionId,
211        max_result_size: u64,
212        max_query_result_size: Option<u64>,
213        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
214    },
215
216    ExecuteCopyTo {
217        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
218        compute_instance: ComputeInstanceId,
219        target_replica: Option<ReplicaId>,
220        source_ids: BTreeSet<GlobalId>,
221        conn_id: ConnectionId,
222        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
223    },
224}
225
226impl Command {
227    pub fn session(&self) -> Option<&Session> {
228        match self {
229            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
230            Command::CancelRequest { .. }
231            | Command::Startup { .. }
232            | Command::AuthenticatePassword { .. }
233            | Command::AuthenticateGetSASLChallenge { .. }
234            | Command::AuthenticateVerifySASLProof { .. }
235            | Command::CatalogSnapshot { .. }
236            | Command::PrivilegedCancelRequest { .. }
237            | Command::GetWebhook { .. }
238            | Command::Terminate { .. }
239            | Command::GetSystemVars { .. }
240            | Command::SetSystemVars { .. }
241            | Command::RetireExecute { .. }
242            | Command::CheckConsistency { .. }
243            | Command::Dump { .. }
244            | Command::GetComputeInstanceClient { .. }
245            | Command::GetOracle { .. }
246            | Command::DetermineRealTimeRecentTimestamp { .. }
247            | Command::GetTransactionReadHoldsBundle { .. }
248            | Command::StoreTransactionReadHolds { .. }
249            | Command::ExecuteSlowPathPeek { .. }
250            | Command::ExecuteCopyTo { .. } => None,
251        }
252    }
253
254    pub fn session_mut(&mut self) -> Option<&mut Session> {
255        match self {
256            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
257            Command::CancelRequest { .. }
258            | Command::Startup { .. }
259            | Command::AuthenticatePassword { .. }
260            | Command::AuthenticateGetSASLChallenge { .. }
261            | Command::AuthenticateVerifySASLProof { .. }
262            | Command::CatalogSnapshot { .. }
263            | Command::PrivilegedCancelRequest { .. }
264            | Command::GetWebhook { .. }
265            | Command::Terminate { .. }
266            | Command::GetSystemVars { .. }
267            | Command::SetSystemVars { .. }
268            | Command::RetireExecute { .. }
269            | Command::CheckConsistency { .. }
270            | Command::Dump { .. }
271            | Command::GetComputeInstanceClient { .. }
272            | Command::GetOracle { .. }
273            | Command::DetermineRealTimeRecentTimestamp { .. }
274            | Command::GetTransactionReadHoldsBundle { .. }
275            | Command::StoreTransactionReadHolds { .. }
276            | Command::ExecuteSlowPathPeek { .. }
277            | Command::ExecuteCopyTo { .. } => None,
278        }
279    }
280}
281
282#[derive(Debug)]
283pub struct Response<T> {
284    pub result: Result<T, AdapterError>,
285    pub session: Session,
286    pub otel_ctx: OpenTelemetryContext,
287}
288
289/// The response to [`Client::startup`](crate::Client::startup).
290#[derive(Derivative)]
291#[derivative(Debug)]
292pub struct StartupResponse {
293    /// RoleId for the user.
294    pub role_id: RoleId,
295    /// A future that completes when all necessary Builtin Table writes have completed.
296    #[derivative(Debug = "ignore")]
297    pub write_notify: BuiltinTableAppendNotify,
298    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
299    pub session_defaults: BTreeMap<String, OwnedVarInput>,
300    pub catalog: Arc<Catalog>,
301    pub storage_collections: Arc<
302        dyn mz_storage_client::storage_collections::StorageCollections<
303                Timestamp = mz_repr::Timestamp,
304            > + Send
305            + Sync,
306    >,
307    pub transient_id_gen: Arc<TransientIdGen>,
308    pub optimizer_metrics: OptimizerMetrics,
309    pub persist_client: PersistClient,
310}
311
312/// The response to [`Client::authenticate`](crate::Client::authenticate).
313#[derive(Derivative)]
314#[derivative(Debug)]
315pub struct AuthResponse {
316    /// RoleId for the user.
317    pub role_id: RoleId,
318    /// If the user is a superuser.
319    pub superuser: bool,
320}
321
/// Reply to [`Command::AuthenticateGetSASLChallenge`].
// No field is excluded from the output, so the standard `Debug` derive is sufficient.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count of the challenge.
    pub iteration_count: usize,
    /// Salt for the SASL challenge, Base64-encoded.
    pub salt: String,
    /// Nonce of the challenge.
    pub nonce: String,
}
330
331#[derive(Derivative)]
332#[derivative(Debug)]
333pub struct SASLVerifyProofResponse {
334    pub verifier: String,
335    pub auth_resp: AuthResponse,
336}
337
338// Facile implementation for `StartupResponse`, which does not use the `allowed`
339// feature of `ClientTransmitter`.
340impl Transmittable for StartupResponse {
341    type Allowed = bool;
342    fn to_allowed(&self) -> Self::Allowed {
343        true
344    }
345}
346
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// A thin wrapper around the dump's raw `String`.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and returns the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
360
361impl Transmittable for CatalogDump {
362    type Allowed = bool;
363    fn to_allowed(&self) -> Self::Allowed {
364        true
365    }
366}
367
368impl Transmittable for SystemVars {
369    type Allowed = bool;
370    fn to_allowed(&self) -> Self::Allowed {
371        true
372    }
373}
374
375/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
376#[derive(EnumKind, Derivative)]
377#[derivative(Debug)]
378#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
379pub enum ExecuteResponse {
380    /// The default privileges were altered.
381    AlteredDefaultPrivileges,
382    /// The requested object was altered.
383    AlteredObject(ObjectType),
384    /// The role was altered.
385    AlteredRole,
386    /// The system configuration was altered.
387    AlteredSystemConfiguration,
388    /// The requested cursor was closed.
389    ClosedCursor,
390    /// The provided comment was created.
391    Comment,
392    /// The specified number of rows were copied into the requested output.
393    Copied(usize),
394    /// The response for a COPY TO STDOUT query.
395    CopyTo {
396        format: mz_sql::plan::CopyFormat,
397        resp: Box<ExecuteResponse>,
398    },
399    CopyFrom {
400        /// Table we're copying into.
401        target_id: CatalogItemId,
402        /// Human-readable full name of the target table.
403        target_name: String,
404        columns: Vec<ColumnIndex>,
405        params: CopyFormatParams<'static>,
406        ctx_extra: ExecuteContextExtra,
407    },
408    /// The requested connection was created.
409    CreatedConnection,
410    /// The requested database was created.
411    CreatedDatabase,
412    /// The requested schema was created.
413    CreatedSchema,
414    /// The requested role was created.
415    CreatedRole,
416    /// The requested cluster was created.
417    CreatedCluster,
418    /// The requested cluster replica was created.
419    CreatedClusterReplica,
420    /// The requested index was created.
421    CreatedIndex,
422    /// The requested introspection subscribe was created.
423    CreatedIntrospectionSubscribe,
424    /// The requested secret was created.
425    CreatedSecret,
426    /// The requested sink was created.
427    CreatedSink,
428    /// The requested source was created.
429    CreatedSource,
430    /// The requested table was created.
431    CreatedTable,
432    /// The requested view was created.
433    CreatedView,
434    /// The requested views were created.
435    CreatedViews,
436    /// The requested materialized view was created.
437    CreatedMaterializedView,
438    /// The requested continual task was created.
439    CreatedContinualTask,
440    /// The requested type was created.
441    CreatedType,
442    /// The requested network policy was created.
443    CreatedNetworkPolicy,
444    /// The requested prepared statement was removed.
445    Deallocate { all: bool },
446    /// The requested cursor was declared.
447    DeclaredCursor,
448    /// The specified number of rows were deleted from the requested table.
449    Deleted(usize),
450    /// The temporary objects associated with the session have been discarded.
451    DiscardedTemp,
452    /// All state associated with the session has been discarded.
453    DiscardedAll,
454    /// The requested object was dropped.
455    DroppedObject(ObjectType),
456    /// The requested objects were dropped.
457    DroppedOwned,
458    /// The provided query was empty.
459    EmptyQuery,
460    /// Fetch results from a cursor.
461    Fetch {
462        /// The name of the cursor from which to fetch results.
463        name: String,
464        /// The number of results to fetch.
465        count: Option<FetchDirection>,
466        /// How long to wait for results to arrive.
467        timeout: ExecuteTimeout,
468        ctx_extra: ExecuteContextExtra,
469    },
470    /// The requested privilege was granted.
471    GrantedPrivilege,
472    /// The requested role was granted.
473    GrantedRole,
474    /// The specified number of rows were inserted into the requested table.
475    Inserted(usize),
476    /// The specified prepared statement was created.
477    Prepare,
478    /// A user-requested warning was raised.
479    Raised,
480    /// The requested objects were reassigned.
481    ReassignOwned,
482    /// The requested privilege was revoked.
483    RevokedPrivilege,
484    /// The requested role was revoked.
485    RevokedRole,
486    /// Rows will be delivered via the specified stream.
487    SendingRowsStreaming {
488        #[derivative(Debug = "ignore")]
489        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
490        instance_id: ComputeInstanceId,
491        strategy: StatementExecutionStrategy,
492    },
493    /// Rows are known to be available immediately, and thus the execution is
494    /// considered ended in the coordinator.
495    SendingRowsImmediate {
496        #[derivative(Debug = "ignore")]
497        rows: Box<dyn RowIterator + Send + Sync>,
498    },
499    /// The specified variable was set to a new value.
500    SetVariable {
501        name: String,
502        /// Whether the operation was a `RESET` rather than a set.
503        reset: bool,
504    },
505    /// A new transaction was started.
506    StartedTransaction,
507    /// Updates to the requested source or view will be streamed to the
508    /// contained receiver.
509    Subscribing {
510        rx: RowBatchStream,
511        ctx_extra: ExecuteContextExtra,
512        instance_id: ComputeInstanceId,
513    },
514    /// The active transaction committed.
515    TransactionCommitted {
516        /// Session parameters that changed because the transaction ended.
517        params: BTreeMap<&'static str, String>,
518    },
519    /// The active transaction rolled back.
520    TransactionRolledBack {
521        /// Session parameters that changed because the transaction ended.
522        params: BTreeMap<&'static str, String>,
523    },
524    /// The specified number of rows were updated in the requested table.
525    Updated(usize),
526    /// A connection was validated.
527    ValidatedConnection,
528}
529
530impl TryFrom<&Statement<Raw>> for ExecuteResponse {
531    type Error = ();
532
533    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
534    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
535        let resp_kinds = Plan::generated_from(&stmt.into())
536            .iter()
537            .map(ExecuteResponse::generated_from)
538            .flatten()
539            .cloned()
540            .collect::<BTreeSet<ExecuteResponseKind>>();
541        let resps = resp_kinds
542            .iter()
543            .map(|r| (*r).try_into())
544            .collect::<Result<Vec<ExecuteResponse>, _>>();
545        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
546        if let Ok(resps) = resps {
547            if resps.len() == 1 {
548                return Ok(resps.into_element());
549            }
550        }
551        let resp = match stmt {
552            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
553                ExecuteResponse::DroppedObject((*object_type).into())
554            }
555            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
556            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
557                ExecuteResponse::AlteredObject((*object_type).into())
558            }
559            _ => return Err(()),
560        };
561        // Ensure that if the planner ever adds possible plans we complain here.
562        soft_assert_no_log!(
563            resp_kinds.len() == 1
564                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
565            "ExecuteResponses out of sync with planner"
566        );
567        Ok(resp)
568    }
569}
570
571impl TryInto<ExecuteResponse> for ExecuteResponseKind {
572    type Error = ();
573
574    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
575    /// actually executing a statement.
576    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
577        match self {
578            ExecuteResponseKind::AlteredDefaultPrivileges => {
579                Ok(ExecuteResponse::AlteredDefaultPrivileges)
580            }
581            ExecuteResponseKind::AlteredObject => Err(()),
582            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
583            ExecuteResponseKind::AlteredSystemConfiguration => {
584                Ok(ExecuteResponse::AlteredSystemConfiguration)
585            }
586            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
587            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
588            ExecuteResponseKind::Copied => Err(()),
589            ExecuteResponseKind::CopyTo => Err(()),
590            ExecuteResponseKind::CopyFrom => Err(()),
591            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
592            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
593            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
594            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
595            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
596            ExecuteResponseKind::CreatedClusterReplica => {
597                Ok(ExecuteResponse::CreatedClusterReplica)
598            }
599            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
600            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
601            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
602            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
603            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
604            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
605            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
606            ExecuteResponseKind::CreatedMaterializedView => {
607                Ok(ExecuteResponse::CreatedMaterializedView)
608            }
609            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
610            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
611            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
612            ExecuteResponseKind::Deallocate => Err(()),
613            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
614            ExecuteResponseKind::Deleted => Err(()),
615            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
616            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
617            ExecuteResponseKind::DroppedObject => Err(()),
618            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
619            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
620            ExecuteResponseKind::Fetch => Err(()),
621            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
622            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
623            ExecuteResponseKind::Inserted => Err(()),
624            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
625            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
626            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
627            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
628            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
629            ExecuteResponseKind::SetVariable => Err(()),
630            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
631            ExecuteResponseKind::Subscribing => Err(()),
632            ExecuteResponseKind::TransactionCommitted => Err(()),
633            ExecuteResponseKind::TransactionRolledBack => Err(()),
634            ExecuteResponseKind::Updated => Err(()),
635            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
636            ExecuteResponseKind::SendingRowsStreaming => Err(()),
637            ExecuteResponseKind::SendingRowsImmediate => Err(()),
638            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
639                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
640            }
641        }
642    }
643}
644
645impl ExecuteResponse {
646    pub fn tag(&self) -> Option<String> {
647        use ExecuteResponse::*;
648        match self {
649            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
650            AlteredObject(o) => Some(format!("ALTER {}", o)),
651            AlteredRole => Some("ALTER ROLE".into()),
652            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
653            ClosedCursor => Some("CLOSE CURSOR".into()),
654            Comment => Some("COMMENT".into()),
655            Copied(n) => Some(format!("COPY {}", n)),
656            CopyTo { .. } => None,
657            CopyFrom { .. } => None,
658            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
659            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
660            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
661            CreatedRole => Some("CREATE ROLE".into()),
662            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
663            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
664            CreatedIndex { .. } => Some("CREATE INDEX".into()),
665            CreatedSecret { .. } => Some("CREATE SECRET".into()),
666            CreatedSink { .. } => Some("CREATE SINK".into()),
667            CreatedSource { .. } => Some("CREATE SOURCE".into()),
668            CreatedTable { .. } => Some("CREATE TABLE".into()),
669            CreatedView { .. } => Some("CREATE VIEW".into()),
670            CreatedViews { .. } => Some("CREATE VIEWS".into()),
671            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
672            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
673            CreatedType => Some("CREATE TYPE".into()),
674            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
675            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
676            DeclaredCursor => Some("DECLARE CURSOR".into()),
677            Deleted(n) => Some(format!("DELETE {}", n)),
678            DiscardedTemp => Some("DISCARD TEMP".into()),
679            DiscardedAll => Some("DISCARD ALL".into()),
680            DroppedObject(o) => Some(format!("DROP {o}")),
681            DroppedOwned => Some("DROP OWNED".into()),
682            EmptyQuery => None,
683            Fetch { .. } => None,
684            GrantedPrivilege => Some("GRANT".into()),
685            GrantedRole => Some("GRANT ROLE".into()),
686            Inserted(n) => {
687                // "On successful completion, an INSERT command returns a
688                // command tag of the form `INSERT <oid> <count>`."
689                //     -- https://www.postgresql.org/docs/11/sql-insert.html
690                //
691                // OIDs are a PostgreSQL-specific historical quirk, but we
692                // can return a 0 OID to indicate that the table does not
693                // have OIDs.
694                Some(format!("INSERT 0 {}", n))
695            }
696            Prepare => Some("PREPARE".into()),
697            Raised => Some("RAISE".into()),
698            ReassignOwned => Some("REASSIGN OWNED".into()),
699            RevokedPrivilege => Some("REVOKE".into()),
700            RevokedRole => Some("REVOKE ROLE".into()),
701            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
702            SetVariable { reset: true, .. } => Some("RESET".into()),
703            SetVariable { reset: false, .. } => Some("SET".into()),
704            StartedTransaction { .. } => Some("BEGIN".into()),
705            Subscribing { .. } => None,
706            TransactionCommitted { .. } => Some("COMMIT".into()),
707            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
708            Updated(n) => Some(format!("UPDATE {}", n)),
709            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
710            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
711        }
712    }
713
714    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
715    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
716    /// excluded from this function.
717    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
718        use ExecuteResponseKind::*;
719        use PlanKind::*;
720
721        match plan {
722            AbortTransaction => &[TransactionRolledBack],
723            AlterClusterRename
724            | AlterClusterSwap
725            | AlterCluster
726            | AlterClusterReplicaRename
727            | AlterOwner
728            | AlterItemRename
729            | AlterRetainHistory
730            | AlterNoop
731            | AlterSchemaRename
732            | AlterSchemaSwap
733            | AlterSecret
734            | AlterConnection
735            | AlterSource
736            | AlterSink
737            | AlterTableAddColumn
738            | AlterMaterializedViewApplyReplacement
739            | AlterNetworkPolicy => &[AlteredObject],
740            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
741            AlterSetCluster => &[AlteredObject],
742            AlterRole => &[AlteredRole],
743            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
744                &[AlteredSystemConfiguration]
745            }
746            Close => &[ClosedCursor],
747            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
748            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
749            PlanKind::Comment => &[ExecuteResponseKind::Comment],
750            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
751            CreateConnection => &[CreatedConnection],
752            CreateDatabase => &[CreatedDatabase],
753            CreateSchema => &[CreatedSchema],
754            CreateRole => &[CreatedRole],
755            CreateCluster => &[CreatedCluster],
756            CreateClusterReplica => &[CreatedClusterReplica],
757            CreateSource | CreateSources => &[CreatedSource],
758            CreateSecret => &[CreatedSecret],
759            CreateSink => &[CreatedSink],
760            CreateTable => &[CreatedTable],
761            CreateView => &[CreatedView],
762            CreateMaterializedView => &[CreatedMaterializedView],
763            CreateContinualTask => &[CreatedContinualTask],
764            CreateIndex => &[CreatedIndex],
765            CreateType => &[CreatedType],
766            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
767            CreateNetworkPolicy => &[CreatedNetworkPolicy],
768            Declare => &[DeclaredCursor],
769            DiscardTemp => &[DiscardedTemp],
770            DiscardAll => &[DiscardedAll],
771            DropObjects => &[DroppedObject],
772            DropOwned => &[DroppedOwned],
773            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
774            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
775            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
776                ExecuteResponseKind::CopyTo,
777                SendingRowsStreaming,
778                SendingRowsImmediate,
779            ],
780            Execute | ReadThenWrite => &[
781                Deleted,
782                Inserted,
783                SendingRowsStreaming,
784                SendingRowsImmediate,
785                Updated,
786            ],
787            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
788            GrantPrivileges => &[GrantedPrivilege],
789            GrantRole => &[GrantedRole],
790            Insert => &[Inserted, SendingRowsImmediate],
791            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
792            PlanKind::Raise => &[ExecuteResponseKind::Raised],
793            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
794            RevokePrivileges => &[RevokedPrivilege],
795            RevokeRole => &[RevokedRole],
796            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
797                &[ExecuteResponseKind::SetVariable]
798            }
799            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
800            StartTransaction => &[StartedTransaction],
801            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
802            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
803        }
804    }
805}
806
807/// This implementation is meant to ensure that we maintain updated information
808/// about which types of `ExecuteResponse`s are permitted to be sent, which will
809/// be a function of which plan we're executing.
810impl Transmittable for ExecuteResponse {
811    type Allowed = ExecuteResponseKind;
812    fn to_allowed(&self) -> Self::Allowed {
813        ExecuteResponseKind::from(self)
814    }
815}