Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
/// The payload sent on the `tx` channel of [`Command::CatalogSnapshot`]:
/// a shared, reference-counted handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// The shared catalog carried by this snapshot.
    pub catalog: Arc<Catalog>,
}
70
/// A request message. Most variants carry a `tx` channel on which the reply to
/// the request is sent; a few carry none.
///
/// NOTE(review): presumably these are sent by client/frontend code and handled
/// by the coordinator -- the receiving side is not visible in this file; confirm.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`], delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Startup request for the connection `conn_id`; `tx` receives a
    /// [`StartupResponse`] or an [`AdapterError`].
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Sending half of an unbounded channel of [`AdapterNotice`]s.
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for `role_name`; `tx` receives `Ok(())`
    /// or an [`AdapterError`]. The password itself is optional.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` and the supplied `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Requests verification of a SASL proof for `role_name`; `tx` receives a
    /// [`SASLVerifyProofResponse`] or an [`AdapterError`].
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    },

    /// Executes the portal named `portal_name`. The `session` is moved into the
    /// command; the [`Response`] sent on `tx` carries a `Session` alongside the result.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
    /// happen, for example, if the session's role id has been dropped which will prevent
    /// sequence_end_transaction from running.)
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a [`ConnectionIdType`] plus a
    /// `secret_key`. Carries no response channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request identified by a [`ConnectionId`] alone; unlike
    /// [`Command::CancelRequest`] it has no `secret_key` field. Carries no
    /// response channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Requests the webhook identified by `database`, `schema` and `name`;
    /// `tx` receives an [`AppendWebhookResponse`] or an [`AppendWebhookError`].
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests a [`SystemVars`] value, delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Requests that system variables be set on behalf of `conn_id`.
    SetSystemVars {
        /// String-to-string map (presumably variable name to new value -- confirm).
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminate request for the connection `conn_id`. The response channel is
    /// optional.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    ///
    /// Only used for cases that terminate in the protocol layer and
    /// otherwise have no reason to hand control back to the coordinator.
    /// In other cases, we piggy-back on another command.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Requests a consistency check; `tx` receives `Ok(())` or the
    /// [`CoordinatorInconsistencies`] as the error value.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump rendered as a `serde_json::Value`.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests an `InstanceClient` for the compute instance `instance_id`;
    /// the error case is `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests the [`TimestampOracle`] for `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recent timestamp for `source_ids`. The successful
    /// reply is optional (`Ok(None)` is possible).
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the transaction read holds associated with `conn_id`; the reply
    /// is `None` when there are none to return.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
    },

    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        tx: oneshot::Sender<()>,
    },

    /// Executes a slow-path peek described by `dataflow_plan`; `tx` receives an
    /// [`ExecuteResponse`] or an [`AdapterError`].
    ExecuteSlowPathPeek {
        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
    /// in a background task to avoid blocking the coordinator.
    CopyToPreflight {
        /// The S3 connection info needed for preflight checks.
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        /// The sink ID for logging and S3 key management.
        sink_id: GlobalId,
        /// Response channel for the preflight result.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a copy-to operation described by the dataflow `df_desc`; `tx`
    /// receives an [`ExecuteResponse`] or an [`AdapterError`].
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Execute a side-effecting function from the frontend peek path.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        /// The current role of the session, used for RBAC checks.
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Register a pending peek initiated by frontend sequencing. This is needed for:
    /// - statement logging
    /// - query cancellation
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregister a pending peek that was registered but failed to issue.
    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
    /// frontend will log the error.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Generate a timestamp explanation.
    /// This is used when `emit_timestamp_notice` is enabled.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
    },

    /// Statement logging event from frontend peek sequencing.
    /// No response channel needed - this is fire-and-forget.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
299
300impl Command {
301    pub fn session(&self) -> Option<&Session> {
302        match self {
303            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
304            Command::CancelRequest { .. }
305            | Command::Startup { .. }
306            | Command::AuthenticatePassword { .. }
307            | Command::AuthenticateGetSASLChallenge { .. }
308            | Command::AuthenticateVerifySASLProof { .. }
309            | Command::CatalogSnapshot { .. }
310            | Command::PrivilegedCancelRequest { .. }
311            | Command::GetWebhook { .. }
312            | Command::Terminate { .. }
313            | Command::GetSystemVars { .. }
314            | Command::SetSystemVars { .. }
315            | Command::RetireExecute { .. }
316            | Command::CheckConsistency { .. }
317            | Command::Dump { .. }
318            | Command::GetComputeInstanceClient { .. }
319            | Command::GetOracle { .. }
320            | Command::DetermineRealTimeRecentTimestamp { .. }
321            | Command::GetTransactionReadHoldsBundle { .. }
322            | Command::StoreTransactionReadHolds { .. }
323            | Command::ExecuteSlowPathPeek { .. }
324            | Command::CopyToPreflight { .. }
325            | Command::ExecuteCopyTo { .. }
326            | Command::ExecuteSideEffectingFunc { .. }
327            | Command::RegisterFrontendPeek { .. }
328            | Command::UnregisterFrontendPeek { .. }
329            | Command::ExplainTimestamp { .. }
330            | Command::FrontendStatementLogging(..) => None,
331        }
332    }
333
334    pub fn session_mut(&mut self) -> Option<&mut Session> {
335        match self {
336            Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
337            Command::CancelRequest { .. }
338            | Command::Startup { .. }
339            | Command::AuthenticatePassword { .. }
340            | Command::AuthenticateGetSASLChallenge { .. }
341            | Command::AuthenticateVerifySASLProof { .. }
342            | Command::CatalogSnapshot { .. }
343            | Command::PrivilegedCancelRequest { .. }
344            | Command::GetWebhook { .. }
345            | Command::Terminate { .. }
346            | Command::GetSystemVars { .. }
347            | Command::SetSystemVars { .. }
348            | Command::RetireExecute { .. }
349            | Command::CheckConsistency { .. }
350            | Command::Dump { .. }
351            | Command::GetComputeInstanceClient { .. }
352            | Command::GetOracle { .. }
353            | Command::DetermineRealTimeRecentTimestamp { .. }
354            | Command::GetTransactionReadHoldsBundle { .. }
355            | Command::StoreTransactionReadHolds { .. }
356            | Command::ExecuteSlowPathPeek { .. }
357            | Command::CopyToPreflight { .. }
358            | Command::ExecuteCopyTo { .. }
359            | Command::ExecuteSideEffectingFunc { .. }
360            | Command::RegisterFrontendPeek { .. }
361            | Command::UnregisterFrontendPeek { .. }
362            | Command::ExplainTimestamp { .. }
363            | Command::FrontendStatementLogging(..) => None,
364        }
365    }
366}
367
/// A command result of type `T` bundled with a [`Session`] and an
/// OpenTelemetry context.
///
/// Used as the reply type of the `Execute` and `Commit` variants of
/// [`Command`], both of which take a `Session` by value.
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the command: the value on success, an [`AdapterError`] otherwise.
    pub result: Result<T, AdapterError>,
    /// The session travelling with the reply.
    pub session: Session,
    /// OpenTelemetry context travelling with the reply.
    pub otel_ctx: OpenTelemetryContext,
}
374
/// Newtype around an optional `bool` superuser flag; it is the type of
/// [`StartupResponse::superuser_attribute`], whose documentation describes
/// when the inner value is `None`.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
377
/// The response to [`Client::startup`](crate::Client::startup).
///
/// `Debug` is derived through `Derivative` so that `write_notify` can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// The role's superuser attribute in the Catalog.
    /// This attribute is None for Cloud. Cloud is able
    /// to derive the role's superuser status from
    /// external_metadata_rx.
    pub superuser_attribute: SuperuserAttribute,
    /// A future that completes when all necessary Builtin Table writes have completed.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to the [`Catalog`].
    pub catalog: Arc<Catalog>,
    /// Shared, thread-safe (`Send + Sync`) handle to a `StorageCollections`
    /// implementation whose timestamp type is `mz_repr::Timestamp`.
    pub storage_collections: Arc<
        dyn mz_storage_client::storage_collections::StorageCollections<
                Timestamp = mz_repr::Timestamp,
            > + Send
            + Sync,
    >,
    /// Shared [`TransientIdGen`] handle.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics handle.
    pub optimizer_metrics: OptimizerMetrics,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Frontend handle for statement logging.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
