Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
/// A handle for pgwire to stream raw byte chunks to the coordinator's
/// parallel background batch builder tasks during COPY FROM STDIN.
///
/// This is the payload of the reply to [`Command::StartCopyFromStdin`].
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Channels for distributing raw byte chunks (split at row boundaries)
    /// across parallel worker tasks. pgwire round-robins chunks across these.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// Receives the final result (`Vec<ProtoBatch>` + total row count, or error)
    /// from the collector task. pgwire uses this to commit the batches.
    ///
    /// Being a oneshot receiver, it yields at most one value.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
80
/// The payload delivered in reply to [`Command::CatalogSnapshot`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// Shared, reference-counted handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
}
85
/// A request sent to the coordinator.
///
/// Variants that expect an answer carry a `tx` sender on which the reply is
/// delivered; variants without one have no way to reply. A few variants move a
/// [`Session`] into the command; [`Command::session`] and
/// [`Command::session_mut`] give access to it.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`], delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Starts up the connection identified by `conn_id` for `user`; replies with
    /// a [`StartupResponse`] or an error.
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        /// Unbounded channel for [`AdapterNotice`]s (presumably notices destined
        /// for this connection; confirm against the coordinator).
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for `role_name`; `tx` receives `Ok(())`
    /// or an [`AdapterError`].
    // NOTE(review): how `password: None` is treated is not visible in this file.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` and `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Requests verification of a SASL `proof` for `role_name`; replies with a
    /// [`SASLVerifyProofResponse`] or an error.
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    },

    /// Judging by the name, asks whether `role_name` is allowed to log in;
    /// `tx` receives `Ok(())` or an [`AdapterError`].
    CheckRoleCanLogin {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    },

    /// Executes the portal named `portal_name`. The session is moved into the
    /// command; the [`Response`] sent on `tx` contains a [`Session`] again.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
    /// happen, for example, if the session's role id has been dropped which will prevent
    /// sequence_end_transaction from running.)
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identified by a raw connection id plus `secret_key`.
    /// Carries no reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request for `conn_id` that carries no `secret_key`.
    /// Carries no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Looks up the webhook named by `database`, `schema` and `name`; replies
    /// with an [`AppendWebhookResponse`] or an [`AppendWebhookError`].
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests a [`SystemVars`] value, delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Sets system variables from `vars` (presumably variable name to new
    /// value; confirm) on behalf of `conn_id`.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Injects the given audit `events` on behalf of `conn_id`.
    InjectAuditEvents {
        events: Vec<crate::catalog::InjectedAuditEvent>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminates the connection `conn_id`. The reply channel is optional.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Sets up a streaming COPY FROM STDIN operation. The coordinator
    /// creates parallel background batch builder tasks and returns a
    /// [`CopyFromStdinWriter`] that pgwire uses to stream raw byte chunks.
    StartCopyFromStdin {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        /// The row description for the target table (used for constraint checks).
        row_desc: mz_repr::RelationDesc,
        /// Copy format parameters (text/csv/binary) for decoding raw bytes.
        params: mz_pgcopy::CopyFormatParams<'static>,
        session: Session,
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    ///
    /// Only used for cases that terminate in the protocol layer and
    /// otherwise have no reason to hand control back to the coordinator.
    /// In other cases, we piggy-back on another command.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Requests a consistency check; problems are reported as
    /// [`CoordinatorInconsistencies`].
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump as a JSON value.
    // NOTE(review): what exactly is dumped is not visible in this file.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests an `InstanceClient` for the compute instance `instance_id`;
    /// the error case is `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests the [`TimestampOracle`] for `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recent timestamp for `source_ids`. A successful
    /// reply may still be `None`.
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        /// Presumably the upper bound on how long the determination may take;
        /// confirm against the coordinator.
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the transaction read holds stored for `conn_id`; the reply is
    /// `None` when there are none to return.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds<mz_repr::Timestamp>>>,
    },

    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        tx: oneshot::Sender<()>,
    },

    /// Executes a slow-path peek described by `dataflow_plan`.
    ExecuteSlowPathPeek {
        dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Executes a subscribe described by `plan` and the dataflow `df_desc`.
    ExecuteSubscribe {
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        plan: plan::SubscribePlan,
        statement_logging_id: Option<StatementLoggingId>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
    /// in a background task to avoid blocking the coordinator.
    CopyToPreflight {
        /// The S3 connection info needed for preflight checks.
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        /// The sink ID for logging and S3 key management.
        sink_id: GlobalId,
        /// Response channel for the preflight result.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Executes a COPY TO described by the dataflow `df_desc`.
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Execute a side-effecting function from the frontend peek path.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        /// The current role of the session, used for RBAC checks.
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Register a pending peek initiated by frontend sequencing. This is needed for:
    /// - statement logging
    /// - query cancellation
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregister a pending peek that was registered but failed to issue.
    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
    /// frontend will log the error.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Generate a timestamp explanation.
    /// This is used when `emit_timestamp_notice` is enabled.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
    },

    /// Statement logging event from frontend peek sequencing.
    /// No response channel needed - this is fire-and-forget.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
353
354impl Command {
355    pub fn session(&self) -> Option<&Session> {
356        match self {
357            Command::Execute { session, .. }
358            | Command::Commit { session, .. }
359            | Command::StartCopyFromStdin { session, .. } => Some(session),
360            Command::CancelRequest { .. }
361            | Command::Startup { .. }
362            | Command::AuthenticatePassword { .. }
363            | Command::AuthenticateGetSASLChallenge { .. }
364            | Command::AuthenticateVerifySASLProof { .. }
365            | Command::CheckRoleCanLogin { .. }
366            | Command::CatalogSnapshot { .. }
367            | Command::PrivilegedCancelRequest { .. }
368            | Command::GetWebhook { .. }
369            | Command::Terminate { .. }
370            | Command::GetSystemVars { .. }
371            | Command::SetSystemVars { .. }
372            | Command::RetireExecute { .. }
373            | Command::CheckConsistency { .. }
374            | Command::Dump { .. }
375            | Command::GetComputeInstanceClient { .. }
376            | Command::GetOracle { .. }
377            | Command::DetermineRealTimeRecentTimestamp { .. }
378            | Command::GetTransactionReadHoldsBundle { .. }
379            | Command::StoreTransactionReadHolds { .. }
380            | Command::ExecuteSlowPathPeek { .. }
381            | Command::ExecuteSubscribe { .. }
382            | Command::CopyToPreflight { .. }
383            | Command::ExecuteCopyTo { .. }
384            | Command::ExecuteSideEffectingFunc { .. }
385            | Command::RegisterFrontendPeek { .. }
386            | Command::UnregisterFrontendPeek { .. }
387            | Command::ExplainTimestamp { .. }
388            | Command::FrontendStatementLogging(..)
389            | Command::InjectAuditEvents { .. } => None,
390        }
391    }
392
393    pub fn session_mut(&mut self) -> Option<&mut Session> {
394        match self {
395            Command::Execute { session, .. }
396            | Command::Commit { session, .. }
397            | Command::StartCopyFromStdin { session, .. } => Some(session),
398            Command::CancelRequest { .. }
399            | Command::Startup { .. }
400            | Command::AuthenticatePassword { .. }
401            | Command::AuthenticateGetSASLChallenge { .. }
402            | Command::AuthenticateVerifySASLProof { .. }
403            | Command::CheckRoleCanLogin { .. }
404            | Command::CatalogSnapshot { .. }
405            | Command::PrivilegedCancelRequest { .. }
406            | Command::GetWebhook { .. }
407            | Command::Terminate { .. }
408            | Command::GetSystemVars { .. }
409            | Command::SetSystemVars { .. }
410            | Command::RetireExecute { .. }
411            | Command::CheckConsistency { .. }
412            | Command::Dump { .. }
413            | Command::GetComputeInstanceClient { .. }
414            | Command::GetOracle { .. }
415            | Command::DetermineRealTimeRecentTimestamp { .. }
416            | Command::GetTransactionReadHoldsBundle { .. }
417            | Command::StoreTransactionReadHolds { .. }
418            | Command::ExecuteSlowPathPeek { .. }
419            | Command::ExecuteSubscribe { .. }
420            | Command::CopyToPreflight { .. }
421            | Command::ExecuteCopyTo { .. }
422            | Command::ExecuteSideEffectingFunc { .. }
423            | Command::RegisterFrontendPeek { .. }
424            | Command::UnregisterFrontendPeek { .. }
425            | Command::ExplainTimestamp { .. }
426            | Command::FrontendStatementLogging(..)
427            | Command::InjectAuditEvents { .. } => None,
428        }
429    }
430}
431
/// The reply envelope for the session-carrying commands
/// ([`Command::Execute`], [`Command::Commit`], [`Command::StartCopyFromStdin`]).
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the command: the payload on success, an [`AdapterError`] otherwise.
    pub result: Result<T, AdapterError>,
    /// A session travelling with the reply (presumably the one that was moved
    /// into the command; confirm against the coordinator).
    pub session: Session,
    /// OpenTelemetry context accompanying the reply.
    pub otel_ctx: OpenTelemetryContext,
}
438
/// Newtype around an optional superuser flag; used as
/// [`StartupResponse::superuser_attribute`], whose docs describe when the
/// inner value is `None`.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
441
/// The response to [`Client::startup`](crate::Client::startup).
///
/// `Debug` is derived through `derivative` so that `write_notify` can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// The role's superuser attribute in the Catalog.
    /// This attribute is None for Cloud. Cloud is able
    /// to derive the role's superuser status from
    /// external_metadata_rx.
    pub superuser_attribute: SuperuserAttribute,
    /// A future that completes when all necessary Builtin Table writes have completed.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
    /// Shared handle to a `StorageCollections` implementation over
    /// `mz_repr::Timestamp`.
    pub storage_collections: Arc<
        dyn mz_storage_client::storage_collections::StorageCollections<
                Timestamp = mz_repr::Timestamp,
            > + Send
            + Sync,
    >,
    /// Shared generator of transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics handle.
    pub optimizer_metrics: OptimizerMetrics,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Statement logging handle for the frontend (presumably used by frontend
    /// peek sequencing; confirm against its users).
    pub statement_logging_frontend: StatementLoggingFrontend,
}
470
/// The payload delivered in reply to [`Command::AuthenticateGetSASLChallenge`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count for the challenge (presumably the key-derivation
    /// iteration count; confirm against the authentication code).
    pub iteration_count: usize,
    /// Base64-encoded salt for the SASL challenge.
    pub salt: String,
    /// Nonce for the challenge.
    // NOTE(review): whether this echoes or extends the request's `nonce` is not
    // visible in this file.
    pub nonce: String,
}
479
/// The payload delivered in reply to [`Command::AuthenticateVerifySASLProof`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string produced by the proof check.
    // NOTE(review): encoding and exact meaning are not visible in this file.
    pub verifier: String,
}
485
486// Facile implementation for `StartupResponse`, which does not use the `allowed`
487// feature of `ClientTransmitter`.
488impl Transmittable for StartupResponse {
489    type Allowed = bool;
490    fn to_allowed(&self) -> Self::Allowed {
491        true
492    }
493}
494
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// A thin wrapper around the raw dump text.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and hands back the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
508
509impl Transmittable for CatalogDump {
510    type Allowed = bool;
511    fn to_allowed(&self) -> Self::Allowed {
512        true
513    }
514}
515
516impl Transmittable for SystemVars {
517    type Allowed = bool;
518    fn to_allowed(&self) -> Self::Allowed {
519        true
520    }
521}
522
/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
///
/// The `EnumKind` derive also generates `ExecuteResponseKind`, a companion enum
/// that additionally derives `PartialOrd` and `Ord` (see the `enum_kind`
/// attribute below).
// NOTE(review): derived ordering on an enum follows variant declaration order, so
// reordering variants here presumably changes the ordering of
// `ExecuteResponseKind` as well; confirm before moving variants around.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// The default privileges were altered.
    AlteredDefaultPrivileges,
    /// The requested object was altered.
    AlteredObject(ObjectType),
    /// The role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// The requested cursor was closed.
    ClosedCursor,
    /// The provided comment was created.
    Comment,
    /// The specified number of rows were copied into the requested output.
    Copied(usize),
    /// The response for a COPY TO STDOUT query.
    CopyTo {
        /// The copy format to use.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped inner response.
        resp: Box<ExecuteResponse>,
    },
    /// A copy into a table; the fields describe its target and input format.
    CopyFrom {
        /// Table we're copying into.
        target_id: CatalogItemId,
        /// Human-readable full name of the target table.
        target_name: String,
        /// Column indexes for the copy (presumably the target columns being
        /// written; confirm against the COPY FROM implementation).
        columns: Vec<ColumnIndex>,
        /// Copy format parameters.
        params: CopyFormatParams<'static>,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested connection was created.
    CreatedConnection,
    /// The requested database was created.
    CreatedDatabase,
    /// The requested schema was created.
    CreatedSchema,
    /// The requested role was created.
    CreatedRole,
    /// The requested cluster was created.
    CreatedCluster,
    /// The requested cluster replica was created.
    CreatedClusterReplica,
    /// The requested index was created.
    CreatedIndex,
    /// The requested introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// The requested secret was created.
    CreatedSecret,
    /// The requested sink was created.
    CreatedSink,
    /// The requested source was created.
    CreatedSource,
    /// The requested table was created.
    CreatedTable,
    /// The requested view was created.
    CreatedView,
    /// The requested views were created.
    CreatedViews,
    /// The requested materialized view was created.
    CreatedMaterializedView,
    /// The requested continual task was created.
    CreatedContinualTask,
    /// The requested type was created.
    CreatedType,
    /// The requested network policy was created.
    CreatedNetworkPolicy,
    /// The requested prepared statement was removed.
    // NOTE(review): `all` presumably distinguishes `DEALLOCATE ALL` from
    // deallocating a single statement; confirm.
    Deallocate { all: bool },
    /// The requested cursor was declared.
    DeclaredCursor,
    /// The specified number of rows were deleted from the requested table.
    Deleted(usize),
    /// The temporary objects associated with the session have been discarded.
    DiscardedTemp,
    /// All state associated with the session has been discarded.
    DiscardedAll,
    /// The requested object was dropped.
    DroppedObject(ObjectType),
    /// The requested objects were dropped.
    DroppedOwned,
    /// The provided query was empty.
    EmptyQuery,
    /// Fetch results from a cursor.
    Fetch {
        /// The name of the cursor from which to fetch results.
        name: String,
        /// The number of results to fetch.
        count: Option<FetchDirection>,
        /// How long to wait for results to arrive.
        timeout: ExecuteTimeout,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested privilege was granted.
    GrantedPrivilege,
    /// The requested role was granted.
    GrantedRole,
    /// The specified number of rows were inserted into the requested table.
    Inserted(usize),
    /// The specified prepared statement was created.
    Prepare,
    /// A user-requested warning was raised.
    Raised,
    /// The requested objects were reassigned.
    ReassignOwned,
    /// The requested privilege was revoked.
    RevokedPrivilege,
    /// The requested role was revoked.
    RevokedRole,
    /// Rows will be delivered via the specified stream.
    SendingRowsStreaming {
        /// The stream of peek responses; excluded from the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// The compute instance associated with the rows.
        instance_id: ComputeInstanceId,
        /// The execution strategy recorded for the statement.
        strategy: StatementExecutionStrategy,
    },
    /// Rows are known to be available immediately, and thus the execution is
    /// considered ended in the coordinator.
    SendingRowsImmediate {
        /// Iterator over the rows; excluded from the `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// The specified variable was set to a new value.
    SetVariable {
        name: String,
        /// Whether the operation was a `RESET` rather than a set.
        reset: bool,
    },
    /// A new transaction was started.
    StartedTransaction,
    /// Updates to the requested source or view will be streamed to the
    /// contained receiver.
    Subscribing {
        /// The receiver on which row batches arrive.
        rx: RowBatchStream,
        /// Execution-context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
        /// The compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// The active transaction committed.
    TransactionCommitted {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction rolled back.
    TransactionRolledBack {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The specified number of rows were updated in the requested table.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
677
678impl TryFrom<&Statement<Raw>> for ExecuteResponse {
679    type Error = ();
680
681    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
682    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
683        let resp_kinds = Plan::generated_from(&stmt.into())
684            .iter()
685            .map(ExecuteResponse::generated_from)
686            .flatten()
687            .cloned()
688            .collect::<BTreeSet<ExecuteResponseKind>>();
689        let resps = resp_kinds
690            .iter()
691            .map(|r| (*r).try_into())
692            .collect::<Result<Vec<ExecuteResponse>, _>>();
693        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
694        if let Ok(resps) = resps {
695            if resps.len() == 1 {
696                return Ok(resps.into_element());
697            }
698        }
699        let resp = match stmt {
700            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
701                ExecuteResponse::DroppedObject((*object_type).into())
702            }
703            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
704            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
705                ExecuteResponse::AlteredObject((*object_type).into())
706            }
707            _ => return Err(()),
708        };
709        // Ensure that if the planner ever adds possible plans we complain here.
710        soft_assert_no_log!(
711            resp_kinds.len() == 1
712                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
713            "ExecuteResponses out of sync with planner"
714        );
715        Ok(resp)
716    }
717}
718
719impl TryInto<ExecuteResponse> for ExecuteResponseKind {
720    type Error = ();
721
722    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
723    /// actually executing a statement.
724    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
725        match self {
726            ExecuteResponseKind::AlteredDefaultPrivileges => {
727                Ok(ExecuteResponse::AlteredDefaultPrivileges)
728            }
729            ExecuteResponseKind::AlteredObject => Err(()),
730            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
731            ExecuteResponseKind::AlteredSystemConfiguration => {
732                Ok(ExecuteResponse::AlteredSystemConfiguration)
733            }
734            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
735            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
736            ExecuteResponseKind::Copied => Err(()),
737            ExecuteResponseKind::CopyTo => Err(()),
738            ExecuteResponseKind::CopyFrom => Err(()),
739            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
740            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
741            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
742            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
743            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
744            ExecuteResponseKind::CreatedClusterReplica => {
745                Ok(ExecuteResponse::CreatedClusterReplica)
746            }
747            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
748            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
749            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
750            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
751            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
752            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
753            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
754            ExecuteResponseKind::CreatedMaterializedView => {
755                Ok(ExecuteResponse::CreatedMaterializedView)
756            }
757            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
758            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
759            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
760            ExecuteResponseKind::Deallocate => Err(()),
761            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
762            ExecuteResponseKind::Deleted => Err(()),
763            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
764            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
765            ExecuteResponseKind::DroppedObject => Err(()),
766            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
767            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
768            ExecuteResponseKind::Fetch => Err(()),
769            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
770            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
771            ExecuteResponseKind::Inserted => Err(()),
772            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
773            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
774            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
775            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
776            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
777            ExecuteResponseKind::SetVariable => Err(()),
778            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
779            ExecuteResponseKind::Subscribing => Err(()),
780            ExecuteResponseKind::TransactionCommitted => Err(()),
781            ExecuteResponseKind::TransactionRolledBack => Err(()),
782            ExecuteResponseKind::Updated => Err(()),
783            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
784            ExecuteResponseKind::SendingRowsStreaming => Err(()),
785            ExecuteResponseKind::SendingRowsImmediate => Err(()),
786            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
787                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
788            }
789        }
790    }
791}
792
793impl ExecuteResponse {
794    pub fn tag(&self) -> Option<String> {
795        use ExecuteResponse::*;
796        match self {
797            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
798            AlteredObject(o) => Some(format!("ALTER {}", o)),
799            AlteredRole => Some("ALTER ROLE".into()),
800            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
801            ClosedCursor => Some("CLOSE CURSOR".into()),
802            Comment => Some("COMMENT".into()),
803            Copied(n) => Some(format!("COPY {}", n)),
804            CopyTo { .. } => None,
805            CopyFrom { .. } => None,
806            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
807            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
808            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
809            CreatedRole => Some("CREATE ROLE".into()),
810            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
811            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
812            CreatedIndex { .. } => Some("CREATE INDEX".into()),
813            CreatedSecret { .. } => Some("CREATE SECRET".into()),
814            CreatedSink { .. } => Some("CREATE SINK".into()),
815            CreatedSource { .. } => Some("CREATE SOURCE".into()),
816            CreatedTable { .. } => Some("CREATE TABLE".into()),
817            CreatedView { .. } => Some("CREATE VIEW".into()),
818            CreatedViews { .. } => Some("CREATE VIEWS".into()),
819            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
820            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
821            CreatedType => Some("CREATE TYPE".into()),
822            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
823            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
824            DeclaredCursor => Some("DECLARE CURSOR".into()),
825            Deleted(n) => Some(format!("DELETE {}", n)),
826            DiscardedTemp => Some("DISCARD TEMP".into()),
827            DiscardedAll => Some("DISCARD ALL".into()),
828            DroppedObject(o) => Some(format!("DROP {o}")),
829            DroppedOwned => Some("DROP OWNED".into()),
830            EmptyQuery => None,
831            Fetch { .. } => None,
832            GrantedPrivilege => Some("GRANT".into()),
833            GrantedRole => Some("GRANT ROLE".into()),
834            Inserted(n) => {
835                // "On successful completion, an INSERT command returns a
836                // command tag of the form `INSERT <oid> <count>`."
837                //     -- https://www.postgresql.org/docs/11/sql-insert.html
838                //
839                // OIDs are a PostgreSQL-specific historical quirk, but we
840                // can return a 0 OID to indicate that the table does not
841                // have OIDs.
842                Some(format!("INSERT 0 {}", n))
843            }
844            Prepare => Some("PREPARE".into()),
845            Raised => Some("RAISE".into()),
846            ReassignOwned => Some("REASSIGN OWNED".into()),
847            RevokedPrivilege => Some("REVOKE".into()),
848            RevokedRole => Some("REVOKE ROLE".into()),
849            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
850            SetVariable { reset: true, .. } => Some("RESET".into()),
851            SetVariable { reset: false, .. } => Some("SET".into()),
852            StartedTransaction { .. } => Some("BEGIN".into()),
853            Subscribing { .. } => None,
854            TransactionCommitted { .. } => Some("COMMIT".into()),
855            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
856            Updated(n) => Some(format!("UPDATE {}", n)),
857            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
858            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
859        }
860    }
861
862    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
863    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
864    /// excluded from this function.
865    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
866        use ExecuteResponseKind::*;
867        use PlanKind::*;
868
869        match plan {
870            AbortTransaction => &[TransactionRolledBack],
871            AlterClusterRename
872            | AlterClusterSwap
873            | AlterCluster
874            | AlterClusterReplicaRename
875            | AlterOwner
876            | AlterItemRename
877            | AlterRetainHistory
878            | AlterSourceTimestampInterval
879            | AlterNoop
880            | AlterSchemaRename
881            | AlterSchemaSwap
882            | AlterSecret
883            | AlterConnection
884            | AlterSource
885            | AlterSink
886            | AlterTableAddColumn
887            | AlterMaterializedViewApplyReplacement
888            | AlterNetworkPolicy => &[AlteredObject],
889            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
890            AlterSetCluster => &[AlteredObject],
891            AlterRole => &[AlteredRole],
892            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
893                &[AlteredSystemConfiguration]
894            }
895            Close => &[ClosedCursor],
896            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
897            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
898            PlanKind::Comment => &[ExecuteResponseKind::Comment],
899            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
900            CreateConnection => &[CreatedConnection],
901            CreateDatabase => &[CreatedDatabase],
902            CreateSchema => &[CreatedSchema],
903            CreateRole => &[CreatedRole],
904            CreateCluster => &[CreatedCluster],
905            CreateClusterReplica => &[CreatedClusterReplica],
906            CreateSource | CreateSources => &[CreatedSource],
907            CreateSecret => &[CreatedSecret],
908            CreateSink => &[CreatedSink],
909            CreateTable => &[CreatedTable],
910            CreateView => &[CreatedView],
911            CreateMaterializedView => &[CreatedMaterializedView],
912            CreateContinualTask => &[CreatedContinualTask],
913            CreateIndex => &[CreatedIndex],
914            CreateType => &[CreatedType],
915            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
916            CreateNetworkPolicy => &[CreatedNetworkPolicy],
917            Declare => &[DeclaredCursor],
918            DiscardTemp => &[DiscardedTemp],
919            DiscardAll => &[DiscardedAll],
920            DropObjects => &[DroppedObject],
921            DropOwned => &[DroppedOwned],
922            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
923            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
924            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
925                ExecuteResponseKind::CopyTo,
926                SendingRowsStreaming,
927                SendingRowsImmediate,
928            ],
929            Execute | ReadThenWrite => &[
930                Deleted,
931                Inserted,
932                SendingRowsStreaming,
933                SendingRowsImmediate,
934                Updated,
935            ],
936            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
937            GrantPrivileges => &[GrantedPrivilege],
938            GrantRole => &[GrantedRole],
939            Insert => &[Inserted, SendingRowsImmediate],
940            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
941            PlanKind::Raise => &[ExecuteResponseKind::Raised],
942            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
943            RevokePrivileges => &[RevokedPrivilege],
944            RevokeRole => &[RevokedRole],
945            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
946                &[ExecuteResponseKind::SetVariable]
947            }
948            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
949            StartTransaction => &[StartedTransaction],
950            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
951            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
952        }
953    }
954}
955
956/// This implementation is meant to ensure that we maintain updated information
957/// about which types of `ExecuteResponse`s are permitted to be sent, which will
958/// be a function of which plan we're executing.
959impl Transmittable for ExecuteResponse {
960    type Allowed = ExecuteResponseKind;
961    fn to_allowed(&self) -> Self::Allowed {
962        ExecuteResponseKind::from(self)
963    }
964}