Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::coord::appends::BuiltinTableAppendNotify;
50use crate::coord::consistency::CoordinatorInconsistencies;
51use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
52use crate::coord::timestamp_selection::TimestampDetermination;
53use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
54use crate::error::AdapterError;
55use crate::session::{EndTransactionAction, RowBatchStream, Session};
56use crate::statement_logging::{
57    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
58    StatementLoggingFrontend,
59};
60use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
61use crate::util::Transmittable;
62use crate::webhook::AppendWebhookResponse;
63use crate::{
64    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
65};
66
/// A handle for pgwire to stream raw byte chunks to the coordinator's
/// parallel background batch builder tasks during COPY FROM STDIN.
///
/// This is the payload of the [`Response`] sent on the `tx` channel of
/// [`Command::StartCopyFromStdin`].
#[derive(Debug)]
pub struct CopyFromStdinWriter {
    /// Channels for distributing raw byte chunks (split at row boundaries)
    /// across parallel worker tasks. pgwire round-robins chunks across these.
    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
    /// Receives the final result (`Vec<ProtoBatch>` + total row count, or error)
    /// from the collector task. pgwire uses this to commit the batches.
    ///
    /// The success value is the tuple `(batches, total_row_count)`.
    pub completion_rx: oneshot::Receiver<
        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
    >,
}
80
/// Reply payload of [`Command::CatalogSnapshot`]: a shared, reference-counted
/// handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
    /// The catalog handed to the requester.
    pub catalog: Arc<Catalog>,
}
85
/// Request messages defined by the adapter.
///
/// The variant docs below describe the coordinator as the party acting on
/// these messages. Most variants carry a `tx` sender whose type spells out the
/// reply the requester receives; variants without a `tx` field have no reply
/// channel.
#[derive(Debug)]
pub enum Command {
    /// Requests a [`CatalogSnapshot`], delivered on `tx`.
    CatalogSnapshot {
        tx: oneshot::Sender<CatalogSnapshot>,
    },

    /// Startup request for the connection `conn_id`; the reply is a
    /// [`StartupResponse`] or an `AdapterError`.
    Startup {
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        client_ip: Option<IpAddr>,
        secret_key: u32,
        uuid: Uuid,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    },

    /// Password authentication request for `role_name`. The password is
    /// optional; the reply is `Result<(), AdapterError>`.
    AuthenticatePassword {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    },

    /// Requests a [`SASLChallengeResponse`] for `role_name` and the supplied
    /// `nonce`.
    AuthenticateGetSASLChallenge {
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        nonce: String,
    },

    /// Submits a SASL `proof` for `role_name`; the reply is a
    /// [`SASLVerifyProofResponse`] or an `AdapterError`.
    AuthenticateVerifySASLProof {
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    },

    /// Asks whether `role_name` can log in; the reply is
    /// `Result<(), AdapterError>`.
    CheckRoleCanLogin {
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    },

    /// Execution request naming a portal. The [`Session`] is moved into the
    /// command, and the [`Response`] sent on `tx` carries a `Session` back.
    Execute {
        portal_name: String,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    },

    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
    /// happen, for example, if the session's role id has been dropped which will prevent
    /// sequence_end_transaction from running.)
    Commit {
        action: EndTransactionAction,
        session: Session,
        tx: oneshot::Sender<Response<ExecuteResponse>>,
    },

    /// Cancellation request identifying its target by `conn_id` and carrying a
    /// `secret_key`. There is no reply channel.
    CancelRequest {
        conn_id: ConnectionIdType,
        secret_key: u32,
    },

    /// Cancellation request carrying only a `conn_id` (no `secret_key`).
    /// There is no reply channel.
    PrivilegedCancelRequest {
        conn_id: ConnectionId,
    },

    /// Webhook request addressed by `database`, `schema`, and `name`; the
    /// reply is an `AppendWebhookResponse` or an `AppendWebhookError`.
    GetWebhook {
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    },

