Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
39use mz_sql::session::user::User;
40use mz_sql::session::vars::{OwnedVarInput, SystemVars};
41use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
42use mz_storage_types::sources::Timeline;
43use mz_timestamp_oracle::TimestampOracle;
44use tokio::sync::{mpsc, oneshot};
45use uuid::Uuid;
46
47use crate::catalog::Catalog;
48use crate::coord::appends::BuiltinTableAppendNotify;
49use crate::coord::consistency::CoordinatorInconsistencies;
50use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
53use crate::error::AdapterError;
54use crate::session::{EndTransactionAction, RowBatchStream, Session};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::util::Transmittable;
61use crate::webhook::AppendWebhookResponse;
62use crate::{
63    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
64};
65
66/// A handle for pgwire to stream raw byte chunks to the coordinator's
67/// parallel background batch builder tasks during COPY FROM STDIN.
68#[derive(Debug)]
69pub struct CopyFromStdinWriter {
70    /// Channels for distributing raw byte chunks (split at row boundaries)
71    /// across parallel worker tasks. pgwire round-robins chunks across these.
72    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
73    /// Receives the final result (`Vec<ProtoBatch>` + total row count, or error)
74    /// from the collector task. pgwire uses this to commit the batches.
75    pub completion_rx: oneshot::Receiver<
76        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
77    >,
78}
79
80#[derive(Debug)]
81pub struct CatalogSnapshot {
82    pub catalog: Arc<Catalog>,
83}
84
85#[derive(Debug)]
86pub enum Command {
87    CatalogSnapshot {
88        tx: oneshot::Sender<CatalogSnapshot>,
89    },
90
91    Startup {
92        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
93        user: User,
94        conn_id: ConnectionId,
95        client_ip: Option<IpAddr>,
96        secret_key: u32,
97        uuid: Uuid,
98        application_name: String,
99        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
100    },
101
102    AuthenticatePassword {
103        tx: oneshot::Sender<Result<(), AdapterError>>,
104        role_name: String,
105        password: Option<Password>,
106    },
107
108    AuthenticateGetSASLChallenge {
109        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
110        role_name: String,
111        nonce: String,
112    },
113
114    AuthenticateVerifySASLProof {
115        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
116        role_name: String,
117        proof: String,
118        auth_message: String,
119        mock_hash: String,
120    },
121
122    Execute {
123        portal_name: String,
124        session: Session,
125        tx: oneshot::Sender<Response<ExecuteResponse>>,
126        outer_ctx_extra: Option<ExecuteContextGuard>,
127    },
128
129    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
130    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
131    /// happen, for example, if the session's role id has been dropped which will prevent
132    /// sequence_end_transaction from running.)
133    Commit {
134        action: EndTransactionAction,
135        session: Session,
136        tx: oneshot::Sender<Response<ExecuteResponse>>,
137    },
138
139    CancelRequest {
140        conn_id: ConnectionIdType,
141        secret_key: u32,
142    },
143
144    PrivilegedCancelRequest {
145        conn_id: ConnectionId,
146    },
147
148    GetWebhook {
149        database: String,
150        schema: String,
151        name: String,
152        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
153    },
154
155    GetSystemVars {
156        tx: oneshot::Sender<SystemVars>,
157    },
158
159    SetSystemVars {
160        vars: BTreeMap<String, String>,
161        conn_id: ConnectionId,
162        tx: oneshot::Sender<Result<(), AdapterError>>,
163    },
164
165    Terminate {
166        conn_id: ConnectionId,
167        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
168    },
169
170    /// Sets up a streaming COPY FROM STDIN operation. The coordinator
171    /// creates parallel background batch builder tasks and returns a
172    /// [`CopyFromStdinWriter`] that pgwire uses to stream raw byte chunks.
173    StartCopyFromStdin {
174        target_id: CatalogItemId,
175        target_name: String,
176        columns: Vec<ColumnIndex>,
177        /// The row description for the target table (used for constraint checks).
178        row_desc: mz_repr::RelationDesc,
179        /// Copy format parameters (text/csv/binary) for decoding raw bytes.
180        params: mz_pgcopy::CopyFormatParams<'static>,
181        session: Session,
182        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
183    },
184
185    /// Performs any cleanup and logging actions necessary for
186    /// finalizing a statement execution.
187    ///
188    /// Only used for cases that terminate in the protocol layer and
189    /// otherwise have no reason to hand control back to the coordinator.
190    /// In other cases, we piggy-back on another command.
191    RetireExecute {
192        data: ExecuteContextExtra,
193        reason: StatementEndedExecutionReason,
194    },
195
196    CheckConsistency {
197        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
198    },
199
200    Dump {
201        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
202    },
203
204    GetComputeInstanceClient {
205        instance_id: ComputeInstanceId,
206        tx: oneshot::Sender<
207            Result<
208                mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
209                mz_compute_client::controller::error::InstanceMissing,
210            >,
211        >,
212    },
213
214    GetOracle {
215        timeline: Timeline,
216        tx: oneshot::Sender<
217            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
218        >,
219    },
220
221    DetermineRealTimeRecentTimestamp {
222        source_ids: BTreeSet<GlobalId>,
223        real_time_recency_timeout: Duration,
224        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
225    },
226
227    GetTransactionReadHoldsBundle {
228        conn_id: ConnectionId,
229        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
230    },
231
232    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
233    StoreTransactionReadHolds {
234        conn_id: ConnectionId,
235        read_holds: ReadHolds<mz_repr::Timestamp>,
236        tx: oneshot::Sender<()>,
237    },
238
239    ExecuteSlowPathPeek {
240        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
241        determination: TimestampDetermination<mz_repr::Timestamp>,
242        finishing: RowSetFinishing,
243        compute_instance: ComputeInstanceId,
244        target_replica: Option<ReplicaId>,
245        intermediate_result_type: SqlRelationType,
246        source_ids: BTreeSet<GlobalId>,
247        conn_id: ConnectionId,
248        max_result_size: u64,
249        max_query_result_size: Option<u64>,
250        /// If statement logging is enabled, contains all info needed for installing watch sets
251        /// and logging the statement execution.
252        watch_set: Option<WatchSetCreation>,
253        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
254    },
255
256    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
257    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
258    /// in a background task to avoid blocking the coordinator.
259    CopyToPreflight {
260        /// The S3 connection info needed for preflight checks.
261        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
262        /// The sink ID for logging and S3 key management.
263        sink_id: GlobalId,
264        /// Response channel for the preflight result.
265        tx: oneshot::Sender<Result<(), AdapterError>>,
266    },
267
268    ExecuteCopyTo {
269        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
270        compute_instance: ComputeInstanceId,
271        target_replica: Option<ReplicaId>,
272        source_ids: BTreeSet<GlobalId>,
273        conn_id: ConnectionId,
274        /// If statement logging is enabled, contains all info needed for installing watch sets
275        /// and logging the statement execution.
276        watch_set: Option<WatchSetCreation>,
277        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
278    },
279
280    /// Execute a side-effecting function from the frontend peek path.
281    ExecuteSideEffectingFunc {
282        plan: SideEffectingFunc,
283        conn_id: ConnectionId,
284        /// The current role of the session, used for RBAC checks.
285        current_role: RoleId,
286        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
287    },
288
289    /// Register a pending peek initiated by frontend sequencing. This is needed for:
290    /// - statement logging
291    /// - query cancellation
292    RegisterFrontendPeek {
293        uuid: Uuid,
294        conn_id: ConnectionId,
295        cluster_id: mz_controller_types::ClusterId,
296        depends_on: BTreeSet<GlobalId>,
297        is_fast_path: bool,
298        /// If statement logging is enabled, contains all info needed for installing watch sets
299        /// and logging the statement execution.
300        watch_set: Option<WatchSetCreation>,
301        tx: oneshot::Sender<Result<(), AdapterError>>,
302    },
303
304    /// Unregister a pending peek that was registered but failed to issue.
305    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
306    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
307    /// frontend will log the error.
308    UnregisterFrontendPeek {
309        uuid: Uuid,
310        tx: oneshot::Sender<()>,
311    },
312
313    /// Generate a timestamp explanation.
314    /// This is used when `emit_timestamp_notice` is enabled.
315    ExplainTimestamp {
316        conn_id: ConnectionId,
317        session_wall_time: DateTime<Utc>,
318        cluster_id: ClusterId,
319        id_bundle: CollectionIdBundle,
320        determination: TimestampDetermination<mz_repr::Timestamp>,
321        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
322    },
323
324    /// Statement logging event from frontend peek sequencing.
325    /// No response channel needed - this is fire-and-forget.
326    FrontendStatementLogging(FrontendStatementLoggingEvent),
327}
328
329impl Command {
330    pub fn session(&self) -> Option<&Session> {
331        match self {
332            Command::Execute { session, .. }
333            | Command::Commit { session, .. }
334            | Command::StartCopyFromStdin { session, .. } => Some(session),
335            Command::CancelRequest { .. }
336            | Command::Startup { .. }
337            | Command::AuthenticatePassword { .. }
338            | Command::AuthenticateGetSASLChallenge { .. }
339            | Command::AuthenticateVerifySASLProof { .. }
340            | Command::CatalogSnapshot { .. }
341            | Command::PrivilegedCancelRequest { .. }
342            | Command::GetWebhook { .. }
343            | Command::Terminate { .. }
344            | Command::GetSystemVars { .. }
345            | Command::SetSystemVars { .. }
346            | Command::RetireExecute { .. }
347            | Command::CheckConsistency { .. }
348            | Command::Dump { .. }
349            | Command::GetComputeInstanceClient { .. }
350            | Command::GetOracle { .. }
351            | Command::DetermineRealTimeRecentTimestamp { .. }
352            | Command::GetTransactionReadHoldsBundle { .. }
353            | Command::StoreTransactionReadHolds { .. }
354            | Command::ExecuteSlowPathPeek { .. }
355            | Command::CopyToPreflight { .. }
356            | Command::ExecuteCopyTo { .. }
357            | Command::ExecuteSideEffectingFunc { .. }
358            | Command::RegisterFrontendPeek { .. }
359            | Command::UnregisterFrontendPeek { .. }
360            | Command::ExplainTimestamp { .. }
361            | Command::FrontendStatementLogging(..) => None,
362        }
363    }
364
365    pub fn session_mut(&mut self) -> Option<&mut Session> {
366        match self {
367            Command::Execute { session, .. }
368            | Command::Commit { session, .. }
369            | Command::StartCopyFromStdin { session, .. } => Some(session),
370            Command::CancelRequest { .. }
371            | Command::Startup { .. }
372            | Command::AuthenticatePassword { .. }
373            | Command::AuthenticateGetSASLChallenge { .. }
374            | Command::AuthenticateVerifySASLProof { .. }
375            | Command::CatalogSnapshot { .. }
376            | Command::PrivilegedCancelRequest { .. }
377            | Command::GetWebhook { .. }
378            | Command::Terminate { .. }
379            | Command::GetSystemVars { .. }
380            | Command::SetSystemVars { .. }
381            | Command::RetireExecute { .. }
382            | Command::CheckConsistency { .. }
383            | Command::Dump { .. }
384            | Command::GetComputeInstanceClient { .. }
385            | Command::GetOracle { .. }
386            | Command::DetermineRealTimeRecentTimestamp { .. }
387            | Command::GetTransactionReadHoldsBundle { .. }
388            | Command::StoreTransactionReadHolds { .. }
389            | Command::ExecuteSlowPathPeek { .. }
390            | Command::CopyToPreflight { .. }
391            | Command::ExecuteCopyTo { .. }
392            | Command::ExecuteSideEffectingFunc { .. }
393            | Command::RegisterFrontendPeek { .. }
394            | Command::UnregisterFrontendPeek { .. }
395            | Command::ExplainTimestamp { .. }
396            | Command::FrontendStatementLogging(..) => None,
397        }
398    }
399}
400
401#[derive(Debug)]
402pub struct Response<T> {
403    pub result: Result<T, AdapterError>,
404    pub session: Session,
405    pub otel_ctx: OpenTelemetryContext,
406}
407
408#[derive(Debug, Clone, Copy)]
409pub struct SuperuserAttribute(pub Option<bool>);
410
411/// The response to [`Client::startup`](crate::Client::startup).
412#[derive(Derivative)]
413#[derivative(Debug)]
414pub struct StartupResponse {
415    /// RoleId for the user.
416    pub role_id: RoleId,
417    /// The role's superuser attribute in the Catalog.
418    /// This attribute is None for Cloud. Cloud is able
419    /// to derive the role's superuser status from
420    /// external_metadata_rx.
421    pub superuser_attribute: SuperuserAttribute,
422    /// A future that completes when all necessary Builtin Table writes have completed.
423    #[derivative(Debug = "ignore")]
424    pub write_notify: BuiltinTableAppendNotify,
425    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
426    pub session_defaults: BTreeMap<String, OwnedVarInput>,
427    pub catalog: Arc<Catalog>,
428    pub storage_collections: Arc<
429        dyn mz_storage_client::storage_collections::StorageCollections<
430                Timestamp = mz_repr::Timestamp,
431            > + Send
432            + Sync,
433    >,
434    pub transient_id_gen: Arc<TransientIdGen>,
435    pub optimizer_metrics: OptimizerMetrics,
436    pub persist_client: PersistClient,
437    pub statement_logging_frontend: StatementLoggingFrontend,
438}
439
440#[derive(Derivative)]
441#[derivative(Debug)]
442pub struct SASLChallengeResponse {
443    pub iteration_count: usize,
444    /// Base64-encoded salt for the SASL challenge.
445    pub salt: String,
446    pub nonce: String,
447}
448
449#[derive(Derivative)]
450#[derivative(Debug)]
451pub struct SASLVerifyProofResponse {
452    pub verifier: String,
453}
454
455// Facile implementation for `StartupResponse`, which does not use the `allowed`
456// feature of `ClientTransmitter`.
457impl Transmittable for StartupResponse {
458    type Allowed = bool;
459    fn to_allowed(&self) -> Self::Allowed {
460        true
461    }
462}
463
464/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
465#[derive(Debug, Clone)]
466pub struct CatalogDump(String);
467
468impl CatalogDump {
469    pub fn new(raw: String) -> Self {
470        CatalogDump(raw)
471    }
472
473    pub fn into_string(self) -> String {
474        self.0
475    }
476}
477
478impl Transmittable for CatalogDump {
479    type Allowed = bool;
480    fn to_allowed(&self) -> Self::Allowed {
481        true
482    }
483}
484
485impl Transmittable for SystemVars {
486    type Allowed = bool;
487    fn to_allowed(&self) -> Self::Allowed {
488        true
489    }
490}
491
492/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
493#[derive(EnumKind, Derivative)]
494#[derivative(Debug)]
495#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
496pub enum ExecuteResponse {
497    /// The default privileges were altered.
498    AlteredDefaultPrivileges,
499    /// The requested object was altered.
500    AlteredObject(ObjectType),
501    /// The role was altered.
502    AlteredRole,
503    /// The system configuration was altered.
504    AlteredSystemConfiguration,
505    /// The requested cursor was closed.
506    ClosedCursor,
507    /// The provided comment was created.
508    Comment,
509    /// The specified number of rows were copied into the requested output.
510    Copied(usize),
511    /// The response for a COPY TO STDOUT query.
512    CopyTo {
513        format: mz_sql::plan::CopyFormat,
514        resp: Box<ExecuteResponse>,
515    },
516    CopyFrom {
517        /// Table we're copying into.
518        target_id: CatalogItemId,
519        /// Human-readable full name of the target table.
520        target_name: String,
521        columns: Vec<ColumnIndex>,
522        params: CopyFormatParams<'static>,
523        ctx_extra: ExecuteContextGuard,
524    },
525    /// The requested connection was created.
526    CreatedConnection,
527    /// The requested database was created.
528    CreatedDatabase,
529    /// The requested schema was created.
530    CreatedSchema,
531    /// The requested role was created.
532    CreatedRole,
533    /// The requested cluster was created.
534    CreatedCluster,
535    /// The requested cluster replica was created.
536    CreatedClusterReplica,
537    /// The requested index was created.
538    CreatedIndex,
539    /// The requested introspection subscribe was created.
540    CreatedIntrospectionSubscribe,
541    /// The requested secret was created.
542    CreatedSecret,
543    /// The requested sink was created.
544    CreatedSink,
545    /// The requested source was created.
546    CreatedSource,
547    /// The requested table was created.
548    CreatedTable,
549    /// The requested view was created.
550    CreatedView,
551    /// The requested views were created.
552    CreatedViews,
553    /// The requested materialized view was created.
554    CreatedMaterializedView,
555    /// The requested continual task was created.
556    CreatedContinualTask,
557    /// The requested type was created.
558    CreatedType,
559    /// The requested network policy was created.
560    CreatedNetworkPolicy,
561    /// The requested prepared statement was removed.
562    Deallocate { all: bool },
563    /// The requested cursor was declared.
564    DeclaredCursor,
565    /// The specified number of rows were deleted from the requested table.
566    Deleted(usize),
567    /// The temporary objects associated with the session have been discarded.
568    DiscardedTemp,
569    /// All state associated with the session has been discarded.
570    DiscardedAll,
571    /// The requested object was dropped.
572    DroppedObject(ObjectType),
573    /// The requested objects were dropped.
574    DroppedOwned,
575    /// The provided query was empty.
576    EmptyQuery,
577    /// Fetch results from a cursor.
578    Fetch {
579        /// The name of the cursor from which to fetch results.
580        name: String,
581        /// The number of results to fetch.
582        count: Option<FetchDirection>,
583        /// How long to wait for results to arrive.
584        timeout: ExecuteTimeout,
585        ctx_extra: ExecuteContextGuard,
586    },
587    /// The requested privilege was granted.
588    GrantedPrivilege,
589    /// The requested role was granted.
590    GrantedRole,
591    /// The specified number of rows were inserted into the requested table.
592    Inserted(usize),
593    /// The specified prepared statement was created.
594    Prepare,
595    /// A user-requested warning was raised.
596    Raised,
597    /// The requested objects were reassigned.
598    ReassignOwned,
599    /// The requested privilege was revoked.
600    RevokedPrivilege,
601    /// The requested role was revoked.
602    RevokedRole,
603    /// Rows will be delivered via the specified stream.
604    SendingRowsStreaming {
605        #[derivative(Debug = "ignore")]
606        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
607        instance_id: ComputeInstanceId,
608        strategy: StatementExecutionStrategy,
609    },
610    /// Rows are known to be available immediately, and thus the execution is
611    /// considered ended in the coordinator.
612    SendingRowsImmediate {
613        #[derivative(Debug = "ignore")]
614        rows: Box<dyn RowIterator + Send + Sync>,
615    },
616    /// The specified variable was set to a new value.
617    SetVariable {
618        name: String,
619        /// Whether the operation was a `RESET` rather than a set.
620        reset: bool,
621    },
622    /// A new transaction was started.
623    StartedTransaction,
624    /// Updates to the requested source or view will be streamed to the
625    /// contained receiver.
626    Subscribing {
627        rx: RowBatchStream,
628        ctx_extra: ExecuteContextGuard,
629        instance_id: ComputeInstanceId,
630    },
631    /// The active transaction committed.
632    TransactionCommitted {
633        /// Session parameters that changed because the transaction ended.
634        params: BTreeMap<&'static str, String>,
635    },
636    /// The active transaction rolled back.
637    TransactionRolledBack {
638        /// Session parameters that changed because the transaction ended.
639        params: BTreeMap<&'static str, String>,
640    },
641    /// The specified number of rows were updated in the requested table.
642    Updated(usize),
643    /// A connection was validated.
644    ValidatedConnection,
645}
646
647impl TryFrom<&Statement<Raw>> for ExecuteResponse {
648    type Error = ();
649
650    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
651    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
652        let resp_kinds = Plan::generated_from(&stmt.into())
653            .iter()
654            .map(ExecuteResponse::generated_from)
655            .flatten()
656            .cloned()
657            .collect::<BTreeSet<ExecuteResponseKind>>();
658        let resps = resp_kinds
659            .iter()
660            .map(|r| (*r).try_into())
661            .collect::<Result<Vec<ExecuteResponse>, _>>();
662        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
663        if let Ok(resps) = resps {
664            if resps.len() == 1 {
665                return Ok(resps.into_element());
666            }
667        }
668        let resp = match stmt {
669            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
670                ExecuteResponse::DroppedObject((*object_type).into())
671            }
672            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
673            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
674                ExecuteResponse::AlteredObject((*object_type).into())
675            }
676            _ => return Err(()),
677        };
678        // Ensure that if the planner ever adds possible plans we complain here.
679        soft_assert_no_log!(
680            resp_kinds.len() == 1
681                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
682            "ExecuteResponses out of sync with planner"
683        );
684        Ok(resp)
685    }
686}
687
688impl TryInto<ExecuteResponse> for ExecuteResponseKind {
689    type Error = ();
690
691    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
692    /// actually executing a statement.
693    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
694        match self {
695            ExecuteResponseKind::AlteredDefaultPrivileges => {
696                Ok(ExecuteResponse::AlteredDefaultPrivileges)
697            }
698            ExecuteResponseKind::AlteredObject => Err(()),
699            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
700            ExecuteResponseKind::AlteredSystemConfiguration => {
701                Ok(ExecuteResponse::AlteredSystemConfiguration)
702            }
703            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
704            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
705            ExecuteResponseKind::Copied => Err(()),
706            ExecuteResponseKind::CopyTo => Err(()),
707            ExecuteResponseKind::CopyFrom => Err(()),
708            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
709            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
710            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
711            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
712            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
713            ExecuteResponseKind::CreatedClusterReplica => {
714                Ok(ExecuteResponse::CreatedClusterReplica)
715            }
716            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
717            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
718            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
719            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
720            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
721            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
722            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
723            ExecuteResponseKind::CreatedMaterializedView => {
724                Ok(ExecuteResponse::CreatedMaterializedView)
725            }
726            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
727            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
728            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
729            ExecuteResponseKind::Deallocate => Err(()),
730            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
731            ExecuteResponseKind::Deleted => Err(()),
732            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
733            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
734            ExecuteResponseKind::DroppedObject => Err(()),
735            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
736            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
737            ExecuteResponseKind::Fetch => Err(()),
738            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
739            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
740            ExecuteResponseKind::Inserted => Err(()),
741            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
742            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
743            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
744            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
745            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
746            ExecuteResponseKind::SetVariable => Err(()),
747            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
748            ExecuteResponseKind::Subscribing => Err(()),
749            ExecuteResponseKind::TransactionCommitted => Err(()),
750            ExecuteResponseKind::TransactionRolledBack => Err(()),
751            ExecuteResponseKind::Updated => Err(()),
752            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
753            ExecuteResponseKind::SendingRowsStreaming => Err(()),
754            ExecuteResponseKind::SendingRowsImmediate => Err(()),
755            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
756                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
757            }
758        }
759    }
760}
761
762impl ExecuteResponse {
763    pub fn tag(&self) -> Option<String> {
764        use ExecuteResponse::*;
765        match self {
766            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
767            AlteredObject(o) => Some(format!("ALTER {}", o)),
768            AlteredRole => Some("ALTER ROLE".into()),
769            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
770            ClosedCursor => Some("CLOSE CURSOR".into()),
771            Comment => Some("COMMENT".into()),
772            Copied(n) => Some(format!("COPY {}", n)),
773            CopyTo { .. } => None,
774            CopyFrom { .. } => None,
775            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
776            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
777            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
778            CreatedRole => Some("CREATE ROLE".into()),
779            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
780            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
781            CreatedIndex { .. } => Some("CREATE INDEX".into()),
782            CreatedSecret { .. } => Some("CREATE SECRET".into()),
783            CreatedSink { .. } => Some("CREATE SINK".into()),
784            CreatedSource { .. } => Some("CREATE SOURCE".into()),
785            CreatedTable { .. } => Some("CREATE TABLE".into()),
786            CreatedView { .. } => Some("CREATE VIEW".into()),
787            CreatedViews { .. } => Some("CREATE VIEWS".into()),
788            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
789            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
790            CreatedType => Some("CREATE TYPE".into()),
791            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
792            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
793            DeclaredCursor => Some("DECLARE CURSOR".into()),
794            Deleted(n) => Some(format!("DELETE {}", n)),
795            DiscardedTemp => Some("DISCARD TEMP".into()),
796            DiscardedAll => Some("DISCARD ALL".into()),
797            DroppedObject(o) => Some(format!("DROP {o}")),
798            DroppedOwned => Some("DROP OWNED".into()),
799            EmptyQuery => None,
800            Fetch { .. } => None,
801            GrantedPrivilege => Some("GRANT".into()),
802            GrantedRole => Some("GRANT ROLE".into()),
803            Inserted(n) => {
804                // "On successful completion, an INSERT command returns a
805                // command tag of the form `INSERT <oid> <count>`."
806                //     -- https://www.postgresql.org/docs/11/sql-insert.html
807                //
808                // OIDs are a PostgreSQL-specific historical quirk, but we
809                // can return a 0 OID to indicate that the table does not
810                // have OIDs.
811                Some(format!("INSERT 0 {}", n))
812            }
813            Prepare => Some("PREPARE".into()),
814            Raised => Some("RAISE".into()),
815            ReassignOwned => Some("REASSIGN OWNED".into()),
816            RevokedPrivilege => Some("REVOKE".into()),
817            RevokedRole => Some("REVOKE ROLE".into()),
818            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
819            SetVariable { reset: true, .. } => Some("RESET".into()),
820            SetVariable { reset: false, .. } => Some("SET".into()),
821            StartedTransaction { .. } => Some("BEGIN".into()),
822            Subscribing { .. } => None,
823            TransactionCommitted { .. } => Some("COMMIT".into()),
824            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
825            Updated(n) => Some(format!("UPDATE {}", n)),
826            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
827            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
828        }
829    }
830
831    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
832    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
833    /// excluded from this function.
834    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
835        use ExecuteResponseKind::*;
836        use PlanKind::*;
837
838        match plan {
839            AbortTransaction => &[TransactionRolledBack],
840            AlterClusterRename
841            | AlterClusterSwap
842            | AlterCluster
843            | AlterClusterReplicaRename
844            | AlterOwner
845            | AlterItemRename
846            | AlterRetainHistory
847            | AlterSourceTimestampInterval
848            | AlterNoop
849            | AlterSchemaRename
850            | AlterSchemaSwap
851            | AlterSecret
852            | AlterConnection
853            | AlterSource
854            | AlterSink
855            | AlterTableAddColumn
856            | AlterMaterializedViewApplyReplacement
857            | AlterNetworkPolicy => &[AlteredObject],
858            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
859            AlterSetCluster => &[AlteredObject],
860            AlterRole => &[AlteredRole],
861            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
862                &[AlteredSystemConfiguration]
863            }
864            Close => &[ClosedCursor],
865            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
866            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
867            PlanKind::Comment => &[ExecuteResponseKind::Comment],
868            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
869            CreateConnection => &[CreatedConnection],
870            CreateDatabase => &[CreatedDatabase],
871            CreateSchema => &[CreatedSchema],
872            CreateRole => &[CreatedRole],
873            CreateCluster => &[CreatedCluster],
874            CreateClusterReplica => &[CreatedClusterReplica],
875            CreateSource | CreateSources => &[CreatedSource],
876            CreateSecret => &[CreatedSecret],
877            CreateSink => &[CreatedSink],
878            CreateTable => &[CreatedTable],
879            CreateView => &[CreatedView],
880            CreateMaterializedView => &[CreatedMaterializedView],
881            CreateContinualTask => &[CreatedContinualTask],
882            CreateIndex => &[CreatedIndex],
883            CreateType => &[CreatedType],
884            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
885            CreateNetworkPolicy => &[CreatedNetworkPolicy],
886            Declare => &[DeclaredCursor],
887            DiscardTemp => &[DiscardedTemp],
888            DiscardAll => &[DiscardedAll],
889            DropObjects => &[DroppedObject],
890            DropOwned => &[DroppedOwned],
891            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
892            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
893            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
894                ExecuteResponseKind::CopyTo,
895                SendingRowsStreaming,
896                SendingRowsImmediate,
897            ],
898            Execute | ReadThenWrite => &[
899                Deleted,
900                Inserted,
901                SendingRowsStreaming,
902                SendingRowsImmediate,
903                Updated,
904            ],
905            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
906            GrantPrivileges => &[GrantedPrivilege],
907            GrantRole => &[GrantedRole],
908            Insert => &[Inserted, SendingRowsImmediate],
909            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
910            PlanKind::Raise => &[ExecuteResponseKind::Raised],
911            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
912            RevokePrivileges => &[RevokedPrivilege],
913            RevokeRole => &[RevokedRole],
914            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
915                &[ExecuteResponseKind::SetVariable]
916            }
917            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
918            StartTransaction => &[StartedTransaction],
919            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
920            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
921        }
922    }
923}
924
925/// This implementation is meant to ensure that we maintain updated information
926/// about which types of `ExecuteResponse`s are permitted to be sent, which will
927/// be a function of which plan we're executing.
928impl Transmittable for ExecuteResponse {
929    type Allowed = ExecuteResponseKind;
930    fn to_allowed(&self) -> Self::Allowed {
931        ExecuteResponseKind::from(self)
932    }
933}