mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
66#[derive(Debug)]
67pub struct CatalogSnapshot {
68    pub catalog: Arc<Catalog>,
69}
70
71#[derive(Debug)]
72pub enum Command {
73    CatalogSnapshot {
74        tx: oneshot::Sender<CatalogSnapshot>,
75    },
76
77    Startup {
78        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
79        user: User,
80        conn_id: ConnectionId,
81        client_ip: Option<IpAddr>,
82        secret_key: u32,
83        uuid: Uuid,
84        application_name: String,
85        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
86    },
87
88    AuthenticatePassword {
89        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
90        role_name: String,
91        password: Option<Password>,
92    },
93
94    AuthenticateGetSASLChallenge {
95        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
96        role_name: String,
97        nonce: String,
98    },
99
100    AuthenticateVerifySASLProof {
101        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
102        role_name: String,
103        proof: String,
104        auth_message: String,
105        mock_hash: String,
106    },
107
108    Execute {
109        portal_name: String,
110        session: Session,
111        tx: oneshot::Sender<Response<ExecuteResponse>>,
112        outer_ctx_extra: Option<ExecuteContextGuard>,
113    },
114
115    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
116    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
117    /// happen, for example, if the session's role id has been dropped which will prevent
118    /// sequence_end_transaction from running.)
119    Commit {
120        action: EndTransactionAction,
121        session: Session,
122        tx: oneshot::Sender<Response<ExecuteResponse>>,
123    },
124
125    CancelRequest {
126        conn_id: ConnectionIdType,
127        secret_key: u32,
128    },
129
130    PrivilegedCancelRequest {
131        conn_id: ConnectionId,
132    },
133
134    GetWebhook {
135        database: String,
136        schema: String,
137        name: String,
138        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
139    },
140
141    GetSystemVars {
142        tx: oneshot::Sender<SystemVars>,
143    },
144
145    SetSystemVars {
146        vars: BTreeMap<String, String>,
147        conn_id: ConnectionId,
148        tx: oneshot::Sender<Result<(), AdapterError>>,
149    },
150
151    Terminate {
152        conn_id: ConnectionId,
153        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
154    },
155
156    /// Performs any cleanup and logging actions necessary for
157    /// finalizing a statement execution.
158    ///
159    /// Only used for cases that terminate in the protocol layer and
160    /// otherwise have no reason to hand control back to the coordinator.
161    /// In other cases, we piggy-back on another command.
162    RetireExecute {
163        data: ExecuteContextExtra,
164        reason: StatementEndedExecutionReason,
165    },
166
167    CheckConsistency {
168        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
169    },
170
171    Dump {
172        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
173    },
174
175    GetComputeInstanceClient {
176        instance_id: ComputeInstanceId,
177        tx: oneshot::Sender<
178            Result<
179                mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
180                mz_compute_client::controller::error::InstanceMissing,
181            >,
182        >,
183    },
184
185    GetOracle {
186        timeline: Timeline,
187        tx: oneshot::Sender<
188            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
189        >,
190    },
191
192    DetermineRealTimeRecentTimestamp {
193        source_ids: BTreeSet<GlobalId>,
194        real_time_recency_timeout: Duration,
195        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
196    },
197
198    GetTransactionReadHoldsBundle {
199        conn_id: ConnectionId,
200        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
201    },
202
203    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
204    StoreTransactionReadHolds {
205        conn_id: ConnectionId,
206        read_holds: ReadHolds<mz_repr::Timestamp>,
207        tx: oneshot::Sender<()>,
208    },
209
210    ExecuteSlowPathPeek {
211        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
212        determination: TimestampDetermination<mz_repr::Timestamp>,
213        finishing: RowSetFinishing,
214        compute_instance: ComputeInstanceId,
215        target_replica: Option<ReplicaId>,
216        intermediate_result_type: SqlRelationType,
217        source_ids: BTreeSet<GlobalId>,
218        conn_id: ConnectionId,
219        max_result_size: u64,
220        max_query_result_size: Option<u64>,
221        /// If statement logging is enabled, contains all info needed for installing watch sets
222        /// and logging the statement execution.
223        watch_set: Option<WatchSetCreation>,
224        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
225    },
226
227    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
228    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
229    /// in a background task to avoid blocking the coordinator.
230    CopyToPreflight {
231        /// The S3 connection info needed for preflight checks.
232        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
233        /// The sink ID for logging and S3 key management.
234        sink_id: GlobalId,
235        /// Response channel for the preflight result.
236        tx: oneshot::Sender<Result<(), AdapterError>>,
237    },
238
239    ExecuteCopyTo {
240        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
241        compute_instance: ComputeInstanceId,
242        target_replica: Option<ReplicaId>,
243        source_ids: BTreeSet<GlobalId>,
244        conn_id: ConnectionId,
245        /// If statement logging is enabled, contains all info needed for installing watch sets
246        /// and logging the statement execution.
247        watch_set: Option<WatchSetCreation>,
248        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
249    },
250
251    /// Execute a side-effecting function from the frontend peek path.
252    ExecuteSideEffectingFunc {
253        plan: SideEffectingFunc,
254        conn_id: ConnectionId,
255        /// The current role of the session, used for RBAC checks.
256        current_role: RoleId,
257        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
258    },
259
260    /// Register a pending peek initiated by frontend sequencing. This is needed for:
261    /// - statement logging
262    /// - query cancellation
263    RegisterFrontendPeek {
264        uuid: Uuid,
265        conn_id: ConnectionId,
266        cluster_id: mz_controller_types::ClusterId,
267        depends_on: BTreeSet<GlobalId>,
268        is_fast_path: bool,
269        /// If statement logging is enabled, contains all info needed for installing watch sets
270        /// and logging the statement execution.
271        watch_set: Option<WatchSetCreation>,
272        tx: oneshot::Sender<Result<(), AdapterError>>,
273    },
274
275    /// Unregister a pending peek that was registered but failed to issue.
276    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
277    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
278    /// frontend will log the error.
279    UnregisterFrontendPeek {
280        uuid: Uuid,
281        tx: oneshot::Sender<()>,
282    },
283
284    /// Generate a timestamp explanation.
285    /// This is used when `emit_timestamp_notice` is enabled.
286    ExplainTimestamp {
287        conn_id: ConnectionId,
288        session_wall_time: DateTime<Utc>,
289        cluster_id: ClusterId,
290        id_bundle: CollectionIdBundle,
291        determination: TimestampDetermination<mz_repr::Timestamp>,
292        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
293    },
294
295    /// Statement logging event from frontend peek sequencing.
296    /// No response channel needed - this is fire-and-forget.
297    FrontendStatementLogging(FrontendStatementLoggingEvent),
298}
299
300impl Command {
301    pub fn session(&self) -> Option<&Session> {
302        match self {
303            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
304            Command::CancelRequest { .. }
305            | Command::Startup { .. }
306            | Command::AuthenticatePassword { .. }
307            | Command::AuthenticateGetSASLChallenge { .. }
308            | Command::AuthenticateVerifySASLProof { .. }
309            | Command::CatalogSnapshot { .. }
310            | Command::PrivilegedCancelRequest { .. }
311            | Command::GetWebhook { .. }
312            | Command::Terminate { .. }
313            | Command::GetSystemVars { .. }
314            | Command::SetSystemVars { .. }
315            | Command::RetireExecute { .. }
316            | Command::CheckConsistency { .. }
317            | Command::Dump { .. }
318            | Command::GetComputeInstanceClient { .. }
319            | Command::GetOracle { .. }
320            | Command::DetermineRealTimeRecentTimestamp { .. }
321            | Command::GetTransactionReadHoldsBundle { .. }
322            | Command::StoreTransactionReadHolds { .. }
323            | Command::ExecuteSlowPathPeek { .. }
324            | Command::CopyToPreflight { .. }
325            | Command::ExecuteCopyTo { .. }
326            | Command::ExecuteSideEffectingFunc { .. }
327            | Command::RegisterFrontendPeek { .. }
328            | Command::UnregisterFrontendPeek { .. }
329            | Command::ExplainTimestamp { .. }
330            | Command::FrontendStatementLogging(..) => None,
331        }
332    }
333
334    pub fn session_mut(&mut self) -> Option<&mut Session> {
335        match self {
336            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
337            Command::CancelRequest { .. }
338            | Command::Startup { .. }
339            | Command::AuthenticatePassword { .. }
340            | Command::AuthenticateGetSASLChallenge { .. }
341            | Command::AuthenticateVerifySASLProof { .. }
342            | Command::CatalogSnapshot { .. }
343            | Command::PrivilegedCancelRequest { .. }
344            | Command::GetWebhook { .. }
345            | Command::Terminate { .. }
346            | Command::GetSystemVars { .. }
347            | Command::SetSystemVars { .. }
348            | Command::RetireExecute { .. }
349            | Command::CheckConsistency { .. }
350            | Command::Dump { .. }
351            | Command::GetComputeInstanceClient { .. }
352            | Command::GetOracle { .. }
353            | Command::DetermineRealTimeRecentTimestamp { .. }
354            | Command::GetTransactionReadHoldsBundle { .. }
355            | Command::StoreTransactionReadHolds { .. }
356            | Command::ExecuteSlowPathPeek { .. }
357            | Command::CopyToPreflight { .. }
358            | Command::ExecuteCopyTo { .. }
359            | Command::ExecuteSideEffectingFunc { .. }
360            | Command::RegisterFrontendPeek { .. }
361            | Command::UnregisterFrontendPeek { .. }
362            | Command::ExplainTimestamp { .. }
363            | Command::FrontendStatementLogging(..) => None,
364        }
365    }
366}
367
368#[derive(Debug)]
369pub struct Response<T> {
370    pub result: Result<T, AdapterError>,
371    pub session: Session,
372    pub otel_ctx: OpenTelemetryContext,
373}
374
375/// The response to [`Client::startup`](crate::Client::startup).
376#[derive(Derivative)]
377#[derivative(Debug)]
378pub struct StartupResponse {
379    /// RoleId for the user.
380    pub role_id: RoleId,
381    /// A future that completes when all necessary Builtin Table writes have completed.
382    #[derivative(Debug = "ignore")]
383    pub write_notify: BuiltinTableAppendNotify,
384    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
385    pub session_defaults: BTreeMap<String, OwnedVarInput>,
386    pub catalog: Arc<Catalog>,
387    pub storage_collections: Arc<
388        dyn mz_storage_client::storage_collections::StorageCollections<
389                Timestamp = mz_repr::Timestamp,
390            > + Send
391            + Sync,
392    >,
393    pub transient_id_gen: Arc<TransientIdGen>,
394    pub optimizer_metrics: OptimizerMetrics,
395    pub persist_client: PersistClient,
396    pub statement_logging_frontend: StatementLoggingFrontend,
397}
398
399/// The response to [`Client::authenticate`](crate::Client::authenticate).
400#[derive(Derivative)]
401#[derivative(Debug)]
402pub struct AuthResponse {
403    /// RoleId for the user.
404    pub role_id: RoleId,
405    /// If the user is a superuser.
406    pub superuser: bool,
407}
408
409#[derive(Derivative)]
410#[derivative(Debug)]
411pub struct SASLChallengeResponse {
412    pub iteration_count: usize,
413    /// Base64-encoded salt for the SASL challenge.
414    pub salt: String,
415    pub nonce: String,
416}
417
418#[derive(Derivative)]
419#[derivative(Debug)]
420pub struct SASLVerifyProofResponse {
421    pub verifier: String,
422    pub auth_resp: AuthResponse,
423}
424
425// Facile implementation for `StartupResponse`, which does not use the `allowed`
426// feature of `ClientTransmitter`.
427impl Transmittable for StartupResponse {
428    type Allowed = bool;
429    fn to_allowed(&self) -> Self::Allowed {
430        true
431    }
432}
433
434/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
435#[derive(Debug, Clone)]
436pub struct CatalogDump(String);
437
438impl CatalogDump {
439    pub fn new(raw: String) -> Self {
440        CatalogDump(raw)
441    }
442
443    pub fn into_string(self) -> String {
444        self.0
445    }
446}
447
448impl Transmittable for CatalogDump {
449    type Allowed = bool;
450    fn to_allowed(&self) -> Self::Allowed {
451        true
452    }
453}
454
455impl Transmittable for SystemVars {
456    type Allowed = bool;
457    fn to_allowed(&self) -> Self::Allowed {
458        true
459    }
460}
461
462/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
463#[derive(EnumKind, Derivative)]
464#[derivative(Debug)]
465#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
466pub enum ExecuteResponse {
467    /// The default privileges were altered.
468    AlteredDefaultPrivileges,
469    /// The requested object was altered.
470    AlteredObject(ObjectType),
471    /// The role was altered.
472    AlteredRole,
473    /// The system configuration was altered.
474    AlteredSystemConfiguration,
475    /// The requested cursor was closed.
476    ClosedCursor,
477    /// The provided comment was created.
478    Comment,
479    /// The specified number of rows were copied into the requested output.
480    Copied(usize),
481    /// The response for a COPY TO STDOUT query.
482    CopyTo {
483        format: mz_sql::plan::CopyFormat,
484        resp: Box<ExecuteResponse>,
485    },
486    CopyFrom {
487        /// Table we're copying into.
488        target_id: CatalogItemId,
489        /// Human-readable full name of the target table.
490        target_name: String,
491        columns: Vec<ColumnIndex>,
492        params: CopyFormatParams<'static>,
493        ctx_extra: ExecuteContextGuard,
494    },
495    /// The requested connection was created.
496    CreatedConnection,
497    /// The requested database was created.
498    CreatedDatabase,
499    /// The requested schema was created.
500    CreatedSchema,
501    /// The requested role was created.
502    CreatedRole,
503    /// The requested cluster was created.
504    CreatedCluster,
505    /// The requested cluster replica was created.
506    CreatedClusterReplica,
507    /// The requested index was created.
508    CreatedIndex,
509    /// The requested introspection subscribe was created.
510    CreatedIntrospectionSubscribe,
511    /// The requested secret was created.
512    CreatedSecret,
513    /// The requested sink was created.
514    CreatedSink,
515    /// The requested source was created.
516    CreatedSource,
517    /// The requested table was created.
518    CreatedTable,
519    /// The requested view was created.
520    CreatedView,
521    /// The requested views were created.
522    CreatedViews,
523    /// The requested materialized view was created.
524    CreatedMaterializedView,
525    /// The requested continual task was created.
526    CreatedContinualTask,
527    /// The requested type was created.
528    CreatedType,
529    /// The requested network policy was created.
530    CreatedNetworkPolicy,
531    /// The requested prepared statement was removed.
532    Deallocate { all: bool },
533    /// The requested cursor was declared.
534    DeclaredCursor,
535    /// The specified number of rows were deleted from the requested table.
536    Deleted(usize),
537    /// The temporary objects associated with the session have been discarded.
538    DiscardedTemp,
539    /// All state associated with the session has been discarded.
540    DiscardedAll,
541    /// The requested object was dropped.
542    DroppedObject(ObjectType),
543    /// The requested objects were dropped.
544    DroppedOwned,
545    /// The provided query was empty.
546    EmptyQuery,
547    /// Fetch results from a cursor.
548    Fetch {
549        /// The name of the cursor from which to fetch results.
550        name: String,
551        /// The number of results to fetch.
552        count: Option<FetchDirection>,
553        /// How long to wait for results to arrive.
554        timeout: ExecuteTimeout,
555        ctx_extra: ExecuteContextGuard,
556    },
557    /// The requested privilege was granted.
558    GrantedPrivilege,
559    /// The requested role was granted.
560    GrantedRole,
561    /// The specified number of rows were inserted into the requested table.
562    Inserted(usize),
563    /// The specified prepared statement was created.
564    Prepare,
565    /// A user-requested warning was raised.
566    Raised,
567    /// The requested objects were reassigned.
568    ReassignOwned,
569    /// The requested privilege was revoked.
570    RevokedPrivilege,
571    /// The requested role was revoked.
572    RevokedRole,
573    /// Rows will be delivered via the specified stream.
574    SendingRowsStreaming {
575        #[derivative(Debug = "ignore")]
576        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
577        instance_id: ComputeInstanceId,
578        strategy: StatementExecutionStrategy,
579    },
580    /// Rows are known to be available immediately, and thus the execution is
581    /// considered ended in the coordinator.
582    SendingRowsImmediate {
583        #[derivative(Debug = "ignore")]
584        rows: Box<dyn RowIterator + Send + Sync>,
585    },
586    /// The specified variable was set to a new value.
587    SetVariable {
588        name: String,
589        /// Whether the operation was a `RESET` rather than a set.
590        reset: bool,
591    },
592    /// A new transaction was started.
593    StartedTransaction,
594    /// Updates to the requested source or view will be streamed to the
595    /// contained receiver.
596    Subscribing {
597        rx: RowBatchStream,
598        ctx_extra: ExecuteContextGuard,
599        instance_id: ComputeInstanceId,
600    },
601    /// The active transaction committed.
602    TransactionCommitted {
603        /// Session parameters that changed because the transaction ended.
604        params: BTreeMap<&'static str, String>,
605    },
606    /// The active transaction rolled back.
607    TransactionRolledBack {
608        /// Session parameters that changed because the transaction ended.
609        params: BTreeMap<&'static str, String>,
610    },
611    /// The specified number of rows were updated in the requested table.
612    Updated(usize),
613    /// A connection was validated.
614    ValidatedConnection,
615}
616
617impl TryFrom<&Statement<Raw>> for ExecuteResponse {
618    type Error = ();
619
620    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
621    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
622        let resp_kinds = Plan::generated_from(&stmt.into())
623            .iter()
624            .map(ExecuteResponse::generated_from)
625            .flatten()
626            .cloned()
627            .collect::<BTreeSet<ExecuteResponseKind>>();
628        let resps = resp_kinds
629            .iter()
630            .map(|r| (*r).try_into())
631            .collect::<Result<Vec<ExecuteResponse>, _>>();
632        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
633        if let Ok(resps) = resps {
634            if resps.len() == 1 {
635                return Ok(resps.into_element());
636            }
637        }
638        let resp = match stmt {
639            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
640                ExecuteResponse::DroppedObject((*object_type).into())
641            }
642            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
643            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
644                ExecuteResponse::AlteredObject((*object_type).into())
645            }
646            _ => return Err(()),
647        };
648        // Ensure that if the planner ever adds possible plans we complain here.
649        soft_assert_no_log!(
650            resp_kinds.len() == 1
651                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
652            "ExecuteResponses out of sync with planner"
653        );
654        Ok(resp)
655    }
656}
657
658impl TryInto<ExecuteResponse> for ExecuteResponseKind {
659    type Error = ();
660
661    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
662    /// actually executing a statement.
663    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
664        match self {
665            ExecuteResponseKind::AlteredDefaultPrivileges => {
666                Ok(ExecuteResponse::AlteredDefaultPrivileges)
667            }
668            ExecuteResponseKind::AlteredObject => Err(()),
669            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
670            ExecuteResponseKind::AlteredSystemConfiguration => {
671                Ok(ExecuteResponse::AlteredSystemConfiguration)
672            }
673            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
674            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
675            ExecuteResponseKind::Copied => Err(()),
676            ExecuteResponseKind::CopyTo => Err(()),
677            ExecuteResponseKind::CopyFrom => Err(()),
678            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
679            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
680            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
681            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
682            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
683            ExecuteResponseKind::CreatedClusterReplica => {
684                Ok(ExecuteResponse::CreatedClusterReplica)
685            }
686            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
687            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
688            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
689            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
690            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
691            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
692            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
693            ExecuteResponseKind::CreatedMaterializedView => {
694                Ok(ExecuteResponse::CreatedMaterializedView)
695            }
696            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
697            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
698            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
699            ExecuteResponseKind::Deallocate => Err(()),
700            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
701            ExecuteResponseKind::Deleted => Err(()),
702            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
703            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
704            ExecuteResponseKind::DroppedObject => Err(()),
705            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
706            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
707            ExecuteResponseKind::Fetch => Err(()),
708            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
709            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
710            ExecuteResponseKind::Inserted => Err(()),
711            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
712            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
713            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
714            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
715            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
716            ExecuteResponseKind::SetVariable => Err(()),
717            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
718            ExecuteResponseKind::Subscribing => Err(()),
719            ExecuteResponseKind::TransactionCommitted => Err(()),
720            ExecuteResponseKind::TransactionRolledBack => Err(()),
721            ExecuteResponseKind::Updated => Err(()),
722            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
723            ExecuteResponseKind::SendingRowsStreaming => Err(()),
724            ExecuteResponseKind::SendingRowsImmediate => Err(()),
725            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
726                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
727            }
728        }
729    }
730}
731
732impl ExecuteResponse {
733    pub fn tag(&self) -> Option<String> {
734        use ExecuteResponse::*;
735        match self {
736            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
737            AlteredObject(o) => Some(format!("ALTER {}", o)),
738            AlteredRole => Some("ALTER ROLE".into()),
739            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
740            ClosedCursor => Some("CLOSE CURSOR".into()),
741            Comment => Some("COMMENT".into()),
742            Copied(n) => Some(format!("COPY {}", n)),
743            CopyTo { .. } => None,
744            CopyFrom { .. } => None,
745            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
746            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
747            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
748            CreatedRole => Some("CREATE ROLE".into()),
749            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
750            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
751            CreatedIndex { .. } => Some("CREATE INDEX".into()),
752            CreatedSecret { .. } => Some("CREATE SECRET".into()),
753            CreatedSink { .. } => Some("CREATE SINK".into()),
754            CreatedSource { .. } => Some("CREATE SOURCE".into()),
755            CreatedTable { .. } => Some("CREATE TABLE".into()),
756            CreatedView { .. } => Some("CREATE VIEW".into()),
757            CreatedViews { .. } => Some("CREATE VIEWS".into()),
758            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
759            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
760            CreatedType => Some("CREATE TYPE".into()),
761            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
762            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
763            DeclaredCursor => Some("DECLARE CURSOR".into()),
764            Deleted(n) => Some(format!("DELETE {}", n)),
765            DiscardedTemp => Some("DISCARD TEMP".into()),
766            DiscardedAll => Some("DISCARD ALL".into()),
767            DroppedObject(o) => Some(format!("DROP {o}")),
768            DroppedOwned => Some("DROP OWNED".into()),
769            EmptyQuery => None,
770            Fetch { .. } => None,
771            GrantedPrivilege => Some("GRANT".into()),
772            GrantedRole => Some("GRANT ROLE".into()),
773            Inserted(n) => {
774                // "On successful completion, an INSERT command returns a
775                // command tag of the form `INSERT <oid> <count>`."
776                //     -- https://www.postgresql.org/docs/11/sql-insert.html
777                //
778                // OIDs are a PostgreSQL-specific historical quirk, but we
779                // can return a 0 OID to indicate that the table does not
780                // have OIDs.
781                Some(format!("INSERT 0 {}", n))
782            }
783            Prepare => Some("PREPARE".into()),
784            Raised => Some("RAISE".into()),
785            ReassignOwned => Some("REASSIGN OWNED".into()),
786            RevokedPrivilege => Some("REVOKE".into()),
787            RevokedRole => Some("REVOKE ROLE".into()),
788            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
789            SetVariable { reset: true, .. } => Some("RESET".into()),
790            SetVariable { reset: false, .. } => Some("SET".into()),
791            StartedTransaction { .. } => Some("BEGIN".into()),
792            Subscribing { .. } => None,
793            TransactionCommitted { .. } => Some("COMMIT".into()),
794            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
795            Updated(n) => Some(format!("UPDATE {}", n)),
796            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
797            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
798        }
799    }
800
801    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
802    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
803    /// excluded from this function.
804    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
805        use ExecuteResponseKind::*;
806        use PlanKind::*;
807
808        match plan {
809            AbortTransaction => &[TransactionRolledBack],
810            AlterClusterRename
811            | AlterClusterSwap
812            | AlterCluster
813            | AlterClusterReplicaRename
814            | AlterOwner
815            | AlterItemRename
816            | AlterRetainHistory
817            | AlterNoop
818            | AlterSchemaRename
819            | AlterSchemaSwap
820            | AlterSecret
821            | AlterConnection
822            | AlterSource
823            | AlterSink
824            | AlterTableAddColumn
825            | AlterMaterializedViewApplyReplacement
826            | AlterNetworkPolicy => &[AlteredObject],
827            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
828            AlterSetCluster => &[AlteredObject],
829            AlterRole => &[AlteredRole],
830            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
831                &[AlteredSystemConfiguration]
832            }
833            Close => &[ClosedCursor],
834            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
835            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
836            PlanKind::Comment => &[ExecuteResponseKind::Comment],
837            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
838            CreateConnection => &[CreatedConnection],
839            CreateDatabase => &[CreatedDatabase],
840            CreateSchema => &[CreatedSchema],
841            CreateRole => &[CreatedRole],
842            CreateCluster => &[CreatedCluster],
843            CreateClusterReplica => &[CreatedClusterReplica],
844            CreateSource | CreateSources => &[CreatedSource],
845            CreateSecret => &[CreatedSecret],
846            CreateSink => &[CreatedSink],
847            CreateTable => &[CreatedTable],
848            CreateView => &[CreatedView],
849            CreateMaterializedView => &[CreatedMaterializedView],
850            CreateContinualTask => &[CreatedContinualTask],
851            CreateIndex => &[CreatedIndex],
852            CreateType => &[CreatedType],
853            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
854            CreateNetworkPolicy => &[CreatedNetworkPolicy],
855            Declare => &[DeclaredCursor],
856            DiscardTemp => &[DiscardedTemp],
857            DiscardAll => &[DiscardedAll],
858            DropObjects => &[DroppedObject],
859            DropOwned => &[DroppedOwned],
860            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
861            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
862            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
863                ExecuteResponseKind::CopyTo,
864                SendingRowsStreaming,
865                SendingRowsImmediate,
866            ],
867            Execute | ReadThenWrite => &[
868                Deleted,
869                Inserted,
870                SendingRowsStreaming,
871                SendingRowsImmediate,
872                Updated,
873            ],
874            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
875            GrantPrivileges => &[GrantedPrivilege],
876            GrantRole => &[GrantedRole],
877            Insert => &[Inserted, SendingRowsImmediate],
878            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
879            PlanKind::Raise => &[ExecuteResponseKind::Raised],
880            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
881            RevokePrivileges => &[RevokedPrivilege],
882            RevokeRole => &[RevokedRole],
883            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
884                &[ExecuteResponseKind::SetVariable]
885            }
886            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
887            StartTransaction => &[StartedTransaction],
888            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
889            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
890        }
891    }
892}
893
894/// This implementation is meant to ensure that we maintain updated information
895/// about which types of `ExecuteResponse`s are permitted to be sent, which will
896/// be a function of which plan we're executing.
897impl Transmittable for ExecuteResponse {
898    type Allowed = ExecuteResponseKind;
899    fn to_allowed(&self) -> Self::Allowed {
900        ExecuteResponseKind::from(self)
901    }
902}