406
/// Success payload of [`Command::AuthenticateGetSASLChallenge`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge.
    /// NOTE(review): presumably the key-derivation iteration count -- confirm.
    pub iteration_count: usize,
    /// Base64-encoded salt for the SASL challenge.
    pub salt: String,
    /// Nonce for the challenge.
    /// NOTE(review): how this relates to the nonce supplied in the request is
    /// not visible in this file -- confirm.
    pub nonce: String,
}
415
/// Success payload of [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned after the proof was checked.
    /// NOTE(review): encoding and exact meaning are not visible here -- confirm.
    pub verifier: String,
}
421
422// Facile implementation for `StartupResponse`, which does not use the `allowed`
423// feature of `ClientTransmitter`.
424impl Transmittable for StartupResponse {
425    type Allowed = bool;
426    fn to_allowed(&self) -> Self::Allowed {
427        true
428    }
429}
430
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// An opaque wrapper around the raw dump text.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps the raw dump text `raw`.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and hands back the raw dump text unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
444
445impl Transmittable for CatalogDump {
446    type Allowed = bool;
447    fn to_allowed(&self) -> Self::Allowed {
448        true
449    }
450}
451
452impl Transmittable for SystemVars {
453    type Allowed = bool;
454    fn to_allowed(&self) -> Self::Allowed {
455        true
456    }
457}
458
/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
///
/// `EnumKind` additionally generates the payload-free companion enum
/// `ExecuteResponseKind`, which derives `PartialOrd`/`Ord`. Derived ordering
/// follows declaration order, so reordering variants here changes how
/// `ExecuteResponseKind` values compare.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// The default privileges were altered.
    AlteredDefaultPrivileges,
    /// The requested object was altered.
    AlteredObject(ObjectType),
    /// The role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// The requested cursor was closed.
    ClosedCursor,
    /// The provided comment was created.
    Comment,
    /// The specified number of rows were copied into the requested output.
    Copied(usize),
    /// The response for a COPY TO STDOUT query.
    CopyTo {
        /// The copy format.
        format: mz_sql::plan::CopyFormat,
        /// The nested response being wrapped.
        resp: Box<ExecuteResponse>,
    },
    /// A copy into the table identified by `target_id`.
    CopyFrom {
        /// Table we're copying into.
        target_id: CatalogItemId,
        /// Human-readable full name of the target table.
        target_name: String,
        /// Column indexes involved in the copy.
        /// NOTE(review): presumably the target columns, in input order -- confirm.
        columns: Vec<ColumnIndex>,
        /// Format parameters for the copy.
        params: CopyFormatParams<'static>,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested connection was created.
    CreatedConnection,
    /// The requested database was created.
    CreatedDatabase,
    /// The requested schema was created.
    CreatedSchema,
    /// The requested role was created.
    CreatedRole,
    /// The requested cluster was created.
    CreatedCluster,
    /// The requested cluster replica was created.
    CreatedClusterReplica,
    /// The requested index was created.
    CreatedIndex,
    /// The requested introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// The requested secret was created.
    CreatedSecret,
    /// The requested sink was created.
    CreatedSink,
    /// The requested source was created.
    CreatedSource,
    /// The requested table was created.
    CreatedTable,
    /// The requested view was created.
    CreatedView,
    /// The requested views were created.
    CreatedViews,
    /// The requested materialized view was created.
    CreatedMaterializedView,
    /// The requested continual task was created.
    CreatedContinualTask,
    /// The requested type was created.
    CreatedType,
    /// The requested network policy was created.
    CreatedNetworkPolicy,
    /// The requested prepared statement was removed.
    Deallocate { all: bool },
    /// The requested cursor was declared.
    DeclaredCursor,
    /// The specified number of rows were deleted from the requested table.
    Deleted(usize),
    /// The temporary objects associated with the session have been discarded.
    DiscardedTemp,
    /// All state associated with the session has been discarded.
    DiscardedAll,
    /// The requested object was dropped.
    DroppedObject(ObjectType),
    /// The requested objects were dropped.
    DroppedOwned,
    /// The provided query was empty.
    EmptyQuery,
    /// Fetch results from a cursor.
    Fetch {
        /// The name of the cursor from which to fetch results.
        name: String,
        /// The number of results to fetch.
        count: Option<FetchDirection>,
        /// How long to wait for results to arrive.
        timeout: ExecuteTimeout,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested privilege was granted.
    GrantedPrivilege,
    /// The requested role was granted.
    GrantedRole,
    /// The specified number of rows were inserted into the requested table.
    Inserted(usize),
    /// The specified prepared statement was created.
    Prepare,
    /// A user-requested warning was raised.
    Raised,
    /// The requested objects were reassigned.
    ReassignOwned,
    /// The requested privilege was revoked.
    RevokedPrivilege,
    /// The requested role was revoked.
    RevokedRole,
    /// Rows will be delivered via the specified stream.
    SendingRowsStreaming {
        /// Stream of [`PeekResponseUnary`] items; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// Compute instance associated with the stream.
        instance_id: ComputeInstanceId,
        /// Execution strategy recorded for the statement.
        strategy: StatementExecutionStrategy,
    },
    /// Rows are known to be available immediately, and thus the execution is
    /// considered ended in the coordinator.
    SendingRowsImmediate {
        /// Iterator over the rows; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// The specified variable was set to a new value.
    SetVariable {
        /// Name of the variable.
        name: String,
        /// Whether the operation was a `RESET` rather than a set.
        reset: bool,
    },
    /// A new transaction was started.
    StartedTransaction,
    /// Updates to the requested source or view will be streamed to the
    /// contained receiver.
    Subscribing {
        /// Stream of row batches.
        rx: RowBatchStream,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
        /// Compute instance associated with the subscription.
        instance_id: ComputeInstanceId,
    },
    /// The active transaction committed.
    TransactionCommitted {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction rolled back.
    TransactionRolledBack {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The specified number of rows were updated in the requested table.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
613
614impl TryFrom<&Statement<Raw>> for ExecuteResponse {
615    type Error = ();
616
617    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
618    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
619        let resp_kinds = Plan::generated_from(&stmt.into())
620            .iter()
621            .map(ExecuteResponse::generated_from)
622            .flatten()
623            .cloned()
624            .collect::<BTreeSet<ExecuteResponseKind>>();
625        let resps = resp_kinds
626            .iter()
627            .map(|r| (*r).try_into())
628            .collect::<Result<Vec<ExecuteResponse>, _>>();
629        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
630        if let Ok(resps) = resps {
631            if resps.len() == 1 {
632                return Ok(resps.into_element());
633            }
634        }
635        let resp = match stmt {
636            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
637                ExecuteResponse::DroppedObject((*object_type).into())
638            }
639            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
640            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
641                ExecuteResponse::AlteredObject((*object_type).into())
642            }
643            _ => return Err(()),
644        };
645        // Ensure that if the planner ever adds possible plans we complain here.
646        soft_assert_no_log!(
647            resp_kinds.len() == 1
648                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
649            "ExecuteResponses out of sync with planner"
650        );
651        Ok(resp)
652    }
653}
654
655impl TryInto<ExecuteResponse> for ExecuteResponseKind {
656    type Error = ();
657
658    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
659    /// actually executing a statement.
660    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
661        match self {
662            ExecuteResponseKind::AlteredDefaultPrivileges => {
663                Ok(ExecuteResponse::AlteredDefaultPrivileges)
664            }
665            ExecuteResponseKind::AlteredObject => Err(()),
666            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
667            ExecuteResponseKind::AlteredSystemConfiguration => {
668                Ok(ExecuteResponse::AlteredSystemConfiguration)
669            }
670            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
671            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
672            ExecuteResponseKind::Copied => Err(()),
673            ExecuteResponseKind::CopyTo => Err(()),
674            ExecuteResponseKind::CopyFrom => Err(()),
675            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
676            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
677            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
678            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
679            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
680            ExecuteResponseKind::CreatedClusterReplica => {
681                Ok(ExecuteResponse::CreatedClusterReplica)
682            }
683            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
684            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
685            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
686            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
687            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
688            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
689            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
690            ExecuteResponseKind::CreatedMaterializedView => {
691                Ok(ExecuteResponse::CreatedMaterializedView)
692            }
693            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
694            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
695            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
696            ExecuteResponseKind::Deallocate => Err(()),
697            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
698            ExecuteResponseKind::Deleted => Err(()),
699            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
700            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
701            ExecuteResponseKind::DroppedObject => Err(()),
702            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
703            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
704            ExecuteResponseKind::Fetch => Err(()),
705            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
706            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
707            ExecuteResponseKind::Inserted => Err(()),
708            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
709            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
710            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
711            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
712            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
713            ExecuteResponseKind::SetVariable => Err(()),
714            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
715            ExecuteResponseKind::Subscribing => Err(()),
716            ExecuteResponseKind::TransactionCommitted => Err(()),
717            ExecuteResponseKind::TransactionRolledBack => Err(()),
718            ExecuteResponseKind::Updated => Err(()),
719            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
720            ExecuteResponseKind::SendingRowsStreaming => Err(()),
721            ExecuteResponseKind::SendingRowsImmediate => Err(()),
722            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
723                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
724            }
725        }
726    }
727}
728
729impl ExecuteResponse {
730    pub fn tag(&self) -> Option<String> {
731        use ExecuteResponse::*;
732        match self {
733            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
734            AlteredObject(o) => Some(format!("ALTER {}", o)),
735            AlteredRole => Some("ALTER ROLE".into()),
736            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
737            ClosedCursor => Some("CLOSE CURSOR".into()),
738            Comment => Some("COMMENT".into()),
739            Copied(n) => Some(format!("COPY {}", n)),
740            CopyTo { .. } => None,
741            CopyFrom { .. } => None,
742            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
743            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
744            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
745            CreatedRole => Some("CREATE ROLE".into()),
746            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
747            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
748            CreatedIndex { .. } => Some("CREATE INDEX".into()),
749            CreatedSecret { .. } => Some("CREATE SECRET".into()),
750            CreatedSink { .. } => Some("CREATE SINK".into()),
751            CreatedSource { .. } => Some("CREATE SOURCE".into()),
752            CreatedTable { .. } => Some("CREATE TABLE".into()),
753            CreatedView { .. } => Some("CREATE VIEW".into()),
754            CreatedViews { .. } => Some("CREATE VIEWS".into()),
755            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
756            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
757            CreatedType => Some("CREATE TYPE".into()),
758            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
759            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
760            DeclaredCursor => Some("DECLARE CURSOR".into()),
761            Deleted(n) => Some(format!("DELETE {}", n)),
762            DiscardedTemp => Some("DISCARD TEMP".into()),
763            DiscardedAll => Some("DISCARD ALL".into()),
764            DroppedObject(o) => Some(format!("DROP {o}")),
765            DroppedOwned => Some("DROP OWNED".into()),
766            EmptyQuery => None,
767            Fetch { .. } => None,
768            GrantedPrivilege => Some("GRANT".into()),
769            GrantedRole => Some("GRANT ROLE".into()),
770            Inserted(n) => {
771                // "On successful completion, an INSERT command returns a
772                // command tag of the form `INSERT <oid> <count>`."
773                //     -- https://www.postgresql.org/docs/11/sql-insert.html
774                //
775                // OIDs are a PostgreSQL-specific historical quirk, but we
776                // can return a 0 OID to indicate that the table does not
777                // have OIDs.
778                Some(format!("INSERT 0 {}", n))
779            }
780            Prepare => Some("PREPARE".into()),
781            Raised => Some("RAISE".into()),
782            ReassignOwned => Some("REASSIGN OWNED".into()),
783            RevokedPrivilege => Some("REVOKE".into()),
784            RevokedRole => Some("REVOKE ROLE".into()),
785            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
786            SetVariable { reset: true, .. } => Some("RESET".into()),
787            SetVariable { reset: false, .. } => Some("SET".into()),
788            StartedTransaction { .. } => Some("BEGIN".into()),
789            Subscribing { .. } => None,
790            TransactionCommitted { .. } => Some("COMMIT".into()),
791            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
792            Updated(n) => Some(format!("UPDATE {}", n)),
793            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
794            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
795        }
796    }
797
798    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
799    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
800    /// excluded from this function.
801    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
802        use ExecuteResponseKind::*;
803        use PlanKind::*;
804
805        match plan {
806            AbortTransaction => &[TransactionRolledBack],
807            AlterClusterRename
808            | AlterClusterSwap
809            | AlterCluster
810            | AlterClusterReplicaRename
811            | AlterOwner
812            | AlterItemRename
813            | AlterRetainHistory
814            | AlterNoop
815            | AlterSchemaRename
816            | AlterSchemaSwap
817            | AlterSecret
818            | AlterConnection
819            | AlterSource
820            | AlterSink
821            | AlterTableAddColumn
822            | AlterMaterializedViewApplyReplacement
823            | AlterNetworkPolicy => &[AlteredObject],
824            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
825            AlterSetCluster => &[AlteredObject],
826            AlterRole => &[AlteredRole],
827            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
828                &[AlteredSystemConfiguration]
829            }
830            Close => &[ClosedCursor],
831            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
832            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
833            PlanKind::Comment => &[ExecuteResponseKind::Comment],
834            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
835            CreateConnection => &[CreatedConnection],
836            CreateDatabase => &[CreatedDatabase],
837            CreateSchema => &[CreatedSchema],
838            CreateRole => &[CreatedRole],
839            CreateCluster => &[CreatedCluster],
840            CreateClusterReplica => &[CreatedClusterReplica],
841            CreateSource | CreateSources => &[CreatedSource],
842            CreateSecret => &[CreatedSecret],
843            CreateSink => &[CreatedSink],
844            CreateTable => &[CreatedTable],
845            CreateView => &[CreatedView],
846            CreateMaterializedView => &[CreatedMaterializedView],
847            CreateContinualTask => &[CreatedContinualTask],
848            CreateIndex => &[CreatedIndex],
849            CreateType => &[CreatedType],
850            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
851            CreateNetworkPolicy => &[CreatedNetworkPolicy],
852            Declare => &[DeclaredCursor],
853            DiscardTemp => &[DiscardedTemp],
854            DiscardAll => &[DiscardedAll],
855            DropObjects => &[DroppedObject],
856            DropOwned => &[DroppedOwned],
857            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
858            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
859            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
860                ExecuteResponseKind::CopyTo,
861                SendingRowsStreaming,
862                SendingRowsImmediate,
863            ],
864            Execute | ReadThenWrite => &[
865                Deleted,
866                Inserted,
867                SendingRowsStreaming,
868                SendingRowsImmediate,
869                Updated,
870            ],
871            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
872            GrantPrivileges => &[GrantedPrivilege],
873            GrantRole => &[GrantedRole],
874            Insert => &[Inserted, SendingRowsImmediate],
875            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
876            PlanKind::Raise => &[ExecuteResponseKind::Raised],
877            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
878            RevokePrivileges => &[RevokedPrivilege],
879            RevokeRole => &[RevokedRole],
880            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
881                &[ExecuteResponseKind::SetVariable]
882            }
883            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
884            StartTransaction => &[StartedTransaction],
885            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
886            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
887        }
888    }
889}
890
891/// This implementation is meant to ensure that we maintain updated information
892/// about which types of `ExecuteResponse`s are permitted to be sent, which will
893/// be a function of which plan we're executing.
894impl Transmittable for ExecuteResponse {
895    type Allowed = ExecuteResponseKind;
896    fn to_allowed(&self) -> Self::Allowed {
897        ExecuteResponseKind::from(self)
898    }
899}