Skip to main content

mz_adapter/
command.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use derivative::Derivative;
18use enum_kinds::EnumKind;
19use futures::Stream;
20use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
21use mz_auth::password::Password;
22use mz_cluster_client::ReplicaId;
23use mz_compute_types::ComputeInstanceId;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_controller_types::ClusterId;
26use mz_expr::RowSetFinishing;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_assert_no_log;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_persist_client::PersistClient;
31use mz_pgcopy::CopyFormatParams;
32use mz_repr::global_id::TransientIdGen;
33use mz_repr::role_id::RoleId;
34use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
35use mz_sql::ast::{FetchDirection, Raw, Statement};
36use mz_sql::catalog::ObjectType;
37use mz_sql::optimizer_metrics::OptimizerMetrics;
38use mz_sql::plan;
39use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
40use mz_sql::session::user::User;
41use mz_sql::session::vars::{OwnedVarInput, SystemVars};
42use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
43use mz_storage_types::sources::Timeline;
44use mz_timestamp_oracle::TimestampOracle;
45use tokio::sync::{mpsc, oneshot};
46use uuid::Uuid;
47
48use crate::catalog::Catalog;
49use crate::config::{ScopedParameters, ScopedParametersScope, SystemParameterFrontend};
50use crate::coord::appends::BuiltinTableAppendNotify;
51use crate::coord::consistency::CoordinatorInconsistencies;
52use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
53use crate::coord::timestamp_selection::TimestampDetermination;
54use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
55use crate::error::AdapterError;
56use crate::session::{EndTransactionAction, RowBatchStream, Session};
57use crate::statement_logging::{
58    FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
59    StatementLoggingFrontend,
60};
61use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
62use crate::util::Transmittable;
63use crate::webhook::AppendWebhookResponse;
64use crate::{
65    AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
66};
67
68/// A handle for pgwire to stream raw byte chunks to the coordinator's
69/// parallel background batch builder tasks during COPY FROM STDIN.
70#[derive(Debug)]
71pub struct CopyFromStdinWriter {
72    /// Channels for distributing raw byte chunks (split at row boundaries)
73    /// across parallel worker tasks. pgwire round-robins chunks across these.
74    pub batch_txs: Vec<mpsc::Sender<Vec<u8>>>,
75    /// Receives the final result (`Vec<ProtoBatch>` + total row count, or error)
76    /// from the collector task. pgwire uses this to commit the batches.
77    pub completion_rx: oneshot::Receiver<
78        Result<(Vec<mz_persist_client::batch::ProtoBatch>, u64), crate::AdapterError>,
79    >,
80}
81
82#[derive(Debug)]
83pub struct CatalogSnapshot {
84    pub catalog: Arc<Catalog>,
85}
86
87#[derive(Debug)]
88pub enum Command {
89    CatalogSnapshot {
90        tx: oneshot::Sender<CatalogSnapshot>,
91    },
92
93    Startup {
94        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
95        user: User,
96        conn_id: ConnectionId,
97        client_ip: Option<IpAddr>,
98        secret_key: u32,
99        uuid: Uuid,
100        application_name: String,
101        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
102    },
103
104    AuthenticatePassword {
105        tx: oneshot::Sender<Result<(), AdapterError>>,
106        role_name: String,
107        password: Option<Password>,
108    },
109
110    AuthenticateGetSASLChallenge {
111        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
112        role_name: String,
113        nonce: String,
114    },
115
116    AuthenticateVerifySASLProof {
117        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
118        role_name: String,
119        proof: String,
120        auth_message: String,
121        mock_hash: String,
122    },
123
124    CheckRoleCanLogin {
125        tx: oneshot::Sender<Result<(), AdapterError>>,
126        role_name: String,
127    },
128
129    Execute {
130        portal_name: String,
131        session: Session,
132        tx: oneshot::Sender<Response<ExecuteResponse>>,
133        outer_ctx_extra: Option<ExecuteContextGuard>,
134    },
135
136    /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
137    /// transaction state has been cleared, even if the commit or abort fails. (A failure can
138    /// happen, for example, if the session's role id has been dropped which will prevent
139    /// sequence_end_transaction from running.)
140    Commit {
141        action: EndTransactionAction,
142        session: Session,
143        tx: oneshot::Sender<Response<ExecuteResponse>>,
144    },
145
146    CancelRequest {
147        conn_id: ConnectionIdType,
148        secret_key: u32,
149    },
150
151    PrivilegedCancelRequest {
152        conn_id: ConnectionId,
153    },
154
155    GetWebhook {
156        database: String,
157        schema: String,
158        name: String,
159        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
160    },
161
162    GetSystemVars {
163        tx: oneshot::Sender<SystemVars>,
164    },
165
166    SetSystemVars {
167        vars: BTreeMap<String, String>,
168        conn_id: ConnectionId,
169        tx: oneshot::Sender<Result<(), AdapterError>>,
170    },
171
172    /// Replace the scoped feature-flag overrides (the complete desired state).
173    /// Computed by the system-parameter sync loop from continuous LaunchDarkly
174    /// evaluation. The coordinator stores this working copy and reconciles it
175    /// into the per-scope resolution boundaries (the compute controller's
176    /// per-replica dyncfg layer for `replica`-scoped parameters). See the
177    /// scoped feature flags design.
178    UpdateScopedSystemParameters {
179        overrides: ScopedParameters,
180        /// Bounds which objects' durable rows the reconcile may prune. See
181        /// [`crate::catalog::Op::UpdateScopedSystemParameters`].
182        prune_scope: Option<ScopedParametersScope>,
183        tx: oneshot::Sender<()>,
184    },
185
186    /// Install (or replace) the shared system-parameter frontend on the
187    /// coordinator, so the create-cluster / create-replica paths can resolve a
188    /// new object's scoped overrides synchronously, before the controller
189    /// installs it or its first dataflow is planned, rather than waiting for
190    /// the next sync tick. Sent by the sync loop whenever it (re)initializes the
191    /// frontend. See the scoped feature flags design.
192    InstallScopedSystemParameterFrontend {
193        frontend: Arc<SystemParameterFrontend>,
194    },
195
196    InjectAuditEvents {
197        events: Vec<crate::catalog::InjectedAuditEvent>,
198        conn_id: ConnectionId,
199        tx: oneshot::Sender<Result<(), AdapterError>>,
200    },
201
202    Terminate {
203        conn_id: ConnectionId,
204        tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
205    },
206
207    /// Sets up a streaming COPY FROM STDIN operation. The coordinator
208    /// creates parallel background batch builder tasks and returns a
209    /// [`CopyFromStdinWriter`] that pgwire uses to stream raw byte chunks.
210    StartCopyFromStdin {
211        target_id: CatalogItemId,
212        target_name: String,
213        columns: Vec<ColumnIndex>,
214        /// The row description for the target table (used for constraint checks).
215        row_desc: mz_repr::RelationDesc,
216        /// Copy format parameters (text/csv/binary) for decoding raw bytes.
217        params: mz_pgcopy::CopyFormatParams<'static>,
218        session: Session,
219        tx: oneshot::Sender<Response<CopyFromStdinWriter>>,
220    },
221
222    /// Performs any cleanup and logging actions necessary for
223    /// finalizing a statement execution.
224    ///
225    /// Only used for cases that terminate in the protocol layer and
226    /// otherwise have no reason to hand control back to the coordinator.
227    /// In other cases, we piggy-back on another command.
228    RetireExecute {
229        data: ExecuteContextExtra,
230        reason: StatementEndedExecutionReason,
231    },
232
233    CheckConsistency {
234        tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
235    },
236
237    Dump {
238        tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
239    },
240
241    GetComputeInstanceClient {
242        instance_id: ComputeInstanceId,
243        tx: oneshot::Sender<
244            Result<
245                mz_compute_client::controller::instance_client::InstanceClient,
246                mz_compute_client::controller::error::InstanceMissing,
247            >,
248        >,
249    },
250
251    GetOracle {
252        timeline: Timeline,
253        tx: oneshot::Sender<
254            Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
255        >,
256    },
257
258    DetermineRealTimeRecentTimestamp {
259        source_ids: BTreeSet<GlobalId>,
260        real_time_recency_timeout: Duration,
261        tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
262    },
263
264    GetTransactionReadHoldsBundle {
265        conn_id: ConnectionId,
266        tx: oneshot::Sender<Option<ReadHolds>>,
267    },
268
269    /// _Merges_ the given read holds into the given connection's stored transaction read holds.
270    StoreTransactionReadHolds {
271        conn_id: ConnectionId,
272        read_holds: ReadHolds,
273        tx: oneshot::Sender<()>,
274    },
275
276    ExecuteSlowPathPeek {
277        dataflow_plan: Box<PeekDataflowPlan>,
278        determination: TimestampDetermination,
279        finishing: RowSetFinishing,
280        compute_instance: ComputeInstanceId,
281        target_replica: Option<ReplicaId>,
282        intermediate_result_type: SqlRelationType,
283        source_ids: BTreeSet<GlobalId>,
284        conn_id: ConnectionId,
285        max_result_size: u64,
286        max_query_result_size: Option<u64>,
287        /// If statement logging is enabled, contains all info needed for installing watch sets
288        /// and logging the statement execution.
289        watch_set: Option<WatchSetCreation>,
290        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
291    },
292
293    ExecuteSubscribe {
294        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
295        dependency_ids: BTreeSet<GlobalId>,
296        cluster_id: ComputeInstanceId,
297        replica_id: Option<ReplicaId>,
298        conn_id: ConnectionId,
299        session_uuid: Uuid,
300        read_holds: ReadHolds,
301        plan: plan::SubscribePlan,
302        statement_logging_id: Option<StatementLoggingId>,
303        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
304    },
305
306    /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
307    /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
308    /// in a background task to avoid blocking the coordinator.
309    CopyToPreflight {
310        /// The S3 connection info needed for preflight checks.
311        s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
312        /// The sink ID for logging and S3 key management.
313        sink_id: GlobalId,
314        /// Response channel for the preflight result.
315        tx: oneshot::Sender<Result<(), AdapterError>>,
316    },
317
318    ExecuteCopyTo {
319        df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
320        compute_instance: ComputeInstanceId,
321        target_replica: Option<ReplicaId>,
322        source_ids: BTreeSet<GlobalId>,
323        conn_id: ConnectionId,
324        /// If statement logging is enabled, contains all info needed for installing watch sets
325        /// and logging the statement execution.
326        watch_set: Option<WatchSetCreation>,
327        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
328    },
329
330    /// Execute a side-effecting function from the frontend peek path.
331    ExecuteSideEffectingFunc {
332        plan: SideEffectingFunc,
333        conn_id: ConnectionId,
334        /// The current role of the session, used for RBAC checks.
335        current_role: RoleId,
336        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
337    },
338
339    /// Register a pending peek initiated by frontend sequencing. This is needed for:
340    /// - statement logging
341    /// - query cancellation
342    RegisterFrontendPeek {
343        uuid: Uuid,
344        conn_id: ConnectionId,
345        cluster_id: mz_controller_types::ClusterId,
346        depends_on: BTreeSet<GlobalId>,
347        is_fast_path: bool,
348        /// If statement logging is enabled, contains all info needed for installing watch sets
349        /// and logging the statement execution.
350        watch_set: Option<WatchSetCreation>,
351        tx: oneshot::Sender<Result<(), AdapterError>>,
352    },
353
354    /// Unregister a pending peek that was registered but failed to issue.
355    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
356    /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
357    /// frontend will log the error.
358    UnregisterFrontendPeek {
359        uuid: Uuid,
360        tx: oneshot::Sender<()>,
361    },
362
363    /// Generate a timestamp explanation.
364    /// This is used when `emit_timestamp_notice` is enabled.
365    ExplainTimestamp {
366        conn_id: ConnectionId,
367        session_wall_time: DateTime<Utc>,
368        cluster_id: ClusterId,
369        id_bundle: CollectionIdBundle,
370        determination: TimestampDetermination,
371        tx: oneshot::Sender<TimestampExplanation>,
372    },
373
374    /// Statement logging event from frontend peek sequencing.
375    /// No response channel needed - this is fire-and-forget.
376    FrontendStatementLogging(FrontendStatementLoggingEvent),
377}
378
379impl Command {
380    pub fn session(&self) -> Option<&Session> {
381        match self {
382            Command::Execute { session, .. }
383            | Command::Commit { session, .. }
384            | Command::StartCopyFromStdin { session, .. } => Some(session),
385            Command::CancelRequest { .. }
386            | Command::Startup { .. }
387            | Command::AuthenticatePassword { .. }
388            | Command::AuthenticateGetSASLChallenge { .. }
389            | Command::AuthenticateVerifySASLProof { .. }
390            | Command::CheckRoleCanLogin { .. }
391            | Command::CatalogSnapshot { .. }
392            | Command::PrivilegedCancelRequest { .. }
393            | Command::GetWebhook { .. }
394            | Command::Terminate { .. }
395            | Command::GetSystemVars { .. }
396            | Command::SetSystemVars { .. }
397            | Command::UpdateScopedSystemParameters { .. }
398            | Command::InstallScopedSystemParameterFrontend { .. }
399            | Command::RetireExecute { .. }
400            | Command::CheckConsistency { .. }
401            | Command::Dump { .. }
402            | Command::GetComputeInstanceClient { .. }
403            | Command::GetOracle { .. }
404            | Command::DetermineRealTimeRecentTimestamp { .. }
405            | Command::GetTransactionReadHoldsBundle { .. }
406            | Command::StoreTransactionReadHolds { .. }
407            | Command::ExecuteSlowPathPeek { .. }
408            | Command::ExecuteSubscribe { .. }
409            | Command::CopyToPreflight { .. }
410            | Command::ExecuteCopyTo { .. }
411            | Command::ExecuteSideEffectingFunc { .. }
412            | Command::RegisterFrontendPeek { .. }
413            | Command::UnregisterFrontendPeek { .. }
414            | Command::ExplainTimestamp { .. }
415            | Command::FrontendStatementLogging(..)
416            | Command::InjectAuditEvents { .. } => None,
417        }
418    }
419
420    pub fn session_mut(&mut self) -> Option<&mut Session> {
421        match self {
422            Command::Execute { session, .. }
423            | Command::Commit { session, .. }
424            | Command::StartCopyFromStdin { session, .. } => Some(session),
425            Command::CancelRequest { .. }
426            | Command::Startup { .. }
427            | Command::AuthenticatePassword { .. }
428            | Command::AuthenticateGetSASLChallenge { .. }
429            | Command::AuthenticateVerifySASLProof { .. }
430            | Command::CheckRoleCanLogin { .. }
431            | Command::CatalogSnapshot { .. }
432            | Command::PrivilegedCancelRequest { .. }
433            | Command::GetWebhook { .. }
434            | Command::Terminate { .. }
435            | Command::GetSystemVars { .. }
436            | Command::SetSystemVars { .. }
437            | Command::UpdateScopedSystemParameters { .. }
438            | Command::InstallScopedSystemParameterFrontend { .. }
439            | Command::RetireExecute { .. }
440            | Command::CheckConsistency { .. }
441            | Command::Dump { .. }
442            | Command::GetComputeInstanceClient { .. }
443            | Command::GetOracle { .. }
444            | Command::DetermineRealTimeRecentTimestamp { .. }
445            | Command::GetTransactionReadHoldsBundle { .. }
446            | Command::StoreTransactionReadHolds { .. }
447            | Command::ExecuteSlowPathPeek { .. }
448            | Command::ExecuteSubscribe { .. }
449            | Command::CopyToPreflight { .. }
450            | Command::ExecuteCopyTo { .. }
451            | Command::ExecuteSideEffectingFunc { .. }
452            | Command::RegisterFrontendPeek { .. }
453            | Command::UnregisterFrontendPeek { .. }
454            | Command::ExplainTimestamp { .. }
455            | Command::FrontendStatementLogging(..)
456            | Command::InjectAuditEvents { .. } => None,
457        }
458    }
459}
460
461#[derive(Debug)]
462pub struct Response<T> {
463    pub result: Result<T, AdapterError>,
464    pub session: Session,
465    pub otel_ctx: OpenTelemetryContext,
466}
467
468#[derive(Debug, Clone, Copy)]
469pub struct SuperuserAttribute(pub Option<bool>);
470
471/// The response to [`Client::startup`](crate::Client::startup).
472#[derive(Derivative)]
473#[derivative(Debug)]
474pub struct StartupResponse {
475    /// RoleId for the user.
476    pub role_id: RoleId,
477    /// The role's superuser attribute in the Catalog.
478    /// This attribute is None for Cloud. Cloud is able
479    /// to derive the role's superuser status from
480    /// external_metadata_rx.
481    pub superuser_attribute: SuperuserAttribute,
482    /// A future that completes when all necessary Builtin Table writes have completed.
483    #[derivative(Debug = "ignore")]
484    pub write_notify: BuiltinTableAppendNotify,
485    /// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
486    pub session_defaults: BTreeMap<String, OwnedVarInput>,
487    pub catalog: Arc<Catalog>,
488    pub storage_collections:
489        Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>,
490    pub transient_id_gen: Arc<TransientIdGen>,
491    pub optimizer_metrics: OptimizerMetrics,
492    pub persist_client: PersistClient,
493    pub statement_logging_frontend: StatementLoggingFrontend,
494}
495
496#[derive(Derivative)]
497#[derivative(Debug)]
498pub struct SASLChallengeResponse {
499    pub iteration_count: usize,
500    /// Base64-encoded salt for the SASL challenge.
501    pub salt: String,
502    pub nonce: String,
503}
504
505#[derive(Derivative)]
506#[derivative(Debug)]
507pub struct SASLVerifyProofResponse {
508    pub verifier: String,
509}
510
511// Facile implementation for `StartupResponse`, which does not use the `allowed`
512// feature of `ClientTransmitter`.
513impl Transmittable for StartupResponse {
514    type Allowed = bool;
515    fn to_allowed(&self) -> Self::Allowed {
516        true
517    }
518}
519
520/// The response to [`SessionClient::dump_catalog`](crate::SessionClient::dump_catalog).
521#[derive(Debug, Clone)]
522pub struct CatalogDump(String);
523
524impl CatalogDump {
525    pub fn new(raw: String) -> Self {
526        CatalogDump(raw)
527    }
528
529    pub fn into_string(self) -> String {
530        self.0
531    }
532}
533
534impl Transmittable for CatalogDump {
535    type Allowed = bool;
536    fn to_allowed(&self) -> Self::Allowed {
537        true
538    }
539}
540
541impl Transmittable for SystemVars {
542    type Allowed = bool;
543    fn to_allowed(&self) -> Self::Allowed {
544        true
545    }
546}
547
548/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
549#[derive(EnumKind, Derivative)]
550#[derivative(Debug)]
551#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
552pub enum ExecuteResponse {
553    /// The default privileges were altered.
554    AlteredDefaultPrivileges,
555    /// The requested object was altered.
556    AlteredObject(ObjectType),
557    /// The role was altered.
558    AlteredRole,
559    /// The system configuration was altered.
560    AlteredSystemConfiguration,
561    /// The requested cursor was closed.
562    ClosedCursor,
563    /// The provided comment was created.
564    Comment,
565    /// The specified number of rows were copied into the requested output.
566    Copied(usize),
567    /// The response for a COPY TO STDOUT query.
568    CopyTo {
569        format: mz_sql::plan::CopyFormat,
570        resp: Box<ExecuteResponse>,
571    },
572    CopyFrom {
573        /// Table we're copying into.
574        target_id: CatalogItemId,
575        /// Human-readable full name of the target table.
576        target_name: String,
577        columns: Vec<ColumnIndex>,
578        params: CopyFormatParams<'static>,
579        ctx_extra: ExecuteContextGuard,
580    },
581    /// The requested connection was created.
582    CreatedConnection,
583    /// The requested database was created.
584    CreatedDatabase,
585    /// The requested schema was created.
586    CreatedSchema,
587    /// The requested role was created.
588    CreatedRole,
589    /// The requested cluster was created.
590    CreatedCluster,
591    /// The requested cluster replica was created.
592    CreatedClusterReplica,
593    /// The requested index was created.
594    CreatedIndex,
595    /// The requested introspection subscribe was created.
596    CreatedIntrospectionSubscribe,
597    /// The requested secret was created.
598    CreatedSecret,
599    /// The requested sink was created.
600    CreatedSink,
601    /// The requested source was created.
602    CreatedSource,
603    /// The requested table was created.
604    CreatedTable,
605    /// The requested view was created.
606    CreatedView,
607    /// The requested views were created.
608    CreatedViews,
609    /// The requested materialized view was created.
610    CreatedMaterializedView,
611    /// The requested type was created.
612    CreatedType,
613    /// The requested network policy was created.
614    CreatedNetworkPolicy,
615    /// The requested prepared statement was removed.
616    Deallocate { all: bool },
617    /// The requested cursor was declared.
618    DeclaredCursor,
619    /// The specified number of rows were deleted from the requested table.
620    Deleted(usize),
621    /// The temporary objects associated with the session have been discarded.
622    DiscardedTemp,
623    /// All state associated with the session has been discarded.
624    DiscardedAll,
625    /// The requested object was dropped.
626    DroppedObject(ObjectType),
627    /// The requested objects were dropped.
628    DroppedOwned,
629    /// The provided query was empty.
630    EmptyQuery,
631    /// Fetch results from a cursor.
632    Fetch {
633        /// The name of the cursor from which to fetch results.
634        name: String,
635        /// The number of results to fetch.
636        count: Option<FetchDirection>,
637        /// How long to wait for results to arrive.
638        timeout: ExecuteTimeout,
639        ctx_extra: ExecuteContextGuard,
640    },
641    /// The requested privilege was granted.
642    GrantedPrivilege,
643    /// The requested role was granted.
644    GrantedRole,
645    /// The specified number of rows were inserted into the requested table.
646    Inserted(usize),
647    /// The specified prepared statement was created.
648    Prepare,
649    /// A user-requested warning was raised.
650    Raised,
651    /// The requested objects were reassigned.
652    ReassignOwned,
653    /// The requested privilege was revoked.
654    RevokedPrivilege,
655    /// The requested role was revoked.
656    RevokedRole,
657    /// Rows will be delivered via the specified stream.
658    SendingRowsStreaming {
659        #[derivative(Debug = "ignore")]
660        rows: Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>,
661        instance_id: ComputeInstanceId,
662        strategy: StatementExecutionStrategy,
663    },
664    /// Rows are known to be available immediately, and thus the execution is
665    /// considered ended in the coordinator.
666    SendingRowsImmediate {
667        #[derivative(Debug = "ignore")]
668        rows: Box<dyn RowIterator + Send + Sync>,
669    },
670    /// The specified variable was set to a new value.
671    SetVariable {
672        name: String,
673        /// Whether the operation was a `RESET` rather than a set.
674        reset: bool,
675    },
676    /// A new transaction was started.
677    StartedTransaction,
678    /// Updates to the requested source or view will be streamed to the
679    /// contained receiver.
680    Subscribing {
681        rx: RowBatchStream,
682        ctx_extra: ExecuteContextGuard,
683        instance_id: ComputeInstanceId,
684    },
685    /// The active transaction committed.
686    TransactionCommitted {
687        /// Session parameters that changed because the transaction ended.
688        params: BTreeMap<&'static str, String>,
689    },
690    /// The active transaction rolled back.
691    TransactionRolledBack {
692        /// Session parameters that changed because the transaction ended.
693        params: BTreeMap<&'static str, String>,
694    },
695    /// The specified number of rows were updated in the requested table.
696    Updated(usize),
697    /// A connection was validated.
698    ValidatedConnection,
699}
700
701impl TryFrom<&Statement<Raw>> for ExecuteResponse {
702    type Error = ();
703
704    /// Returns Ok if this Statement always produces a single, trivial ExecuteResponse.
705    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
706        let resp_kinds = Plan::generated_from(&stmt.into())
707            .iter()
708            .map(ExecuteResponse::generated_from)
709            .flatten()
710            .cloned()
711            .collect::<BTreeSet<ExecuteResponseKind>>();
712        let resps = resp_kinds
713            .iter()
714            .map(|r| (*r).try_into())
715            .collect::<Result<Vec<ExecuteResponse>, _>>();
716        // Check if this statement's possible plans yield exactly one possible ExecuteResponse.
717        if let Ok(resps) = resps {
718            if resps.len() == 1 {
719                return Ok(resps.into_element());
720            }
721        }
722        let resp = match stmt {
723            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
724                ExecuteResponse::DroppedObject((*object_type).into())
725            }
726            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
727            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
728                ExecuteResponse::AlteredObject((*object_type).into())
729            }
730            _ => return Err(()),
731        };
732        // Ensure that if the planner ever adds possible plans we complain here.
733        soft_assert_no_log!(
734            resp_kinds.len() == 1
735                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
736            "ExecuteResponses out of sync with planner"
737        );
738        Ok(resp)
739    }
740}
741
742impl TryInto<ExecuteResponse> for ExecuteResponseKind {
743    type Error = ();
744
745    /// Attempts to convert into an ExecuteResponse. Returns an error if not possible without
746    /// actually executing a statement.
747    fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
748        match self {
749            ExecuteResponseKind::AlteredDefaultPrivileges => {
750                Ok(ExecuteResponse::AlteredDefaultPrivileges)
751            }
752            ExecuteResponseKind::AlteredObject => Err(()),
753            ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
754            ExecuteResponseKind::AlteredSystemConfiguration => {
755                Ok(ExecuteResponse::AlteredSystemConfiguration)
756            }
757            ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
758            ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
759            ExecuteResponseKind::Copied => Err(()),
760            ExecuteResponseKind::CopyTo => Err(()),
761            ExecuteResponseKind::CopyFrom => Err(()),
762            ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
763            ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
764            ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
765            ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
766            ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
767            ExecuteResponseKind::CreatedClusterReplica => {
768                Ok(ExecuteResponse::CreatedClusterReplica)
769            }
770            ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
771            ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
772            ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
773            ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
774            ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
775            ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
776            ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
777            ExecuteResponseKind::CreatedMaterializedView => {
778                Ok(ExecuteResponse::CreatedMaterializedView)
779            }
780            ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
781            ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
782            ExecuteResponseKind::Deallocate => Err(()),
783            ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
784            ExecuteResponseKind::Deleted => Err(()),
785            ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
786            ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
787            ExecuteResponseKind::DroppedObject => Err(()),
788            ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
789            ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
790            ExecuteResponseKind::Fetch => Err(()),
791            ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
792            ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
793            ExecuteResponseKind::Inserted => Err(()),
794            ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
795            ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
796            ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
797            ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
798            ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
799            ExecuteResponseKind::SetVariable => Err(()),
800            ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
801            ExecuteResponseKind::Subscribing => Err(()),
802            ExecuteResponseKind::TransactionCommitted => Err(()),
803            ExecuteResponseKind::TransactionRolledBack => Err(()),
804            ExecuteResponseKind::Updated => Err(()),
805            ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
806            ExecuteResponseKind::SendingRowsStreaming => Err(()),
807            ExecuteResponseKind::SendingRowsImmediate => Err(()),
808            ExecuteResponseKind::CreatedIntrospectionSubscribe => {
809                Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
810            }
811        }
812    }
813}
814
815impl ExecuteResponse {
816    pub fn tag(&self) -> Option<String> {
817        use ExecuteResponse::*;
818        match self {
819            AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
820            AlteredObject(o) => Some(format!("ALTER {}", o)),
821            AlteredRole => Some("ALTER ROLE".into()),
822            AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
823            ClosedCursor => Some("CLOSE CURSOR".into()),
824            Comment => Some("COMMENT".into()),
825            Copied(n) => Some(format!("COPY {}", n)),
826            CopyTo { .. } => None,
827            CopyFrom { .. } => None,
828            CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
829            CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
830            CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
831            CreatedRole => Some("CREATE ROLE".into()),
832            CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
833            CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
834            CreatedIndex { .. } => Some("CREATE INDEX".into()),
835            CreatedSecret { .. } => Some("CREATE SECRET".into()),
836            CreatedSink { .. } => Some("CREATE SINK".into()),
837            CreatedSource { .. } => Some("CREATE SOURCE".into()),
838            CreatedTable { .. } => Some("CREATE TABLE".into()),
839            CreatedView { .. } => Some("CREATE VIEW".into()),
840            CreatedViews { .. } => Some("CREATE VIEWS".into()),
841            CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
842            CreatedType => Some("CREATE TYPE".into()),
843            CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
844            Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
845            DeclaredCursor => Some("DECLARE CURSOR".into()),
846            Deleted(n) => Some(format!("DELETE {}", n)),
847            DiscardedTemp => Some("DISCARD TEMP".into()),
848            DiscardedAll => Some("DISCARD ALL".into()),
849            DroppedObject(o) => Some(format!("DROP {o}")),
850            DroppedOwned => Some("DROP OWNED".into()),
851            EmptyQuery => None,
852            Fetch { .. } => None,
853            GrantedPrivilege => Some("GRANT".into()),
854            GrantedRole => Some("GRANT ROLE".into()),
855            Inserted(n) => {
856                // "On successful completion, an INSERT command returns a
857                // command tag of the form `INSERT <oid> <count>`."
858                //     -- https://www.postgresql.org/docs/11/sql-insert.html
859                //
860                // OIDs are a PostgreSQL-specific historical quirk, but we
861                // can return a 0 OID to indicate that the table does not
862                // have OIDs.
863                Some(format!("INSERT 0 {}", n))
864            }
865            Prepare => Some("PREPARE".into()),
866            Raised => Some("RAISE".into()),
867            ReassignOwned => Some("REASSIGN OWNED".into()),
868            RevokedPrivilege => Some("REVOKE".into()),
869            RevokedRole => Some("REVOKE ROLE".into()),
870            SendingRowsStreaming { .. } | SendingRowsImmediate { .. } => None,
871            SetVariable { reset: true, .. } => Some("RESET".into()),
872            SetVariable { reset: false, .. } => Some("SET".into()),
873            StartedTransaction { .. } => Some("BEGIN".into()),
874            Subscribing { .. } => None,
875            TransactionCommitted { .. } => Some("COMMIT".into()),
876            TransactionRolledBack { .. } => Some("ROLLBACK".into()),
877            Updated(n) => Some(format!("UPDATE {}", n)),
878            ValidatedConnection => Some("VALIDATE CONNECTION".into()),
879            CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
880        }
881    }
882
883    /// Expresses which [`PlanKind`] generate which set of [`ExecuteResponseKind`].
884    /// `ExecuteResponseKind::Canceled` could be generated at any point as well, but that is
885    /// excluded from this function.
886    pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
887        use ExecuteResponseKind::*;
888        use PlanKind::*;
889
890        match plan {
891            AbortTransaction => &[TransactionRolledBack],
892            AlterClusterRename
893            | AlterClusterSwap
894            | AlterCluster
895            | AlterClusterReplicaRename
896            | AlterOwner
897            | AlterItemRename
898            | AlterRetainHistory
899            | AlterSourceTimestampInterval
900            | AlterNoop
901            | AlterSchemaRename
902            | AlterSchemaSwap
903            | AlterSecret
904            | AlterConnection
905            | AlterSource
906            | AlterSink
907            | AlterTableAddColumn
908            | AlterMaterializedViewApplyReplacement
909            | AlterNetworkPolicy => &[AlteredObject],
910            AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
911            AlterSetCluster => &[AlteredObject],
912            AlterRole => &[AlteredRole],
913            AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
914                &[AlteredSystemConfiguration]
915            }
916            Close => &[ClosedCursor],
917            PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
918            PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
919            PlanKind::Comment => &[ExecuteResponseKind::Comment],
920            CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
921            CreateConnection => &[CreatedConnection],
922            CreateDatabase => &[CreatedDatabase],
923            CreateSchema => &[CreatedSchema],
924            CreateRole => &[CreatedRole],
925            CreateCluster => &[CreatedCluster],
926            CreateClusterReplica => &[CreatedClusterReplica],
927            CreateSource | CreateSources => &[CreatedSource],
928            CreateSecret => &[CreatedSecret],
929            CreateSink => &[CreatedSink],
930            CreateTable => &[CreatedTable],
931            CreateView => &[CreatedView],
932            CreateMaterializedView => &[CreatedMaterializedView],
933            CreateIndex => &[CreatedIndex],
934            CreateType => &[CreatedType],
935            PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
936            CreateNetworkPolicy => &[CreatedNetworkPolicy],
937            Declare => &[DeclaredCursor],
938            DiscardTemp => &[DiscardedTemp],
939            DiscardAll => &[DiscardedAll],
940            DropObjects => &[DroppedObject],
941            DropOwned => &[DroppedOwned],
942            PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
943            ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
944            | ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
945                ExecuteResponseKind::CopyTo,
946                SendingRowsStreaming,
947                SendingRowsImmediate,
948            ],
949            Execute | ReadThenWrite => &[
950                Deleted,
951                Inserted,
952                SendingRowsStreaming,
953                SendingRowsImmediate,
954                Updated,
955            ],
956            PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
957            GrantPrivileges => &[GrantedPrivilege],
958            GrantRole => &[GrantedRole],
959            Insert => &[Inserted, SendingRowsImmediate],
960            PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
961            PlanKind::Raise => &[ExecuteResponseKind::Raised],
962            PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
963            RevokePrivileges => &[RevokedPrivilege],
964            RevokeRole => &[RevokedRole],
965            PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
966                &[ExecuteResponseKind::SetVariable]
967            }
968            PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
969            StartTransaction => &[StartedTransaction],
970            SideEffectingFunc => &[SendingRowsStreaming, SendingRowsImmediate],
971            ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
972        }
973    }
974}
975
976/// This implementation is meant to ensure that we maintain updated information
977/// about which types of `ExecuteResponse`s are permitted to be sent, which will
978/// be a function of which plan we're executing.
979impl Transmittable for ExecuteResponse {
980    type Allowed = ExecuteResponseKind;
981    fn to_allowed(&self) -> Self::Allowed {
982        ExecuteResponseKind::from(self)
983    }
984}