Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
67/// A handle for pgwire to stream raw byte chunks to the coordinator's
68/// parallel background batch builder tasks during COPY FROM STDIN.
69#[derive(Debug)]
70pub struct CopyFromStdinWriter {
71    /// Channels for distributing raw byte chunks (split at row boundaries)
72    /// across parallel worker tasks. pgwire round-robins chunks across these.
73    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
74    /// Receives the final result (`Vec<ProtoBatch>` + total row count, or error)
75    /// from the collector task. pgwire uses this to commit the batches.
76    pub completion_rx: oneshot::Receiver<
77        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
78    >,
79}
80
81#[derive(Debug)]
82pub struct CatalogSnapshot {
83    pub catalog: Arc<Catalog>,
84}
85
86#[derive(Debug)]
87pub enum Command {
88    CatalogSnapshot {
89        tx: oneshot::Sender<CatalogSnapshot>,
90    },
91
92    Startup {
93        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
94        user: User,
95        conn_id: ConnectionId,
96        client_ip: Option<IpAddr>,
97        secret_key: u32,
98        uuid: Uuid,
99        application_name: String,
100        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
101    },
102
103    AuthenticatePassword {
104        tx: oneshot::Sender<Result<(), AdapterError>>,
105        role_name: String,
106        password: Option<Password>,
107    },
108
109    AuthenticateGetSASLChallenge {
110        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
111        role_name: String,
112        nonce: String,
113    },
114
115    AuthenticateVerifySASLProof {
116        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
117        role_name: String,
118        proof: String,
119        auth_message: String,
120        mock_hash: String,
121    },
122
123    CheckRoleCanLogin {
124        tx: oneshot::Sender<Result<(), AdapterError>>,
125        role_name: String,
126    },
127
128    Execute {
129        portal_name: String,
130        session: Session,
131        tx: oneshot::Sender<Response<ExecuteResponse>>,
132        outer_ctx_extra: Option<ExecuteContextGuard>,
133    },
134
135    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
136    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
137    /// happen, for example, if the session's role id has been dropped which will prevent
138    /// sequence_end_transaction from running.)
139    Commit {
140        action: EndTransactionAction,
141        session: Session,
142        tx: oneshot::Sender<Response<ExecuteResponse>>,
143    },
144
145    CancelRequest {
146        conn_id: ConnectionIdType,
147        secret_key: u32,
148    },
149
150    PrivilegedCancelRequest {
151        conn_id: ConnectionId,
152    },
153
154    GetWebhook {
155        database: String,
156        schema: String,
157        name: String,
158        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
159    },
160
161    GetSystemVars {
162        tx: oneshot::Sender<SystemVars>,
163    },
164
165    SetSystemVars {
166        vars: BTreeMap<String, String>,
167        conn_id: ConnectionId,
168        tx: oneshot::Sender<Result<(), AdapterError>>,
169    },
170
171    InjectAuditEvents {
172        events: Vec<crate::catalog::InjectedAuditEvent>,
173        conn_id: ConnectionId,
174        tx: oneshot::Sender<Result<(), AdapterError>>,
175    },
176
177    Terminate {
178        conn_id: ConnectionId,
179        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
180    },
181
182    /// Sets up a streaming COPY FROM STDIN operation. The coordinator
183    /// creates parallel background batch builder tasks and returns a
184    /// [`CopyFromStdinWriter`] that pgwire uses to stream raw byte chunks.
185    StartCopyFromStdin {
186        target_id: CatalogItemId,
187        target_name: String,
188        columns: Vec<ColumnIndex>,
189        /// The row description for the target table (used for constraint checks).
190        row_desc: mz_repr::RelationDesc,
191        /// Copy format parameters (text/csv/binary) for decoding raw bytes.
192        params: mz_pgcopy::CopyFormatParams<'static>,
193        session: Session,
194        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
195    },
196
197    /// Performs any cleanup and logging actions necessary for
198    /// finalizing a statement execution.
199    ///
200    /// Only used for cases that terminate in the protocol layer and
201    /// otherwise have no reason to hand control back to the coordinator.
202    /// In other cases, we piggy-back on another command.
203    RetireExecute {
204        data: ExecuteContextExtra,
205        reason: StatementEndedExecutionReason,
206    },
207
208    CheckConsistency {
209        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
210    },
211
212    Dump {
213        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
214    },
215
216    GetComputeInstanceClient {
217        instance_id: ComputeInstanceId,
218        tx: oneshot::Sender<
219            Result<
220                mz_compute_client::controller::instance_client::InstanceClient,
221                mz_compute_client::controller::error::InstanceMissing,
222            >,
223        >,
224    },
225
226    GetOracle {
227        timeline: Timeline,
228        tx: oneshot::Sender<
229            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
230        >,
231    },
232
233    DetermineRealTimeRecentTimestamp {
234        source_ids: BTreeSet<GlobalId>,
235        real_time_recency_timeout: Duration,
236        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
237    },
238
239    GetTransactionReadHoldsBundle {
240        conn_id: ConnectionId,
241        tx: oneshot::Sender<Option<ReadHolds>>,
242    },
243
244    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
245    StoreTransactionReadHolds {
246        conn_id: ConnectionId,
247        read_holds: ReadHolds,
248        tx: oneshot::Sender<()>,
249    },
250
251    ExecuteSlowPathPeek {
252        dataflow_plan: Box<PeekDataflowPlan>,
253        determination: TimestampDetermination,
254        finishing: RowSetFinishing,
255        compute_instance: ComputeInstanceId,
256        target_replica: Option<ReplicaId>,
257        intermediate_result_type: SqlRelationType,
258        source_ids: BTreeSet<GlobalId>,
259        conn_id: ConnectionId,
260        max_result_size: u64,
261        max_query_result_size: Option<u64>,
262        /// If statement logging is enabled, contains all info needed for installing watch sets
263        /// and logging the statement execution.
264        watch_set: Option<WatchSetCreation>,
265        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
266    },
267
268    ExecuteSubscribe {
269        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
270        dependency_ids: BTreeSet<GlobalId>,
271        cluster_id: ComputeInstanceId,
272        replica_id: Option<ReplicaId>,
273        conn_id: ConnectionId,
274        session_uuid: Uuid,
275        read_holds: ReadHolds,
276        plan: plan::SubscribePlan,
277        statement_logging_id: Option<StatementLoggingId>,
278        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
279    },
280
281    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
282    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
283    /// in a background task to avoid blocking the coordinator.
284    CopyToPreflight {
285        /// The S3 connection info needed for preflight checks.
286        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
287        /// The sink ID for logging and S3 key management.
288        sink_id: GlobalId,
289        /// Response channel for the preflight result.
290        tx: oneshot::Sender<Result<(), AdapterError>>,
291    },
292
293    ExecuteCopyTo {
294        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
295        compute_instance: ComputeInstanceId,
296        target_replica: Option<ReplicaId>,
297        source_ids: BTreeSet<GlobalId>,
298        conn_id: ConnectionId,
299        /// If statement logging is enabled, contains all info needed for installing watch sets
300        /// and logging the statement execution.
301        watch_set: Option<WatchSetCreation>,
302        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
303    },
304
305    /// Execute a side-effecting function from the frontend peek path.
306    ExecuteSideEffectingFunc {
307        plan: SideEffectingFunc,
308        conn_id: ConnectionId,
309        /// The current role of the session, used for RBAC checks.
310        current_role: RoleId,
311        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
312    },
313
314    /// Register a pending peek initiated by frontend sequencing. This is needed for:
315    /// - statement logging
316    /// - query cancellation
317    RegisterFrontendPeek {
318        uuid: Uuid,
319        conn_id: ConnectionId,
320        cluster_id: mz_controller_types::ClusterId,
321        depends_on: BTreeSet<GlobalId>,
322        is_fast_path: bool,
323        /// If statement logging is enabled, contains all info needed for installing watch sets
324        /// and logging the statement execution.
325        watch_set: Option<WatchSetCreation>,
326        tx: oneshot::Sender<Result<(), AdapterError>>,
327    },
328
329    /// Unregister a pending peek that was registered but failed to issue.
330    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
331    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
332    /// frontend will log the error.
333    UnregisterFrontendPeek {
334        uuid: Uuid,
335        tx: oneshot::Sender<()>,
336    },
337
338    /// Generate a timestamp explanation.
339    /// This is used when `emit_timestamp_notice` is enabled.
340    ExplainTimestamp {
341        conn_id: ConnectionId,
342        session_wall_time: DateTime<Utc>,
343        cluster_id: ClusterId,
344        id_bundle: CollectionIdBundle,
345        determination: TimestampDetermination,
346        tx: oneshot::Sender<TimestampExplanation>,
347    },
348
349    /// Statement logging event from frontend peek sequencing.
350    /// No response channel needed - this is fire-and-forget.
351    FrontendStatementLogging(FrontendStatementLoggingEvent),
352}
353
354impl Command {
355    pub fn session(&self) -> Option<&Session> {
356        match self {
357            Command::Execute { session, .. }
358            | Command::Commit { session, .. }
359            | Command::StartCopyFromStdin { session, .. } => Some(session),
360            Command::CancelRequest { .. }
361            | Command::Startup { .. }
362            | Command::AuthenticatePassword { .. }
363            | Command::AuthenticateGetSASLChallenge { .. }
364            | Command::AuthenticateVerifySASLProof { .. }
365            | Command::CheckRoleCanLogin { .. }
366            | Command::CatalogSnapshot { .. }
367            | Command::PrivilegedCancelRequest { .. }
368            | Command::GetWebhook { .. }
369            | Command::Terminate { .. }
370            | Command::GetSystemVars { .. }
371            | Command::SetSystemVars { .. }
372            | Command::RetireExecute { .. }
373            | Command::CheckConsistency { .. }
374            | Command::Dump { .. }
375            | Command::GetComputeInstanceClient { .. }
376            | Command::GetOracle { .. }
377            | Command::DetermineRealTimeRecentTimestamp { .. }
378            | Command::GetTransactionReadHoldsBundle { .. }
379            | Command::StoreTransactionReadHolds { .. }
380            | Command::ExecuteSlowPathPeek { .. }
381            | Command::ExecuteSubscribe { .. }
382            | Command::CopyToPreflight { .. }
383            | Command::ExecuteCopyTo { .. }
384            | Command::ExecuteSideEffectingFunc { .. }
385            | Command::RegisterFrontendPeek { .. }
386            | Command::UnregisterFrontendPeek { .. }
387            | Command::ExplainTimestamp { .. }
388            | Command::FrontendStatementLogging(..)
389            | Command::InjectAuditEvents { .. } => None,
390        }
391    }
392
393    pub fn session_mut(&mut self) -> Option<&mut Session> {
394        match self {
395            Command::Execute { session, .. }
396            | Command::Commit { session, .. }
397            | Command::StartCopyFromStdin { session, .. } => Some(session),
398            Command::CancelRequest { .. }
399            | Command::Startup { .. }
400            | Command::AuthenticatePassword { .. }
401            | Command::AuthenticateGetSASLChallenge { .. }
402            | Command::AuthenticateVerifySASLProof { .. }
403            | Command::CheckRoleCanLogin { .. }
404            | Command::CatalogSnapshot { .. }
405            | Command::PrivilegedCancelRequest { .. }
406            | Command::GetWebhook { .. }
407            | Command::Terminate { .. }
408            | Command::GetSystemVars { .. }
409            | Command::SetSystemVars { .. }
410            | Command::RetireExecute { .. }
411            | Command::CheckConsistency { .. }
412            | Command::Dump { .. }
413            | Command::GetComputeInstanceClient { .. }
414            | Command::GetOracle { .. }
415            | Command::DetermineRealTimeRecentTimestamp { .. }
416            | Command::GetTransactionReadHoldsBundle { .. }
417            | Command::StoreTransactionReadHolds { .. }
418            | Command::ExecuteSlowPathPeek { .. }
419            | Command::ExecuteSubscribe { .. }
420            | Command::CopyToPreflight { .. }
421            | Command::ExecuteCopyTo { .. }
422            | Command::ExecuteSideEffectingFunc { .. }
423            | Command::RegisterFrontendPeek { .. }
424            | Command::UnregisterFrontendPeek { .. }
425            | Command::ExplainTimestamp { .. }
426            | Command::FrontendStatementLogging(..)
427            | Command::InjectAuditEvents { .. } => None,
428        }
429    }
430}
431
432#[derive(Debug)]
433pub struct Response<T> {
434    pub result: Result<T, AdapterError>,
435    pub session: Session,
436    pub otel_ctx: OpenTelemetryContext,
437}
438
439#[derive(Debug, Clone, Copy)]
440pub struct SuperuserAttribute(pub Option<bool>);
441
442/// The response to [`Client::startup`](crate::Client::startup).
443#[derive(Derivative)]
444#[derivative(Debug)]
445pub struct StartupResponse {
446    /// RoleId for the user.
447    pub role_id: RoleId,
448    /// The role's superuser attribute in the Catalog.
449    /// This attribute is None for Cloud. Cloud is able
450    /// to derive the role's superuser status from
451    /// external_metadata_rx.
452    pub superuser_attribute: SuperuserAttribute,
453    /// A future that completes when all necessary Builtin Table writes have completed.
454    #[derivative(Debug = "ignore")]
455    pub write_notify: BuiltinTableAppendNotify,
456    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
457    pub session_defaults: BTreeMap<String, OwnedVarInput>,
458    pub catalog: Arc<Catalog>,
459    pub storage_collections:
460        Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
461    pub transient_id_gen: Arc<TransientIdGen>,
462    pub optimizer_metrics: OptimizerMetrics,
463    pub persist_client: PersistClient,
464    pub statement_logging_frontend: StatementLoggingFrontend,
465}
466
467#[derive(Derivative)]
468#[derivative(Debug)]
469pub struct SASLChallengeResponse {
470    pub iteration_count: usize,
471    /// Base64-encoded salt for the SASL challenge.
472    pub salt: String,
473    pub nonce: String,
474}
475
476#[derive(Derivative)]
477#[derivative(Debug)]
478pub struct SASLVerifyProofResponse {
479    pub verifier: String,
480}
481
482// Facile implementation for `StartupResponse`, which does not use the `allowed`
483// feature of `ClientTransmitter`.
484impl Transmittable for StartupResponse {
485    type Allowed = bool;
486    fn to_allowed(&self) -> Self::Allowed {
487        true
488    }
489}
490
491/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
492#[derive(Debug, Clone)]
493pub struct CatalogDump(String);
494
495impl CatalogDump {
496    pub fn new(raw: String) -> Self {
497        CatalogDump(raw)
498    }
499
500    pub fn into_string(self) -> String {
501        self.0
502    }
503}
504
505impl Transmittable for CatalogDump {
506    type Allowed = bool;
507    fn to_allowed(&self) -> Self::Allowed {
508        true
509    }
510}
511
512impl Transmittable for SystemVars {
513    type Allowed = bool;
514    fn to_allowed(&self) -> Self::Allowed {
515        true
516    }
517}
518
519/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
520#[derive(EnumKind, Derivative)]
521#[derivative(Debug)]
522#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
523pub enum ExecuteResponse {
524    /// The default privileges were altered.
525    AlteredDefaultPrivileges,
526    /// The requested object was altered.
527    AlteredObject(ObjectType),
528    /// The role was altered.
529    AlteredRole,
530    /// The system configuration was altered.
531    AlteredSystemConfiguration,
532    /// The requested cursor was closed.
533    ClosedCursor,
534    /// The provided comment was created.
535    Comment,
536    /// The specified number of rows were copied into the requested output.
537    Copied(usize),
538    /// The response for a COPY TO STDOUT query.
539    CopyTo {
540        format: mz_sql::plan::CopyFormat,
541        resp: Box<ExecuteResponse>,
542    },
543    CopyFrom {
544        /// Table we're copying into.
545        target_id: CatalogItemId,
546        /// Human-readable full name of the target table.
547        target_name: String,
548        columns: Vec<ColumnIndex>,
549        params: CopyFormatParams<'static>,
550        ctx_extra: ExecuteContextGuard,
551    },
552    /// The requested connection was created.
553    CreatedConnection,
554    /// The requested database was created.
555    CreatedDatabase,
556    /// The requested schema was created.
557    CreatedSchema,
558    /// The requested role was created.
559    CreatedRole,
560    /// The requested cluster was created.
561    CreatedCluster,
562    /// The requested cluster replica was created.
563    CreatedClusterReplica,
564    /// The requested index was created.
565    CreatedIndex,
566    /// The requested introspection subscribe was created.
567    CreatedIntrospectionSubscribe,
568    /// The requested secret was created.
569    CreatedSecret,
570    /// The requested sink was created.
571    CreatedSink,
572    /// The requested source was created.
573    CreatedSource,
574    /// The requested table was created.
575    CreatedTable,
576    /// The requested view was created.
577    CreatedView,
578    /// The requested views were created.
579    CreatedViews,
580    /// The requested materialized view was created.
581    CreatedMaterializedView,
582    /// The requested type was created.
583    CreatedType,
584    /// The requested network policy was created.
585    CreatedNetworkPolicy,
586    /// The requested prepared statement was removed.
587    Deallocate { all: bool },
588    /// The requested cursor was declared.
589    DeclaredCursor,
590    /// The specified number of rows were deleted from the requested table.
591    Deleted(usize),
592    /// The temporary objects associated with the session have been discarded.
593    DiscardedTemp,
594    /// All state associated with the session has been discarded.
595    DiscardedAll,
596    /// The requested object was dropped.
597    DroppedObject(ObjectType),
598    /// The requested objects were dropped.
599    DroppedOwned,
600    /// The provided query was empty.
601    EmptyQuery,
602    /// Fetch results from a cursor.
603    Fetch {
604        /// The name of the cursor from which to fetch results.
605        name: String,
606        /// The number of results to fetch.
607        count: Option<FetchDirection>,
608        /// How long to wait for results to arrive.
609        timeout: ExecuteTimeout,
610        ctx_extra: ExecuteContextGuard,
611    },
612    /// The requested privilege was granted.
613    GrantedPrivilege,
614    /// The requested role was granted.
615    GrantedRole,
616    /// The specified number of rows were inserted into the requested table.
617    Inserted(usize),
618    /// The specified prepared statement was created.
619    Prepare,
620    /// A user-requested warning was raised.
621    Raised,
622    /// The requested objects were reassigned.
623    ReassignOwned,
624    /// The requested privilege was revoked.
625    RevokedPrivilege,
626    /// The requested role was revoked.
627    RevokedRole,
628    /// Rows will be delivered via the specified stream.
629    SendingRowsStreaming {
630        #[derivative(Debug = "ignore")]
631        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
632        instance_id: ComputeInstanceId,
633        strategy: StatementExecutionStrategy,
634    },
635    /// Rows are known to be available immediately, and thus the execution is
636    /// considered ended in the coordinator.
637    SendingRowsImmediate {
638        #[derivative(Debug = "ignore")]
639        rows: Box<dyn RowIterator + Send + Sync>,
640    },
641    /// The specified variable was set to a new value.
642    SetVariable {
643        name: String,
644        /// Whether the operation was a `RESET` rather than a set.
645        reset: bool,
646    },
647    /// A new transaction was started.
648    StartedTransaction,
649    /// Updates to the requested source or view will be streamed to the
650    /// contained receiver.
651    Subscribing {
652        rx: RowBatchStream,
653        ctx_extra: ExecuteContextGuard,
654        instance_id: ComputeInstanceId,
655    },
656    /// The active transaction committed.
657    TransactionCommitted {
658        /// Session parameters that changed because the transaction ended.
659        params: BTreeMap<&'static str, String>,
660    },
661    /// The active transaction rolled back.
662    TransactionRolledBack {
663        /// Session parameters that changed because the transaction ended.
664        params: BTreeMap<&'static str, String>,
665    },
666    /// The specified number of rows were updated in the requested table.
667    Updated(usize),
668    /// A connection was validated.
669    ValidatedConnection,
670}
671
672impl TryFrom<&Statement<Raw>> for ExecuteResponse {
673    type Error = ();
674
675    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
676    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
677        let resp_kinds = Plan::generated_from(&stmt.into())
678            .iter()
679            .map(ExecuteResponse::generated_from)
680            .flatten()
681            .cloned()
682            .collect::<BTreeSet<ExecuteResponseKind>>();
683        let resps = resp_kinds
684            .iter()
685            .map(|r| (*r).try_into())
686            .collect::<Result<Vec<ExecuteResponse>, _>>();
687        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
688        if let Ok(resps) = resps {
689            if resps.len() == 1 {
690                return Ok(resps.into_element());
691            }
692        }
693        let resp = match stmt {
694            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
695                ExecuteResponse::DroppedObject((*object_type).into())
696            }
697            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
698            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
699                ExecuteResponse::AlteredObject((*object_type).into())
700            }
701            _ => return Err(()),
702        };
703        // Ensure that if the planner ever adds possible plans we complain here.
704        soft_assert_no_log!(
705            resp_kinds.len() == 1
706                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
707            "ExecuteResponses out of sync with planner"
708        );
709        Ok(resp)
710    }
711}
712
713impl TryInto<ExecuteResponse> for ExecuteResponseKind {
714    type Error = ();
715
716    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
717    /// actually executing a statement.
718    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
719        match self {
720            ExecuteResponseKind::AlteredDefaultPrivileges => {
721                Ok(ExecuteResponse::AlteredDefaultPrivileges)
722            }
723            ExecuteResponseKind::AlteredObject => Err(()),
724            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
725            ExecuteResponseKind::AlteredSystemConfiguration => {
726                Ok(ExecuteResponse::AlteredSystemConfiguration)
727            }
728            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
729            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
730            ExecuteResponseKind::Copied => Err(()),
731            ExecuteResponseKind::CopyTo => Err(()),
732            ExecuteResponseKind::CopyFrom => Err(()),
733            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
734            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
735            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
736            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
737            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
738            ExecuteResponseKind::CreatedClusterReplica => {
739                Ok(ExecuteResponse::CreatedClusterReplica)
740            }
741            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
742            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
743            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
744            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
745            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
746            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
747            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
748            ExecuteResponseKind::CreatedMaterializedView => {
749                Ok(ExecuteResponse::CreatedMaterializedView)
750            }
751            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
752            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
753            ExecuteResponseKind::Deallocate => Err(()),
754            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
755            ExecuteResponseKind::Deleted => Err(()),
756            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
757            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
758            ExecuteResponseKind::DroppedObject => Err(()),
759            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
760            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
761            ExecuteResponseKind::Fetch => Err(()),
762            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
763            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
764            ExecuteResponseKind::Inserted => Err(()),
765            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
766            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
767            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
768            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
769            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
770            ExecuteResponseKind::SetVariable => Err(()),
771            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
772            ExecuteResponseKind::Subscribing => Err(()),
773            ExecuteResponseKind::TransactionCommitted => Err(()),
774            ExecuteResponseKind::TransactionRolledBack => Err(()),
775            ExecuteResponseKind::Updated => Err(()),
776            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
777            ExecuteResponseKind::SendingRowsStreaming => Err(()),
778            ExecuteResponseKind::SendingRowsImmediate => Err(()),
779            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
780                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
781            }
782        }
783    }
784}
785
786impl ExecuteResponse {
787    pub fn tag(&self) -> Option<String> {
788        use ExecuteResponse::*;
789        match self {
790            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
791            AlteredObject(o) => Some(format!("ALTER {}", o)),
792            AlteredRole => Some("ALTER ROLE".into()),
793            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
794            ClosedCursor => Some("CLOSE CURSOR".into()),
795            Comment => Some("COMMENT".into()),
796            Copied(n) => Some(format!("COPY {}", n)),
797            CopyTo { .. } => None,
798            CopyFrom { .. } => None,
799            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
800            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
801            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
802            CreatedRole => Some("CREATE ROLE".into()),
803            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
804            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
805            CreatedIndex { .. } => Some("CREATE INDEX".into()),
806            CreatedSecret { .. } => Some("CREATE SECRET".into()),
807            CreatedSink { .. } => Some("CREATE SINK".into()),
808            CreatedSource { .. } => Some("CREATE SOURCE".into()),
809            CreatedTable { .. } => Some("CREATE TABLE".into()),
810            CreatedView { .. } => Some("CREATE VIEW".into()),
811            CreatedViews { .. } => Some("CREATE VIEWS".into()),
812            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
813            CreatedType => Some("CREATE TYPE".into()),
814            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
815            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
816            DeclaredCursor => Some("DECLARE CURSOR".into()),
817            Deleted(n) => Some(format!("DELETE {}", n)),
818            DiscardedTemp => Some("DISCARD TEMP".into()),
819            DiscardedAll => Some("DISCARD ALL".into()),
820            DroppedObject(o) => Some(format!("DROP {o}")),
821            DroppedOwned => Some("DROP OWNED".into()),
822            EmptyQuery => None,
823            Fetch { .. } => None,
824            GrantedPrivilege => Some("GRANT".into()),
825            GrantedRole => Some("GRANT ROLE".into()),
826            Inserted(n) => {
827                // "On successful completion, an INSERT command returns a
828                // command tag of the form `INSERT <oid> <count>`."
829                //     -- https://www.postgresql.org/docs/11/sql-insert.html
830                //
831                // OIDs are a PostgreSQL-specific historical quirk, but we
832                // can return a 0 OID to indicate that the table does not
833                // have OIDs.
834                Some(format!("INSERT 0 {}", n))
835            }
836            Prepare => Some("PREPARE".into()),
837            Raised => Some("RAISE".into()),
838            ReassignOwned => Some("REASSIGN OWNED".into()),
839            RevokedPrivilege => Some("REVOKE".into()),
840            RevokedRole => Some("REVOKE ROLE".into()),
841            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
842            SetVariable { reset: true, .. } => Some("RESET".into()),
843            SetVariable { reset: false, .. } => Some("SET".into()),
844            StartedTransaction { .. } => Some("BEGIN".into()),
845            Subscribing { .. } => None,
846            TransactionCommitted { .. } => Some("COMMIT".into()),
847            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
848            Updated(n) => Some(format!("UPDATE {}", n)),
849            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
850            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
851        }
852    }
853
854    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
855    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
856    /// excluded from this function.
857    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
858        use ExecuteResponseKind::*;
859        use PlanKind::*;
860
861        match plan {
862            AbortTransaction => &[TransactionRolledBack],
863            AlterClusterRename
864            | AlterClusterSwap
865            | AlterCluster
866            | AlterClusterReplicaRename
867            | AlterOwner
868            | AlterItemRename
869            | AlterRetainHistory
870            | AlterSourceTimestampInterval
871            | AlterNoop
872            | AlterSchemaRename
873            | AlterSchemaSwap
874            | AlterSecret
875            | AlterConnection
876            | AlterSource
877            | AlterSink
878            | AlterTableAddColumn
879            | AlterMaterializedViewApplyReplacement
880            | AlterNetworkPolicy => &[AlteredObject],
881            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
882            AlterSetCluster => &[AlteredObject],
883            AlterRole => &[AlteredRole],
884            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
885                &[AlteredSystemConfiguration]
886            }
887            Close => &[ClosedCursor],
888            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
889            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
890            PlanKind::Comment => &[ExecuteResponseKind::Comment],
891            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
892            CreateConnection => &[CreatedConnection],
893            CreateDatabase => &[CreatedDatabase],
894            CreateSchema => &[CreatedSchema],
895            CreateRole => &[CreatedRole],
896            CreateCluster => &[CreatedCluster],
897            CreateClusterReplica => &[CreatedClusterReplica],
898            CreateSource | CreateSources => &[CreatedSource],
899            CreateSecret => &[CreatedSecret],
900            CreateSink => &[CreatedSink],
901            CreateTable => &[CreatedTable],
902            CreateView => &[CreatedView],
903            CreateMaterializedView => &[CreatedMaterializedView],
904            CreateIndex => &[CreatedIndex],
905            CreateType => &[CreatedType],
906            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
907            CreateNetworkPolicy => &[CreatedNetworkPolicy],
908            Declare => &[DeclaredCursor],
909            DiscardTemp => &[DiscardedTemp],
910            DiscardAll => &[DiscardedAll],
911            DropObjects => &[DroppedObject],
912            DropOwned => &[DroppedOwned],
913            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
914            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
915            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
916                ExecuteResponseKind::CopyTo,
917                SendingRowsStreaming,
918                SendingRowsImmediate,
919            ],
920            Execute | ReadThenWrite => &[
921                Deleted,
922                Inserted,
923                SendingRowsStreaming,
924                SendingRowsImmediate,
925                Updated,
926            ],
927            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
928            GrantPrivileges => &[GrantedPrivilege],
929            GrantRole => &[GrantedRole],
930            Insert => &[Inserted, SendingRowsImmediate],
931            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
932            PlanKind::Raise => &[ExecuteResponseKind::Raised],
933            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
934            RevokePrivileges => &[RevokedPrivilege],
935            RevokeRole => &[RevokedRole],
936            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
937                &[ExecuteResponseKind::SetVariable]
938            }
939            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
940            StartTransaction => &[StartedTransaction],
941            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
942            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
943        }
944    }
945}
946
947/// This implementation is meant to ensure that we maintain updated information
948/// about which types of `ExecuteResponse`s are permitted to be sent, which will
949/// be a function of which plan we're executing.
950impl Transmittable for ExecuteResponse {
951    type Allowed = ExecuteResponseKind;
952    fn to_allowed(&self) -> Self::Allowed {
953        ExecuteResponseKind::from(self)
954    }
955}