    /// Requests a `SystemVars` value, delivered on `tx`.
    GetSystemVars {
        tx: oneshot::Sender<SystemVars>,
    },

    /// Carries system variable assignments as a string-to-string map
    /// (presumably name -> value; confirm against the handler), tagged with
    /// the requesting `conn_id`.
    SetSystemVars {
        vars: BTreeMap<String, String>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Carries a list of `InjectedAuditEvent`s, tagged with the requesting
    /// `conn_id`; the reply is `Result<(), AdapterError>`.
    InjectAuditEvents {
        events: Vec<crate::catalog::InjectedAuditEvent>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Terminate request for `conn_id`. The reply channel is optional.
    Terminate {
        conn_id: ConnectionId,
        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
    },

    /// Sets up a streaming COPY FROM STDIN operation. The coordinator
    /// creates parallel background batch builder tasks and returns a
    /// [`CopyFromStdinWriter`] that pgwire uses to stream raw byte chunks.
    StartCopyFromStdin {
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        /// The row description for the target table (used for constraint checks).
        row_desc: mz_repr::RelationDesc,
        /// Copy format parameters (text/csv/binary) for decoding raw bytes.
        params: mz_pgcopy::CopyFormatParams<'static>,
        session: Session,
        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
    },

    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    ///
    /// Only used for cases that terminate in the protocol layer and
    /// otherwise have no reason to hand control back to the coordinator.
    /// In other cases, we piggy-back on another command.
    RetireExecute {
        data: ExecuteContextExtra,
        reason: StatementEndedExecutionReason,
    },

    /// Consistency check request; the error side of the reply carries
    /// `CoordinatorInconsistencies`.
    CheckConsistency {
        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
    },

    /// Requests a dump rendered as a `serde_json::Value`.
    Dump {
        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
    },

    /// Requests an `InstanceClient` for `instance_id`; the error side of the
    /// reply is `InstanceMissing`.
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<
            Result<
                mz_compute_client::controller::instance_client::InstanceClient,
                mz_compute_client::controller::error::InstanceMissing,
            >,
        >,
    },

    /// Requests a shared `TimestampOracle` handle for `timeline`.
    GetOracle {
        timeline: Timeline,
        tx: oneshot::Sender<
            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
        >,
    },

    /// Requests a real-time-recency timestamp for `source_ids`, subject to
    /// `real_time_recency_timeout`. The successful reply may be `None`.
    DetermineRealTimeRecentTimestamp {
        source_ids: BTreeSet<GlobalId>,
        real_time_recency_timeout: Duration,
        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
    },

    /// Requests the transaction read holds associated with `conn_id`; the
    /// reply is `None` when there are none to return.
    GetTransactionReadHoldsBundle {
        conn_id: ConnectionId,
        tx: oneshot::Sender<Option<ReadHolds>>,
    },

    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
    StoreTransactionReadHolds {
        conn_id: ConnectionId,
        read_holds: ReadHolds,
        tx: oneshot::Sender<()>,
    },

    /// Slow-path peek request carrying a boxed `PeekDataflowPlan`, its
    /// timestamp determination, and result-size limits.
    ExecuteSlowPathPeek {
        dataflow_plan: Box<PeekDataflowPlan>,
        determination: TimestampDetermination,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Subscribe request carrying the dataflow description, the
    /// `SubscribePlan`, and the read holds that accompany it.
    ExecuteSubscribe {
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds,
        plan: plan::SubscribePlan,
        statement_logging_id: Option<StatementLoggingId>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
    /// in a background task to avoid blocking the coordinator.
    CopyToPreflight {
        /// The S3 connection info needed for preflight checks.
        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
        /// The sink ID for logging and S3 key management.
        sink_id: GlobalId,
        /// Response channel for the preflight result.
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// COPY TO execution request carrying a boxed dataflow description.
    ExecuteCopyTo {
        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Execute a side-effecting function from the frontend peek path.
    ExecuteSideEffectingFunc {
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        /// The current role of the session, used for RBAC checks.
        current_role: RoleId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    },

    /// Register a pending peek initiated by frontend sequencing. This is needed for:
    /// - statement logging
    /// - query cancellation
    RegisterFrontendPeek {
        uuid: Uuid,
        conn_id: ConnectionId,
        cluster_id: mz_controller_types::ClusterId,
        depends_on: BTreeSet<GlobalId>,
        is_fast_path: bool,
        /// If statement logging is enabled, contains all info needed for installing watch sets
        /// and logging the statement execution.
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<(), AdapterError>>,
    },

    /// Unregister a pending peek that was registered but failed to issue.
    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
    /// frontend will log the error.
    UnregisterFrontendPeek {
        uuid: Uuid,
        tx: oneshot::Sender<()>,
    },

    /// Generate a timestamp explanation.
    /// This is used when `emit_timestamp_notice` is enabled.
    ExplainTimestamp {
        conn_id: ConnectionId,
        session_wall_time: DateTime<Utc>,
        cluster_id: ClusterId,
        id_bundle: CollectionIdBundle,
        determination: TimestampDetermination,
        tx: oneshot::Sender<TimestampExplanation>,
    },

    /// Statement logging event from frontend peek sequencing.
    /// No response channel needed - this is fire-and-forget.
    FrontendStatementLogging(FrontendStatementLoggingEvent),
}
353
354impl Command {
355    pub fn session(&self) -> Option<&Session> {
356        match self {
357            Command::Execute { session, .. }
358            | Command::Commit { session, .. }
359            | Command::StartCopyFromStdin { session, .. } => Some(session),
360            Command::CancelRequest { .. }
361            | Command::Startup { .. }
362            | Command::AuthenticatePassword { .. }
363            | Command::AuthenticateGetSASLChallenge { .. }
364            | Command::AuthenticateVerifySASLProof { .. }
365            | Command::CheckRoleCanLogin { .. }
366            | Command::CatalogSnapshot { .. }
367            | Command::PrivilegedCancelRequest { .. }
368            | Command::GetWebhook { .. }
369            | Command::Terminate { .. }
370            | Command::GetSystemVars { .. }
371            | Command::SetSystemVars { .. }
372            | Command::RetireExecute { .. }
373            | Command::CheckConsistency { .. }
374            | Command::Dump { .. }
375            | Command::GetComputeInstanceClient { .. }
376            | Command::GetOracle { .. }
377            | Command::DetermineRealTimeRecentTimestamp { .. }
378            | Command::GetTransactionReadHoldsBundle { .. }
379            | Command::StoreTransactionReadHolds { .. }
380            | Command::ExecuteSlowPathPeek { .. }
381            | Command::ExecuteSubscribe { .. }
382            | Command::CopyToPreflight { .. }
383            | Command::ExecuteCopyTo { .. }
384            | Command::ExecuteSideEffectingFunc { .. }
385            | Command::RegisterFrontendPeek { .. }
386            | Command::UnregisterFrontendPeek { .. }
387            | Command::ExplainTimestamp { .. }
388            | Command::FrontendStatementLogging(..)
389            | Command::InjectAuditEvents { .. } => None,
390        }
391    }
392
393    pub fn session_mut(&mut self) -> Option<&mut Session> {
394        match self {
395            Command::Execute { session, .. }
396            | Command::Commit { session, .. }
397            | Command::StartCopyFromStdin { session, .. } => Some(session),
398            Command::CancelRequest { .. }
399            | Command::Startup { .. }
400            | Command::AuthenticatePassword { .. }
401            | Command::AuthenticateGetSASLChallenge { .. }
402            | Command::AuthenticateVerifySASLProof { .. }
403            | Command::CheckRoleCanLogin { .. }
404            | Command::CatalogSnapshot { .. }
405            | Command::PrivilegedCancelRequest { .. }
406            | Command::GetWebhook { .. }
407            | Command::Terminate { .. }
408            | Command::GetSystemVars { .. }
409            | Command::SetSystemVars { .. }
410            | Command::RetireExecute { .. }
411            | Command::CheckConsistency { .. }
412            | Command::Dump { .. }
413            | Command::GetComputeInstanceClient { .. }
414            | Command::GetOracle { .. }
415            | Command::DetermineRealTimeRecentTimestamp { .. }
416            | Command::GetTransactionReadHoldsBundle { .. }
417            | Command::StoreTransactionReadHolds { .. }
418            | Command::ExecuteSlowPathPeek { .. }
419            | Command::ExecuteSubscribe { .. }
420            | Command::CopyToPreflight { .. }
421            | Command::ExecuteCopyTo { .. }
422            | Command::ExecuteSideEffectingFunc { .. }
423            | Command::RegisterFrontendPeek { .. }
424            | Command::UnregisterFrontendPeek { .. }
425            | Command::ExplainTimestamp { .. }
426            | Command::FrontendStatementLogging(..)
427            | Command::InjectAuditEvents { .. } => None,
428        }
429    }
430}
431
/// Reply envelope pairing a command's outcome with a [`Session`] and an
/// OpenTelemetry context.
///
/// It is the `tx` payload of the session-carrying commands
/// ([`Command::Execute`], [`Command::Commit`], and
/// [`Command::StartCopyFromStdin`]).
#[derive(Debug)]
pub struct Response<T> {
    /// Outcome of the command: the payload `T` or an `AdapterError`.
    pub result: Result<T, AdapterError>,
    /// NOTE(review): presumably the session that was moved into the command,
    /// handed back to the requester; confirm against the sender.
    pub session: Session,
    /// OpenTelemetry context accompanying the reply.
    pub otel_ctx: OpenTelemetryContext,
}
438
/// Newtype around a role's optional superuser flag.
///
/// See [`StartupResponse::superuser_attribute`] for when the inner value is
/// `None`.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);
441
/// The response to [`Client::startup`](crate::Client::startup).
///
/// `Debug` is derived through `Derivative` so that `write_notify` can be
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
    /// RoleId for the user.
    pub role_id: RoleId,
    /// The role's superuser attribute in the Catalog.
    /// This attribute is None for Cloud. Cloud is able
    /// to derive the role's superuser status from
    /// external_metadata_rx.
    pub superuser_attribute: SuperuserAttribute,
    /// A future that completes when all necessary Builtin Table writes have completed.
    #[derivative(Debug = "ignore")]
    pub write_notify: BuiltinTableAppendNotify,
    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
    pub session_defaults: BTreeMap<String, OwnedVarInput>,
    /// Shared handle to a [`Catalog`].
    pub catalog: Arc<Catalog>,
    /// Shared handle to a `StorageCollections` implementation.
    pub storage_collections:
        Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
    /// Shared generator for transient ids.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics handle.
    pub optimizer_metrics: OptimizerMetrics,
    /// Persist client handle.
    pub persist_client: PersistClient,
    /// Frontend handle for statement logging.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
466
/// Reply payload of [`Command::AuthenticateGetSASLChallenge`].
///
/// `Debug` is the plain standard-library derive: no field is customized or
/// skipped, so routing it through `Derivative` added nothing.
#[derive(Debug)]
pub struct SASLChallengeResponse {
    /// Iteration count sent along with the challenge.
    pub iteration_count: usize,
    /// Base64-encoded salt for the SASL challenge.
    pub salt: String,
    /// Nonce sent along with the challenge.
    pub nonce: String,
}
475
/// Reply payload of [`Command::AuthenticateVerifySASLProof`].
///
/// `Debug` is the plain standard-library derive: no field is customized or
/// skipped, so routing it through `Derivative` added nothing.
#[derive(Debug)]
pub struct SASLVerifyProofResponse {
    /// Verifier string returned to the requester.
    pub verifier: String,
}
481
482// Facile implementation for `StartupResponse`, which does not use the `allowed`
483// feature of `ClientTransmitter`.
484impl Transmittable for StartupResponse {
485    type Allowed = bool;
486    fn to_allowed(&self) -> Self::Allowed {
487        true
488    }
489}
490
/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
///
/// A thin wrapper around the dump's raw string form.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps an already-rendered dump string.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the dump and returns the wrapped string unchanged.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
504
505impl Transmittable for CatalogDump {
506    type Allowed = bool;
507    fn to_allowed(&self) -> Self::Allowed {
508        true
509    }
510}
511
512impl Transmittable for SystemVars {
513    type Allowed = bool;
514    fn to_allowed(&self) -> Self::Allowed {
515        true
516    }
517}
518
/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
///
/// `EnumKind` additionally generates `ExecuteResponseKind`, a payload-free
/// mirror of this enum. It derives `PartialOrd`/`Ord` so that kinds can be
/// collected into ordered sets.
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
    /// The default privileges were altered.
    AlteredDefaultPrivileges,
    /// The requested object was altered.
    AlteredObject(ObjectType),
    /// The role was altered.
    AlteredRole,
    /// The system configuration was altered.
    AlteredSystemConfiguration,
    /// The requested cursor was closed.
    ClosedCursor,
    /// The provided comment was created.
    Comment,
    /// The specified number of rows were copied into the requested output.
    Copied(usize),
    /// The response for a COPY TO STDOUT query.
    CopyTo {
        /// The copy format to use for the output.
        format: mz_sql::plan::CopyFormat,
        /// The wrapped response.
        resp: Box<ExecuteResponse>,
    },
    /// NOTE(review): presumably the response for a COPY FROM STDIN query.
    /// Its target, column, and format fields mirror those of
    /// [`Command::StartCopyFromStdin`]; confirm against the sequencer.
    CopyFrom {
        /// Table we're copying into.
        target_id: CatalogItemId,
        /// Human-readable full name of the target table.
        target_name: String,
        /// Column indexes for the copy.
        columns: Vec<ColumnIndex>,
        /// Copy format parameters.
        params: CopyFormatParams<'static>,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested connection was created.
    CreatedConnection,
    /// The requested database was created.
    CreatedDatabase,
    /// The requested schema was created.
    CreatedSchema,
    /// The requested role was created.
    CreatedRole,
    /// The requested cluster was created.
    CreatedCluster,
    /// The requested cluster replica was created.
    CreatedClusterReplica,
    /// The requested index was created.
    CreatedIndex,
    /// The requested introspection subscribe was created.
    CreatedIntrospectionSubscribe,
    /// The requested secret was created.
    CreatedSecret,
    /// The requested sink was created.
    CreatedSink,
    /// The requested source was created.
    CreatedSource,
    /// The requested table was created.
    CreatedTable,
    /// The requested view was created.
    CreatedView,
    /// The requested views were created.
    CreatedViews,
    /// The requested materialized view was created.
    CreatedMaterializedView,
    /// The requested continual task was created.
    CreatedContinualTask,
    /// The requested type was created.
    CreatedType,
    /// The requested network policy was created.
    CreatedNetworkPolicy,
    /// The requested prepared statement was removed.
    Deallocate { all: bool },
    /// The requested cursor was declared.
    DeclaredCursor,
    /// The specified number of rows were deleted from the requested table.
    Deleted(usize),
    /// The temporary objects associated with the session have been discarded.
    DiscardedTemp,
    /// All state associated with the session has been discarded.
    DiscardedAll,
    /// The requested object was dropped.
    DroppedObject(ObjectType),
    /// The requested objects were dropped.
    DroppedOwned,
    /// The provided query was empty.
    EmptyQuery,
    /// Fetch results from a cursor.
    Fetch {
        /// The name of the cursor from which to fetch results.
        name: String,
        /// The number of results to fetch.
        count: Option<FetchDirection>,
        /// How long to wait for results to arrive.
        timeout: ExecuteTimeout,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
    },
    /// The requested privilege was granted.
    GrantedPrivilege,
    /// The requested role was granted.
    GrantedRole,
    /// The specified number of rows were inserted into the requested table.
    Inserted(usize),
    /// The specified prepared statement was created.
    Prepare,
    /// A user-requested warning was raised.
    Raised,
    /// The requested objects were reassigned.
    ReassignOwned,
    /// The requested privilege was revoked.
    RevokedPrivilege,
    /// The requested role was revoked.
    RevokedRole,
    /// Rows will be delivered via the specified stream.
    SendingRowsStreaming {
        /// The stream of peek responses; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
        /// The compute instance associated with the stream.
        instance_id: ComputeInstanceId,
        /// The execution strategy recorded for the statement.
        strategy: StatementExecutionStrategy,
    },
    /// Rows are known to be available immediately, and thus the execution is
    /// considered ended in the coordinator.
    SendingRowsImmediate {
        /// Iterator over the rows; omitted from `Debug` output.
        #[derivative(Debug = "ignore")]
        rows: Box<dyn RowIterator + Send + Sync>,
    },
    /// The specified variable was set to a new value.
    SetVariable {
        name: String,
        /// Whether the operation was a `RESET` rather than a set.
        reset: bool,
    },
    /// A new transaction was started.
    StartedTransaction,
    /// Updates to the requested source or view will be streamed to the
    /// contained receiver.
    Subscribing {
        /// The receiver on which update batches arrive.
        rx: RowBatchStream,
        /// Execution context guard carried along with the response.
        ctx_extra: ExecuteContextGuard,
        /// The compute instance associated with the subscribe.
        instance_id: ComputeInstanceId,
    },
    /// The active transaction committed.
    TransactionCommitted {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The active transaction rolled back.
    TransactionRolledBack {
        /// Session parameters that changed because the transaction ended.
        params: BTreeMap<&'static str, String>,
    },
    /// The specified number of rows were updated in the requested table.
    Updated(usize),
    /// A connection was validated.
    ValidatedConnection,
}
673
674impl TryFrom<&Statement<Raw>> for ExecuteResponse {
675    type Error = ();
676
677    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
678    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
679        let resp_kinds = Plan::generated_from(&stmt.into())
680            .iter()
681            .map(ExecuteResponse::generated_from)
682            .flatten()
683            .cloned()
684            .collect::<BTreeSet<ExecuteResponseKind>>();
685        let resps = resp_kinds
686            .iter()
687            .map(|r| (*r).try_into())
688            .collect::<Result<Vec<ExecuteResponse>, _>>();
689        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
690        if let Ok(resps) = resps {
691            if resps.len() == 1 {
692                return Ok(resps.into_element());
693            }
694        }
695        let resp = match stmt {
696            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
697                ExecuteResponse::DroppedObject((*object_type).into())
698            }
699            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
700            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
701                ExecuteResponse::AlteredObject((*object_type).into())
702            }
703            _ => return Err(()),
704        };
705        // Ensure that if the planner ever adds possible plans we complain here.
706        soft_assert_no_log!(
707            resp_kinds.len() == 1
708                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
709            "ExecuteResponses out of sync with planner"
710        );
711        Ok(resp)
712    }
713}
714
715impl TryInto<ExecuteResponse> for ExecuteResponseKind {
716    type Error = ();
717
718    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
719    /// actually executing a statement.
720    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
721        match self {
722            ExecuteResponseKind::AlteredDefaultPrivileges => {
723                Ok(ExecuteResponse::AlteredDefaultPrivileges)
724            }
725            ExecuteResponseKind::AlteredObject => Err(()),
726            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
727            ExecuteResponseKind::AlteredSystemConfiguration => {
728                Ok(ExecuteResponse::AlteredSystemConfiguration)
729            }
730            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
731            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
732            ExecuteResponseKind::Copied => Err(()),
733            ExecuteResponseKind::CopyTo => Err(()),
734            ExecuteResponseKind::CopyFrom => Err(()),
735            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
736            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
737            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
738            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
739            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
740            ExecuteResponseKind::CreatedClusterReplica => {
741                Ok(ExecuteResponse::CreatedClusterReplica)
742            }
743            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
744            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
745            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
746            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
747            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
748            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
749            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
750            ExecuteResponseKind::CreatedMaterializedView => {
751                Ok(ExecuteResponse::CreatedMaterializedView)
752            }
753            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
754            ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
755            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
756            ExecuteResponseKind::Deallocate => Err(()),
757            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
758            ExecuteResponseKind::Deleted => Err(()),
759            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
760            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
761            ExecuteResponseKind::DroppedObject => Err(()),
762            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
763            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
764            ExecuteResponseKind::Fetch => Err(()),
765            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
766            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
767            ExecuteResponseKind::Inserted => Err(()),
768            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
769            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
770            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
771            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
772            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
773            ExecuteResponseKind::SetVariable => Err(()),
774            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
775            ExecuteResponseKind::Subscribing => Err(()),
776            ExecuteResponseKind::TransactionCommitted => Err(()),
777            ExecuteResponseKind::TransactionRolledBack => Err(()),
778            ExecuteResponseKind::Updated => Err(()),
779            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
780            ExecuteResponseKind::SendingRowsStreaming => Err(()),
781            ExecuteResponseKind::SendingRowsImmediate => Err(()),
782            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
783                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
784            }
785        }
786    }
787}
788
789impl ExecuteResponse {
790    pub fn tag(&self) -> Option<String> {
791        use ExecuteResponse::*;
792        match self {
793            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
794            AlteredObject(o) => Some(format!("ALTER {}", o)),
795            AlteredRole => Some("ALTER ROLE".into()),
796            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
797            ClosedCursor => Some("CLOSE CURSOR".into()),
798            Comment => Some("COMMENT".into()),
799            Copied(n) => Some(format!("COPY {}", n)),
800            CopyTo { .. } => None,
801            CopyFrom { .. } => None,
802            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
803            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
804            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
805            CreatedRole => Some("CREATE ROLE".into()),
806            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
807            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
808            CreatedIndex { .. } => Some("CREATE INDEX".into()),
809            CreatedSecret { .. } => Some("CREATE SECRET".into()),
810            CreatedSink { .. } => Some("CREATE SINK".into()),
811            CreatedSource { .. } => Some("CREATE SOURCE".into()),
812            CreatedTable { .. } => Some("CREATE TABLE".into()),
813            CreatedView { .. } => Some("CREATE VIEW".into()),
814            CreatedViews { .. } => Some("CREATE VIEWS".into()),
815            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
816            CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
817            CreatedType => Some("CREATE TYPE".into()),
818            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
819            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
820            DeclaredCursor => Some("DECLARE CURSOR".into()),
821            Deleted(n) => Some(format!("DELETE {}", n)),
822            DiscardedTemp => Some("DISCARD TEMP".into()),
823            DiscardedAll => Some("DISCARD ALL".into()),
824            DroppedObject(o) => Some(format!("DROP {o}")),
825            DroppedOwned => Some("DROP OWNED".into()),
826            EmptyQuery => None,
827            Fetch { .. } => None,
828            GrantedPrivilege => Some("GRANT".into()),
829            GrantedRole => Some("GRANT ROLE".into()),
830            Inserted(n) => {
831                // "On successful completion, an INSERT command returns a
832                // command tag of the form `INSERT <oid> <count>`."
833                //     -- https://www.postgresql.org/docs/11/sql-insert.html
834                //
835                // OIDs are a PostgreSQL-specific historical quirk, but we
836                // can return a 0 OID to indicate that the table does not
837                // have OIDs.
838                Some(format!("INSERT 0 {}", n))
839            }
840            Prepare => Some("PREPARE".into()),
841            Raised => Some("RAISE".into()),
842            ReassignOwned => Some("REASSIGN OWNED".into()),
843            RevokedPrivilege => Some("REVOKE".into()),
844            RevokedRole => Some("REVOKE ROLE".into()),
845            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
846            SetVariable { reset: true, .. } => Some("RESET".into()),
847            SetVariable { reset: false, .. } => Some("SET".into()),
848            StartedTransaction { .. } => Some("BEGIN".into()),
849            Subscribing { .. } => None,
850            TransactionCommitted { .. } => Some("COMMIT".into()),
851            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
852            Updated(n) => Some(format!("UPDATE {}", n)),
853            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
854            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
855        }
856    }
857
858    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
859    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
860    /// excluded from this function.
861    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
862        use ExecuteResponseKind::*;
863        use PlanKind::*;
864
865        match plan {
866            AbortTransaction => &[TransactionRolledBack],
867            AlterClusterRename
868            | AlterClusterSwap
869            | AlterCluster
870            | AlterClusterReplicaRename
871            | AlterOwner
872            | AlterItemRename
873            | AlterRetainHistory
874            | AlterSourceTimestampInterval
875            | AlterNoop
876            | AlterSchemaRename
877            | AlterSchemaSwap
878            | AlterSecret
879            | AlterConnection
880            | AlterSource
881            | AlterSink
882            | AlterTableAddColumn
883            | AlterMaterializedViewApplyReplacement
884            | AlterNetworkPolicy => &[AlteredObject],
885            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
886            AlterSetCluster => &[AlteredObject],
887            AlterRole => &[AlteredRole],
888            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
889                &[AlteredSystemConfiguration]
890            }
891            Close => &[ClosedCursor],
892            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
893            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
894            PlanKind::Comment => &[ExecuteResponseKind::Comment],
895            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
896            CreateConnection => &[CreatedConnection],
897            CreateDatabase => &[CreatedDatabase],
898            CreateSchema => &[CreatedSchema],
899            CreateRole => &[CreatedRole],
900            CreateCluster => &[CreatedCluster],
901            CreateClusterReplica => &[CreatedClusterReplica],
902            CreateSource | CreateSources => &[CreatedSource],
903            CreateSecret => &[CreatedSecret],
904            CreateSink => &[CreatedSink],
905            CreateTable => &[CreatedTable],
906            CreateView => &[CreatedView],
907            CreateMaterializedView => &[CreatedMaterializedView],
908            CreateContinualTask => &[CreatedContinualTask],
909            CreateIndex => &[CreatedIndex],
910            CreateType => &[CreatedType],
911            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
912            CreateNetworkPolicy => &[CreatedNetworkPolicy],
913            Declare => &[DeclaredCursor],
914            DiscardTemp => &[DiscardedTemp],
915            DiscardAll => &[DiscardedAll],
916            DropObjects => &[DroppedObject],
917            DropOwned => &[DroppedOwned],
918            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
919            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
920            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
921                ExecuteResponseKind::CopyTo,
922                SendingRowsStreaming,
923                SendingRowsImmediate,
924            ],
925            Execute | ReadThenWrite => &[
926                Deleted,
927                Inserted,
928                SendingRowsStreaming,
929                SendingRowsImmediate,
930                Updated,
931            ],
932            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
933            GrantPrivileges => &[GrantedPrivilege],
934            GrantRole => &[GrantedRole],
935            Insert => &[Inserted, SendingRowsImmediate],
936            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
937            PlanKind::Raise => &[ExecuteResponseKind::Raised],
938            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
939            RevokePrivileges => &[RevokedPrivilege],
940            RevokeRole => &[RevokedRole],
941            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
942                &[ExecuteResponseKind::SetVariable]
943            }
944            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
945            StartTransaction => &[StartedTransaction],
946            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
947            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
948        }
949    }
950}
951
952/// This implementation is meant to ensure that we maintain updated information
953/// about which types of `ExecuteResponse`s are permitted to be sent, which will
954/// be a function of which plan we're executing.
955impl Transmittable for ExecuteResponse {
956    type Allowed = ExecuteResponseKind;
957    fn to_allowed(&self) -> Self::Allowed {
958        ExecuteResponseKind::from(self)
959    }
960}